协程
协程池
goroutine 调度时机有哪些
- go 创建一个新的 goroutine,Go scheduler 会考虑调度
- GC
- 系统调用
- 内存同步访问:atomic,mutex,channel 操作等会使 goroutine 阻塞,因此会被调度走。
goroutine和线程的区别
- 内存占用:一个 goroutine 的栈内存消耗为 2 KB,栈空间不够用,会自动进行扩容。一个 thread 则需要消耗 1 MB 栈内存。
- 创建和销毀:Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。
- 切换:当 threads 切换时,需要保存各种寄存器,以便将来恢复。而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。一般而言,线程切换会消耗 1000-1500 纳秒,Goroutine 的切换约为 200 ns。
并发的安全退出
- close + sync
package main
import (
"fmt"
"time"
"sync"
)
func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
fmt.Println("cancle")
return //退出
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel) //接收一个关闭通道的广播
wg.Wait()
}
- context包(推荐)
package main
import (
"fmt"
"time"
"sync"
"context"
)
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done()://接收到取消后,执行的操作
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel() //接收一个取消的操作
wg.Wait()
}
异常
即使在包内部使用了panic,但是在导出函数时会被转化为明确的错误值。
2层嵌套的defer函数中直接调用recover和1层defer函数中调用包装的MyRecover函数一样,都是经过了2个函数帧才到达真正的recover函数,这个时候Goroutine的对应上一级栈帧中已经没有异常信息。
recover必须在defer函数中运行
func ParseJSON(input string) (s *Syntax, err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("JSON: internal error: %v", p)
}
}()
// ...parser...
}
独占CPU导致其它Goroutine饿死
func main() {
runtime.GOMAXPROCS(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
}
}()
for {
runtime.Gosched() //解决办法1:出让时间片
}
//select{} ////解决办法2:阻塞
}
不同Goroutine之间不满足顺序一致性内存模型
因为在不同的Goroutine,main函数可能无法观测到done的状态变化, 那么for循环会陷入死循环:
var msg string
var done bool = false
func main() {
runtime.GOMAXPROCS(1)
go func() {
msg = "hello, world"
done = true
}()
for {
if done {
println(msg)
break
}
}
}
解决办法:用显示同步
var msg string
var done = make(chan bool)
func main() {
runtime.GOMAXPROCS(1)
go func() {
msg = "hello, world"
done <- true
}()
<-done
println(msg)
}
Goroutine泄露
func main() {
ch := func() <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
} ()
return ch
}()
for v := range ch {
fmt.Println(v)
if v == 5 {
break //退出后,Goroutine无法回收
}
}
}
加context包解决
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := func(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
} ()
return ch
}(ctx)
for v := range ch {
fmt.Println(v)
if v == 5 {
cancel()
break
}
}
}
goroutine和panic
每个goroutine必须自己捕获panic,如果未作处理,panic会导致主main退出,即使mian作了recover,也不能阻止
goroutine leak(协程泄露)
原因
- goroutine由于channel的读/写端退出而一直阻塞,导致goroutine一直占用资源,而无法退出
- goroutine进入死循环中,导致资源一直无法释放
协程的优雅退出
- for range
go func(in <-chan int) {
// Using for-range to exit goroutine
// range has the ability to detect the close/end of a channel
for x := range in {
fmt.Printf("Process %d\n", x)
}
}(inCh)
- 使用,ok退出
go func() {
// in for-select using ok to exit goroutine
for {
select {
case x, ok := <-in:
if !ok {
return
}
fmt.Printf("Process %d\n", x)
processedCnt++
case <-t.C:
fmt.Printf("Working, processedCnt = %d\n", processedCnt)
}
}
}()
- context包