learning_notes

学习笔记

View project on GitHub

协程

协程池

goroutine 调度时机有哪些

  • go 创建一个新的 goroutine,Go scheduler 会考虑调度
  • GC
  • 系统调用
  • 内存同步访问:atomic,mutex,channel 操作等会使 goroutine 阻塞,因此会被调度走。

goroutine和线程的区别

  • 内存占用:一个 goroutine 的栈内存消耗为 2 KB,栈空间不够用,会自动进行扩容。一个 thread 则需要消耗 1 MB 栈内存。
  • 创建和销毀:Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。
  • 切换:当 threads 切换时,需要保存各种寄存器,以便将来恢复。而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。一般而言,线程切换会消耗 1000-1500 纳秒,Goroutine 的切换约为 200 ns。

并发的安全退出

  1. close + sync
package main

import (
	"fmt"
	"time"
	"sync"
)

func worker(wg *sync.WaitGroup, cannel chan bool) {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-cannel:
			fmt.Println("cancle")
			return //退出
		}
	}
}

func main() {
	cancel := make(chan bool)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(&wg, cancel)
	}

	time.Sleep(time.Second)
	close(cancel) //接收一个关闭通道的广播
	wg.Wait()
}

  1. context包(推荐)
package main

import (
	"fmt"
	"time"
	"sync"
	"context"
)

func worker(ctx context.Context, wg *sync.WaitGroup) error {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-ctx.Done()://接收到取消后,执行的操作
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(ctx, &wg)
	}

	time.Sleep(time.Second)
	cancel() //接收一个取消的操作

	wg.Wait()
}

异常

即使在包内部使用了panic,但是在导出函数时会被转化为明确的错误值。

2层嵌套的defer函数中直接调用recover和1层defer函数中调用包装的MyRecover函数一样,都是经过了2个函数帧才到达真正的recover函数,这个时候Goroutine的对应上一级栈帧中已经没有异常信息。

recover必须在defer函数中运行

func ParseJSON(input string) (s *Syntax, err error) {
    defer func() {
        if p := recover(); p != nil {
            err = fmt.Errorf("JSON: internal error: %v", p)
        }
    }()
    // ...parser...
}

独占CPU导致其它Goroutine饿死

func main() {
	runtime.GOMAXPROCS(1)

	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println(i)
		}
	}()

	for {
		runtime.Gosched() //解决办法1:出让时间片
	}
	//select{} ////解决办法2:阻塞
}

不同Goroutine之间不满足顺序一致性内存模型

因为在不同的Goroutine,main函数可能无法观测到done的状态变化, 那么for循环会陷入死循环:

var msg string
var done bool = false

func main() {
	runtime.GOMAXPROCS(1)

	go func() {
		msg = "hello, world"
		done = true
	}()

	for {
		if done {
			println(msg)
			break
		}
	}
}

解决办法:用显示同步

var msg string
var done = make(chan bool)

func main() {
	runtime.GOMAXPROCS(1)

	go func() {
		msg = "hello, world"
		done <- true
	}()

	<-done
	println(msg)
}

Goroutine泄露

func main() {
	ch := func() <-chan int {
		ch := make(chan int)
		go func() {
			for i := 0; ; i++ {
				ch <- i
			}
		} ()
		return ch
	}()

	for v := range ch {
		fmt.Println(v)
		if v == 5 {
			break //退出后,Goroutine无法回收
		}
	}
}

加context包解决

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	ch := func(ctx context.Context) <-chan int {
		ch := make(chan int)
		go func() {
			for i := 0; ; i++ {
				select {
				case <- ctx.Done():
					return
				case ch <- i:
				}
			}
		} ()
		return ch
	}(ctx)

	for v := range ch {
		fmt.Println(v)
		if v == 5 {
			cancel()
			break
		}
	}
}

goroutine和panic

每个goroutine必须自己捕获panic,如果未作处理,panic会导致主main退出,即使mian作了recover,也不能阻止

协程的优雅退出

  1. for range
go func(in <-chan int) {
    // Using for-range to exit goroutine
    // range has the ability to detect the close/end of a channel
    for x := range in {
        fmt.Printf("Process %d\n", x)
    }
}(inCh)

  1. 使用,ok退出
go func() {
    // in for-select using ok to exit goroutine
    for {
        select {
        case x, ok := <-in:
            if !ok {
                return
            }
            fmt.Printf("Process %d\n", x)
            processedCnt++
        case <-t.C:
            fmt.Printf("Working, processedCnt = %d\n", processedCnt)
        }
    }
}()

如何避免 Go 语言中的 Goroutine Leak(协程泄漏)

Goroutine 被创建后,由于某些原因无法正常退出,导致它一直存活,占用资源。泄漏的 Goroutine 不会被 GC(垃圾回收)自动回收,因为它们仍然在运行或被其他对象引用。

原因

  1. goroutine由于channel的读/写端退出而一直阻塞,导致goroutine一直占用资源,而无法退出
  2. goroutine进入死循环中,导致资源一直无法释放

常见场景

阻塞性 Channel 操作
// 错误示例:若 ch 未被写入,goroutine 将永远阻塞
go func() {
    data := <-ch
    // ...
}()

解决方法:

  • 使用带缓冲的 Channel:确保发送方不被阻塞。
  • 超时控制:通过 select 和 time.After 设置超时。
  • Context 取消:使用 context.Context 传递终止信号。
    go func(ctx context.Context) {
      select {
      case data := <-ch:
          // 正常处理
      case <-ctx.Done():
          // 收到取消信号,退出
      case <-time.After(2 * time.Second):
          // 超时退出
      }
    }(ctx)
    
未关闭的 Channel
// 发送方未关闭 Channel,导致接收方 Goroutine 无限等待。
// 错误示例:若 ch 未被关闭,range 会一直阻塞
go func() {
    for data := range ch {
        // 处理数据
    }
}()

解决方法:

  • 明确关闭 Channel:当数据发送完毕后,由发送方关闭 Channel。
  • 结合 select 和退出信号:允许主动终止接收。
    go func() {
      for {
          select {
          case data, ok := <-ch:
              if !ok { // Channel 已关闭
                  return
              }
              // 处理数据
          case <-done: // 外部终止信号
              return
          }
      }
    }()
    
无限循环未设退出条件
// 错误示例:无限循环且无法终止
go func() {
    for {
        // 持续执行任务
    }
}()

解决方法:

  • 通过 Channel 或 Context 传递退出信号。
    go func(ctx context.Context) {
      for {
          select {
          case <-ctx.Done():
              return // 收到取消信号退出
          default:
              // 执行任务
          }
      }
    }(ctx)
    
使用 Context 传递终止信号
// 通过 context.Context 统一管理 Goroutine 的生命周期:
ctx, cancel := context.WithCancel(context.Background())

go func() {
    select {
    case <-ctx.Done():
        return // 收到取消信号退出
    case data := <-ch:
        // 处理数据
    }
}()

// 需要终止时调用 cancel()
cancel()
使用 sync.WaitGroup 同步
var wg sync.WaitGroup

wg.Add(1)
go func() {
    defer wg.Done()
    // 执行任务
}()

wg.Wait() // 等待所有 Goroutine 完成

总结

  • 使用 context.Context 或 chan 传递退出信号 避免 Goroutine 无法退出。
  • 避免永久阻塞:通过超时、非阻塞操作或退出信号。
  • 关闭无用 Channel:通知接收方资源已释放。
  • pprof 监控运行中的 Goroutine 数量,可进一步定位潜在泄漏。