协程
协程池
goroutine 调度时机有哪些
- go 创建一个新的 goroutine,Go scheduler 会考虑调度
- GC
- 系统调用
- 内存同步访问:atomic,mutex,channel 操作等会使 goroutine 阻塞,因此会被调度走。
goroutine和线程的区别
- 内存占用:一个 goroutine 的栈内存消耗为 2 KB,栈空间不够用,会自动进行扩容。一个 thread 则需要消耗 1 MB 栈内存。
- 创建和销毀:Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。
- 切换:当 threads 切换时,需要保存各种寄存器,以便将来恢复。而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。一般而言,线程切换会消耗 1000-1500 纳秒,Goroutine 的切换约为 200 ns。
并发的安全退出
- close + sync
package main
import (
"fmt"
"time"
"sync"
)
func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
fmt.Println("cancle")
return //退出
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel) //接收一个关闭通道的广播
wg.Wait()
}
- context包(推荐)
package main
import (
"fmt"
"time"
"sync"
"context"
)
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done()://接收到取消后,执行的操作
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel() //接收一个取消的操作
wg.Wait()
}
异常
即使在包内部使用了panic,但是在导出函数时会被转化为明确的错误值。
2层嵌套的defer函数中直接调用recover和1层defer函数中调用包装的MyRecover函数一样,都是经过了2个函数帧才到达真正的recover函数,这个时候Goroutine的对应上一级栈帧中已经没有异常信息。
recover必须在defer函数中运行
func ParseJSON(input string) (s *Syntax, err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("JSON: internal error: %v", p)
}
}()
// ...parser...
}
独占CPU导致其它Goroutine饿死
func main() {
runtime.GOMAXPROCS(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
}
}()
for {
runtime.Gosched() //解决办法1:出让时间片
}
//select{} ////解决办法2:阻塞
}
不同Goroutine之间不满足顺序一致性内存模型
因为在不同的Goroutine,main函数可能无法观测到done的状态变化, 那么for循环会陷入死循环:
var msg string
var done bool = false
func main() {
runtime.GOMAXPROCS(1)
go func() {
msg = "hello, world"
done = true
}()
for {
if done {
println(msg)
break
}
}
}
解决办法:用显示同步
var msg string
var done = make(chan bool)
func main() {
runtime.GOMAXPROCS(1)
go func() {
msg = "hello, world"
done <- true
}()
<-done
println(msg)
}
Goroutine泄露
func main() {
ch := func() <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
} ()
return ch
}()
for v := range ch {
fmt.Println(v)
if v == 5 {
break //退出后,Goroutine无法回收
}
}
}
加context包解决
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := func(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
} ()
return ch
}(ctx)
for v := range ch {
fmt.Println(v)
if v == 5 {
cancel()
break
}
}
}
goroutine和panic
每个goroutine必须自己捕获panic,如果未作处理,panic会导致主main退出,即使mian作了recover,也不能阻止
协程的优雅退出
- for range
go func(in <-chan int) {
// Using for-range to exit goroutine
// range has the ability to detect the close/end of a channel
for x := range in {
fmt.Printf("Process %d\n", x)
}
}(inCh)
- 使用,ok退出
go func() {
// in for-select using ok to exit goroutine
for {
select {
case x, ok := <-in:
if !ok {
return
}
fmt.Printf("Process %d\n", x)
processedCnt++
case <-t.C:
fmt.Printf("Working, processedCnt = %d\n", processedCnt)
}
}
}()
如何避免 Go 语言中的 Goroutine Leak(协程泄漏)
Goroutine 被创建后,由于某些原因无法正常退出,导致它一直存活,占用资源。泄漏的 Goroutine 不会被 GC(垃圾回收)自动回收,因为它们仍然在运行或被其他对象引用。
原因
- goroutine由于channel的读/写端退出而一直阻塞,导致goroutine一直占用资源,而无法退出
- goroutine进入死循环中,导致资源一直无法释放
常见场景
阻塞性 Channel 操作
// 错误示例:若 ch 未被写入,goroutine 将永远阻塞
go func() {
data := <-ch
// ...
}()
解决方法:
- 使用带缓冲的 Channel:确保发送方不被阻塞。
- 超时控制:通过 select 和 time.After 设置超时。
- Context 取消:使用 context.Context 传递终止信号。
go func(ctx context.Context) { select { case data := <-ch: // 正常处理 case <-ctx.Done(): // 收到取消信号,退出 case <-time.After(2 * time.Second): // 超时退出 } }(ctx)
未关闭的 Channel
// 发送方未关闭 Channel,导致接收方 Goroutine 无限等待。
// 错误示例:若 ch 未被关闭,range 会一直阻塞
go func() {
for data := range ch {
// 处理数据
}
}()
解决方法:
- 明确关闭 Channel:当数据发送完毕后,由发送方关闭 Channel。
- 结合 select 和退出信号:允许主动终止接收。
go func() { for { select { case data, ok := <-ch: if !ok { // Channel 已关闭 return } // 处理数据 case <-done: // 外部终止信号 return } } }()
无限循环未设退出条件
// 错误示例:无限循环且无法终止
go func() {
for {
// 持续执行任务
}
}()
解决方法:
- 通过 Channel 或 Context 传递退出信号。
go func(ctx context.Context) { for { select { case <-ctx.Done(): return // 收到取消信号退出 default: // 执行任务 } } }(ctx)
使用 Context 传递终止信号
// 通过 context.Context 统一管理 Goroutine 的生命周期:
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
return // 收到取消信号退出
case data := <-ch:
// 处理数据
}
}()
// 需要终止时调用 cancel()
cancel()
使用 sync.WaitGroup 同步
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 执行任务
}()
wg.Wait() // 等待所有 Goroutine 完成
总结
- 使用 context.Context 或 chan 传递退出信号 避免 Goroutine 无法退出。
- 避免永久阻塞:通过超时、非阻塞操作或退出信号。
- 关闭无用 Channel:通知接收方资源已释放。
- pprof 监控运行中的 Goroutine 数量,可进一步定位潜在泄漏。