1. Channel 四种读取方式
1.1 直接读取(无 ok 检测)
ch := make(chan int, 1)
v := <-ch // 不会 panic,但无法区分是真实零值还是关闭后零值1.2 带 ok 检测的读取
v, ok := <-ch
// ok 为 true 时,表示成功从 channel 读到真实数据。
// ok 为 false 时,表示 channel 已关闭且无更多数据,此时 v 是类型零值。1.3 range 循环读取
for v := range ch {
// ...
}- 会持续从 channel 读取值,直到 channel 被关闭且数据读完,循环自动终止。
- 等效于不断执行
v, ok := <-ch并检查 ok,写法更简洁。 - 注意:如果 channel 永远不关闭,会一直阻塞造成死锁(除非有其他退出机制)。
1.4 select 多路复用读取
select {
case v := <-ch:
// 处理数据
default:
// 非阻塞
}- 可以同时监听多个 channel,哪个有数据就执行对应的 case。
- 配合
default可实现非阻塞读取:若所有 channel 都无数据则立即执行 default。 - 即使某个 channel 已关闭,读取也会立即得到零值(需结合
v, ok := <-ch检测关闭)。
Channel 状态速记
| 操作 | nil channel | 正常 channel | closed channel |
|---|---|---|---|
| 读取 | 阻塞 | 正常读取 | 返回零值 |
| 写入 | 阻塞 | 正常写入 | panic |
| 关闭 | panic | 正常关闭 | panic |
读关零值,写关 panic;nil 两边都阻塞;无缓冲要配对,有缓冲看容量;关两次也要 panic。
2. sync.WaitGroup 等待多 goroutine
场景:汇总多个请求并发处理。如同时对多个用户发送通知、对多个文件压缩、大量图片缩略图生成、多 API 聚合。
2.1 基础用法
package main
import (
"fmt"
"sync"
"time"
)
func work(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second * 2)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go work(i, &wg)
}
wg.Wait()
fmt.Println("ALL DONE")
}2.2 多服务聚合(带错误处理)
package main
func AggregateData(ctx context.Context) (Result, error) {
var wg sync.WaitGroup
var mu sync.Mutex
var userData *User
var orderData *Order
var errs []error
wg.Add(3)
// 调用用户服务
go func() {
defer wg.Done()
data, err := callUserService(ctx)
mu.Lock()
userData = data
if err != nil {
errs = append(errs, err)
}
mu.Unlock()
}()
// 调用订单服务
go func() {
defer wg.Done()
data, err := callOrderService(ctx)
mu.Lock()
orderData = data
if err != nil {
errs = append(errs, err)
}
mu.Unlock()
}()
// 调用库存服务(类似)
wg.Wait()
// 汇总结果或处理错误
if len(errs) > 0 {
return Result{}, combineErrors(errs)
}
return merge(userData, orderData), nil
}3. 生产者-消费者模式(Channel 实现)
package main
import (
"fmt"
"math/rand"
"strconv"
"sync"
"time"
)
// Order 任务结构体(模拟外卖订单)
type Order struct {
ID int
Shop string // 店铺名
Dish string // 菜品
Distance float64 // 配送距离(km)
}
func GenOrderNo() string {
now := time.Now().Unix()
randNum := rand.Intn(10000)
orderNo := fmt.Sprintf("%d%d", now, randNum)
return orderNo
}
func main() {
orderChan := make(chan Order, 5)
var producerWg sync.WaitGroup
var consumerWg sync.WaitGroup
// 启动生产者(15个)
for i := 1; i <= 15; i++ {
producerWg.Add(1)
go func(id int) {
defer producerWg.Done()
orderId, _ := strconv.ParseInt(GenOrderNo(), 10, 64)
order := Order{
ID: int(orderId),
Shop: "厦门小吃店",
Dish: "海蛎煎",
Distance: float64(9%3 + 1),
}
fmt.Printf("【生产者-%d】新订单:%d\n", id, order.ID)
orderChan <- order
time.Sleep(time.Millisecond * 10)
}(i)
}
// 启动消费者(3个骑手)
for i := 1; i <= 3; i++ {
consumerWg.Add(1)
go func(id int) {
defer consumerWg.Done()
for order := range orderChan {
fmt.Printf("【骑手-%d】接到订单 %d,配送 %.1f km\n", id, order.ID, order.Distance)
time.Sleep(time.Second * 5)
fmt.Printf("【骑手-%d】订单 %d 已送达\n", id, order.ID)
}
}(i)
}
// 等待所有生产者完成 → 关闭 channel → 等待消费者完成
producerWg.Wait()
close(orderChan)
consumerWg.Wait()
fmt.Println("厦门外卖系统打烊啦~")
}4. select 超时控制
package main
import (
"context"
"fmt"
"time"
)
func callUserServe(ctx context.Context) (string, error) {
timer := time.NewTimer(time.Second * 2)
select {
case <-timer.C:
return "user_info", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func callOrderServe(ctx context.Context) (string, error) {
timer := time.NewTimer(time.Second * 3)
select {
case <-timer.C:
return "order_info", nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
type result struct {
name string
data string
err error
}
resultCh := make(chan result, 2)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
go func() {
data, err := callUserServe(ctx)
resultCh <- result{"user", data, err}
}()
go func() {
data, err := callOrderServe(ctx)
resultCh <- result{"order", data, err}
}()
ret := make(map[string]string)
for i := 0; i < 2; i++ {
select {
case <-ctx.Done(): // 整体超时或上游取消
fmt.Println("整体超时或上游取消!")
return
case res := <-resultCh:
if res.err != nil {
fmt.Printf("%s error: %v\n", "事务", res.err)
return
}
ret[res.name] = res.data
}
}
fmt.Printf("最后结果: %v\n", ret)
}5. Worker Pool(带最大并发数控制)
package main
import (
"fmt"
"sync"
"time"
)
type Task func()
type WorkPool struct {
maxWorkers int
taskQueue chan Task
wg sync.WaitGroup
stopOnce sync.Once
stop chan struct{}
}
func NewWorkPool(maxWorker int, queueSize int) *WorkPool {
return &WorkPool{
maxWorkers: maxWorker,
taskQueue: make(chan Task, queueSize),
stop: make(chan struct{}),
}
}
func (p *WorkPool) worker() {
defer p.wg.Done()
for {
select {
case task, ok := <-p.taskQueue:
if !ok {
return // 任务队列已关闭,worker 退出
}
task()
case <-p.stop:
return // 收到停止信号,退出
}
}
}
func (p *WorkPool) Start() {
for i := 0; i < p.maxWorkers; i++ {
p.wg.Add(1)
go p.worker()
}
}
func (p *WorkPool) Submit(task Task) bool {
select {
case <-p.stop:
return false
default:
}
select {
case p.taskQueue <- task:
return true
case <-p.stop:
return false
}
}
// StopGracefully 优雅关闭:不再接收新任务,等待所有已提交任务完成
func (p *WorkPool) StopGracefully() {
p.stopOnce.Do(func() {
close(p.taskQueue)
})
p.wg.Wait()
}
func main() {
pool := NewWorkPool(2, 6) // 2个协程,队列长度6
pool.Start()
for i := 0; i < 10; i++ {
taskID := i
submit := pool.Submit(func() {
fmt.Printf("task %d 开始\n", taskID)
time.Sleep(500 * time.Millisecond)
fmt.Printf("任务 %d 执行完毕\n", taskID)
})
if !submit {
fmt.Printf("任务 %d 提交失败,池已停止\n", taskID)
}
}
pool.StopGracefully()
fmt.Println("pool stopped")
}6. sync.Mutex 保护计数器
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.RWMutex
value int
}
func (c *Counter) Add() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.value
}
func main() {
var wg sync.WaitGroup
c := &Counter{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Add()
}()
}
wg.Wait()
fmt.Println(c.Value())
}7. sync.RWMutex 实现读多写少的缓存
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.RWMutex
value int
}
func (c *Counter) Add() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.RLock()
defer c.mu.RUnlock()
return c.value
}
func main() {
var wg sync.WaitGroup
c := &Counter{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Add()
}()
}
wg.Wait()
fmt.Println(c.Value())
}