Go 并发编程常用模式模板
一份可直接套用的 Go 并发参考。每个模式给出「何时用 / 模板 / 关键陷阱」。
贯穿全文的两条判断准则:
- 能不共享就用通讯(channel / 消息传递),必须共享就用最简单的同步包起来(锁 / 原子)。
- 每启动一个 goroutine,先想清楚它在什么条件下、怎么退出——否则就是 goroutine 泄漏。
版本说明:示例按 Go 1.22+ 编写。
errgroup/singleflight来自golang.org/x/sync。
共享变量加锁
何时用:多个 goroutine 读写同一块状态,且操作不是单一原子动作(需要把「复合操作」整体保护)。
import "sync"
type Counter struct {
mu sync.Mutex
n int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.n++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.n
}
能用原子就别用锁(单个计数器、标志位、指针整体替换):
import "sync/atomic"
var n atomic.Int64
n.Add(1) // 原子自增
_ = n.Load() // 原子读
读多写少用 RWMutex(多个读者可并发持读锁):
type Cache struct {
mu sync.RWMutex
m map[string]string
}
func (c *Cache) Get(k string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.m[k]
return v, ok
}
func (c *Cache) Set(k, v string) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[k] = v
}
「几乎只读、偶尔整体替换」用原子指针(COW,读者无锁):
var config atomic.Pointer[Config]
// 读(无锁,极快)
cfg := config.Load()
// 写(构造新副本,原子换上)
config.Store(buildNewConfig())
关键陷阱
- 一把锁要么保护一组相关字段,要么粒度过大成瓶颈——按「一起被读写的数据」划分锁的边界。
- 多把锁务必全局固定获取顺序,否则死锁。
defer Unlock()是默认习惯,避免 return / panic 路径忘记解锁。
单例 / 惰性初始化(Once)
何时用:某资源全局只该初始化一次且线程安全——数据库连接、连接池、全局配置。
import "sync"
var (
once sync.Once
db *DB
)
func GetDB() *DB {
once.Do(func() {
db = mustConnect() // 只会执行一次,并发调用会阻塞等它完成
})
return db
}
Go 1.21+ 更简洁的写法:
// OnceValue:把「只算一次的值」封装好,调用 GetDB() 即得
var GetDB = sync.OnceValue(func() *DB {
return mustConnect()
})
关键陷阱
- 不要自己手写「双重检查锁」——内存序极易写错,
Once已正确处理。 once.Do里 panic 会被视为「已执行」,后续调用不会重试,初始化逻辑要保证不 panic 或自行兜底。
通讯:用 channel 传递数据(生产者-消费者)
何时用:数据 / 所有权要在 goroutine 之间流转,而不是被多方共享读写。
// 无缓冲 = 同步交接(发送方阻塞到接收方就绪)
// 有缓冲 = 解耦速率(缓冲未满即可发送)
ch := make(chan Item, 100)
// 生产者:发送方负责 close
go func() {
defer close(ch)
for _, it := range items {
ch <- it
}
}()
// 消费者:range 会一直取到 channel「已关闭且取空」
for it := range ch {
handle(it)
}
关键陷阱
- 永远由发送方 close,且在所有发送完成之后。向已关闭的 channel 发送会 panic。
- 忘记 close 会让消费者的
range永远阻塞 → 泄漏。 - 多个发送方时,需用
WaitGroup等全部发送方结束后再由一处 close(见模式 6)。
发布订阅(Pub/Sub)
何时用:一条消息要广播给所有订阅者(每人一份副本)。注意与 fan-out 区别:fan-out 是「一条消息只给某一个 worker」。
import "sync"
type Broker[T any] struct {
mu sync.RWMutex
subs map[chan T]struct{}
}
func NewBroker[T any]() *Broker[T] {
return &Broker[T]{subs: make(map[chan T]struct{})}
}
func (b *Broker[T]) Subscribe() chan T {
ch := make(chan T, 16)
b.mu.Lock()
b.subs[ch] = struct{}{}
b.mu.Unlock()
return ch
}
func (b *Broker[T]) Unsubscribe(ch chan T) {
b.mu.Lock()
delete(b.subs, ch)
close(ch)
b.mu.Unlock()
}
func (b *Broker[T]) Publish(msg T) {
b.mu.RLock()
defer b.mu.RUnlock()
for ch := range b.subs {
select {
case ch <- msg:
default: // 某订阅者缓冲已满则丢弃,避免拖垮整个 Broker
}
}
}
关键陷阱
- 慢订阅者会拖累广播——用带缓冲 channel +
select/default丢弃,或为每个订阅者配独立投递 goroutine。 - 订阅者退出时必须
Unsubscribe,否则Publish会一直往无人读的 channel 发。
select 多路等待 + Actor 状态串行化
select:超时 / 取消 / 多路复用的通用积木
select {
case v := <-ch:
handle(v)
case <-ctx.Done(): // 取消信号(最常用)
return ctx.Err()
case <-time.After(2 * time.Second): // 超时
return errTimeout
}
非阻塞收发(加 default):
select {
case ch <- v:
// 发出去了
default:
// 发不出去(缓冲满/无接收者),不阻塞
}
Actor:用独占 goroutine 替代锁
何时用:让状态只属于一个 goroutine,别人想读写就发命令进去——全程无锁,因为没有共享。
type cmd struct {
op string
key string
value int
reply chan int // 一次性回信通道
}
func store(ctx context.Context, cmds <-chan cmd) {
state := map[string]int{} // 私有状态,无需任何锁
for {
select {
case <-ctx.Done():
return
case c := <-cmds:
switch c.op {
case "set":
state[c.key] = c.value
case "get":
c.reply <- state[c.key]
}
}
}
}
// 调用方
func get(cmds chan<- cmd, key string) int {
reply := make(chan int, 1)
cmds <- cmd{op: "get", key: key, reply: reply}
return <-reply
}
关键陷阱
replychannel 建议带缓冲(容量 1),避免 actor 因调用方没及时收而阻塞。- actor goroutine 必须能被
ctx取消退出,否则泄漏。
流水线 / Fan-out / Worker Pool
三者递进:pipeline 是骨架,fan-out 给慢阶段加并行,worker pool 是 fan-out 的「有界」版。
Pipeline(各阶段并发运转,数据依次穿过)
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return // 支持取消,防止下游不收时泄漏
}
}
}()
return out
}
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
// 使用:for r := range sq(ctx, gen(ctx, 1, 2, 3, 4)) { ... }
Worker Pool(固定 N 个 worker 消费任务队列)
func workerPool[J any, R any](
ctx context.Context,
jobs <-chan J,
n int,
work func(J) R,
) <-chan R {
results := make(chan R)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := range jobs {
select {
case results <- work(j):
case <-ctx.Done():
return
}
}
}()
}
// 全部 worker 退出后才关 results(关键:必须等 wg)
go func() {
wg.Wait()
close(results)
}()
return results
}
Fan-in(合并多个 channel 到一个)
func merge[T any](cs ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(c <-chan T) {
defer wg.Done()
for v := range c {
out <- v
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
关键陷阱
- 关闭
out的 goroutine 必须wg.Wait()之后再 close,否则会向已关闭 channel 发送 panic。 - fan-out 多 worker 共读一个输入 channel,Go 保证每个值只被一个 worker 取到,无需额外加锁。
- 每个阶段都加
select { case <-ctx.Done(): return },否则下游提前退出时上游会永久阻塞。
context:取消与超时传播
何时用:几乎所有跨 goroutine / 跨 API 边界的并发函数——给整棵任务树装总开关,防泄漏、控超时。
import (
"context"
"time"
)
// 超时
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
defer cancel() // 一定要调用,否则计时器泄漏
// 或手动取消
ctx, cancel := context.WithCancel(parent)
defer cancel()
// goroutine 内监听取消
func worker(ctx context.Context, in <-chan Task) error {
for {
select {
case <-ctx.Done():
return ctx.Err() // 被取消或超时
case t, ok := <-in:
if !ok {
return nil
}
handle(t)
}
}
}
关键陷阱
ctx按约定永远是函数第一个参数:func F(ctx context.Context, ...)。WithCancel/WithTimeout返回的cancel必须调用(通常defer cancel()),否则资源泄漏。- 不要把
ctx存进结构体长期持有;它应随调用链传递。 - 别用 context 传业务参数,只传请求作用域的取消 / 截止时间 / 少量元数据。
结构化并发(errgroup)
何时用:并发发起一批独立任务,等它们全部完成,且任一失败就取消其余。
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([][]byte, len(urls))
for i, url := range urls { // Go 1.22+ 每次迭代是独立变量,无需再 i, url := i, url
g.Go(func() error {
data, err := fetch(ctx, url)
if err != nil {
return err // 第一个非 nil error 会取消 ctx,联动其余任务
}
results[i] = data
return nil
})
}
if err := g.Wait(); err != nil { // 汇合点:等全部,返回首个错误
return nil, err
}
return results, nil
}
限制并发度(errgroup 自带,等价于内建 worker pool):
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(8) // 最多 8 个任务同时跑
for _, job := range jobs {
g.Go(func() error { return process(ctx, job) })
}
err := g.Wait()
关键陷阱
g.Go里返回 error 才会触发联动取消;任务内部要监听ctx.Done()才能真正提前退出。- 多个任务写同一切片不同下标是安全的(无重叠);写同一 map / 累加同一变量仍需加锁或原子。
- 别裸
go出去不管——凡是「发起一批、要等齐」的场景都用 errgroup 给一个明确汇合点。
附录:两个高频工具(用到再查)
Singleflight——并发请求同一 key 时只真正执行一次,其余共享结果(防缓存击穿):
import "golang.org/x/sync/singleflight"
var g singleflight.Group
func getUser(id string) (*User, error) {
v, err, _ := g.Do(id, func() (any, error) {
return queryUserFromDB(id) // 同一 id 的并发调用只查一次库
})
if err != nil {
return nil, err
}
return v.(*User), nil
}
限流(令牌桶)——控制每秒最多处理多少:
import "golang.org/x/time/rate"
limiter := rate.NewLimiter(rate.Limit(10), 1) // 10 次/秒,桶容量 1
for req := range requests {
if err := limiter.Wait(ctx); err != nil { // 取不到令牌就等(可被 ctx 取消)
return
}
go handle(req)
}
速查:怎么选
| 需求 | 用什么 |
|---|---|
| 多方读写同一状态 | 锁(Mutex);只读多用 RWMutex;单值用 atomic |
| 只初始化一次 | sync.Once / sync.OnceValue |
| 数据在 goroutine 间流转 | channel(生产者-消费者) |
| 一条消息广播给所有人 | Pub/Sub Broker |
| 一条任务只给一个 worker | fan-out / worker pool |
| 状态独占、想免锁 | Actor(独占 goroutine + 命令 channel) |
| 等多个事件(取消/超时/多路) | select |
| 分阶段处理数据流 | pipeline |
| 给慢阶段加可控并行 | worker pool / errgroup.SetLimit |
| 取消、超时、防泄漏 | context |
| 发起一批任务、等齐、失败联动 | errgroup |
| 并发请求同一资源只算一次 | singleflight |
| 控制处理速率 | rate.Limiter |
一句话收口:前半部分(锁 / Once / channel / Pub-Sub)解决「状态怎么安全共享或传递」,后半部分(select / pipeline / pool / context / errgroup)解决「并发任务怎么组织、控制和收尾」。能不共享就用通讯,必须共享就用最简单的同步包起来,且每个 goroutine 都要有明确的退出路径。