GO channel总结

作者: 分类: php 时间: 2026-05-29 评论: 暂无评论

1. Channel 四种读取方式

1.1 直接读取(无 ok 检测)

ch := make(chan int, 1)
v := <-ch // 不会 panic,但无法区分是真实零值还是关闭后零值

1.2 带 ok 检测的读取

v, ok := <-ch
// ok 为 true 时,表示成功从 channel 读到真实数据。
// ok 为 false 时,表示 channel 已关闭且无更多数据,此时 v 是类型零值。

1.3 range 循环读取

for v := range ch {
    // ...
}
  • 会持续从 channel 读取值,直到 channel 被关闭且数据读完,循环自动终止。
  • 等效于不断执行 v, ok := <-ch 并检查 ok,写法更简洁。
  • 注意:如果 channel 永远不关闭,会一直阻塞造成死锁(除非有其他退出机制)。

1.4 select 多路复用读取

select {
case v := <-ch:
    // 处理数据
default:
    // 非阻塞
}
  • 可以同时监听多个 channel,哪个有数据就执行对应的 case。
  • 配合 default 可实现非阻塞读取:若所有 channel 都无数据则立即执行 default。
  • 即使某个 channel 已关闭,读取也会立即得到零值(需结合 v, ok := <-ch 检测关闭)。

Channel 状态速记

操作nil channel正常 channelclosed channel
读取阻塞正常读取返回零值
写入阻塞正常写入panic
关闭panic正常关闭panic

读关零值,写关 panic;nil 两边都阻塞;无缓冲要配对,有缓冲看容量;关两次也要 panic。


2. sync.WaitGroup 等待多 goroutine

场景:汇总多个请求并发处理。如同时对多个用户发送通知、对多个文件压缩、大量图片缩略图生成、多 API 聚合。

2.1 基础用法

package main

import (
    "fmt"
    "sync"
    "time"
)

func work(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second * 2)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go work(i, &wg)
    }
    wg.Wait()
    fmt.Println("ALL DONE")
}

2.2 多服务聚合(带错误处理)

package main

func AggregateData(ctx context.Context) (Result, error) {
    var wg sync.WaitGroup
    var mu sync.Mutex
    var userData *User
    var orderData *Order
    var errs []error

    wg.Add(3)

    // 调用用户服务
    go func() {
        defer wg.Done()
        data, err := callUserService(ctx)
        mu.Lock()
        userData = data
        if err != nil {
            errs = append(errs, err)
        }
        mu.Unlock()
    }()

    // 调用订单服务
    go func() {
        defer wg.Done()
        data, err := callOrderService(ctx)
        mu.Lock()
        orderData = data
        if err != nil {
            errs = append(errs, err)
        }
        mu.Unlock()
    }()

    // 调用库存服务(类似)

    wg.Wait()
    // 汇总结果或处理错误
    if len(errs) > 0 {
        return Result{}, combineErrors(errs)
    }
    return merge(userData, orderData), nil
}

3. 生产者-消费者模式(Channel 实现)

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "sync"
    "time"
)

// Order 任务结构体(模拟外卖订单)
type Order struct {
    ID       int
    Shop     string  // 店铺名
    Dish     string  // 菜品
    Distance float64 // 配送距离(km)
}

func GenOrderNo() string {
    now := time.Now().Unix()
    randNum := rand.Intn(10000)
    orderNo := fmt.Sprintf("%d%d", now, randNum)
    return orderNo
}

func main() {
    orderChan := make(chan Order, 5)

    var producerWg sync.WaitGroup
    var consumerWg sync.WaitGroup

    // 启动生产者(15个)
    for i := 1; i <= 15; i++ {
        producerWg.Add(1)
        go func(id int) {
            defer producerWg.Done()
            orderId, _ := strconv.ParseInt(GenOrderNo(), 10, 64)
            order := Order{
                ID:       int(orderId),
                Shop:     "厦门小吃店",
                Dish:     "海蛎煎",
                Distance: float64(9%3 + 1),
            }
            fmt.Printf("【生产者-%d】新订单:%d\n", id, order.ID)
            orderChan <- order
            time.Sleep(time.Millisecond * 10)
        }(i)
    }

    // 启动消费者(3个骑手)
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go func(id int) {
            defer consumerWg.Done()
            for order := range orderChan {
                fmt.Printf("【骑手-%d】接到订单 %d,配送 %.1f km\n", id, order.ID, order.Distance)
                time.Sleep(time.Second * 5)
                fmt.Printf("【骑手-%d】订单 %d 已送达\n", id, order.ID)
            }
        }(i)
    }

    // 等待所有生产者完成 → 关闭 channel → 等待消费者完成
    producerWg.Wait()
    close(orderChan)
    consumerWg.Wait()

    fmt.Println("厦门外卖系统打烊啦~")
}

4. select 超时控制

package main

import (
    "context"
    "fmt"
    "time"
)

func callUserServe(ctx context.Context) (string, error) {
    timer := time.NewTimer(time.Second * 2)
    select {
    case <-timer.C:
        return "user_info", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func callOrderServe(ctx context.Context) (string, error) {
    timer := time.NewTimer(time.Second * 3)
    select {
    case <-timer.C:
        return "order_info", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    type result struct {
        name string
        data string
        err  error
    }
    resultCh := make(chan result, 2)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
    defer cancel()

    go func() {
        data, err := callUserServe(ctx)
        resultCh <- result{"user", data, err}
    }()

    go func() {
        data, err := callOrderServe(ctx)
        resultCh <- result{"order", data, err}
    }()

    ret := make(map[string]string)
    for i := 0; i < 2; i++ {
        select {
        case <-ctx.Done(): // 整体超时或上游取消
            fmt.Println("整体超时或上游取消!")
            return
        case res := <-resultCh:
            if res.err != nil {
                fmt.Printf("%s error: %v\n", "事务", res.err)
                return
            }
            ret[res.name] = res.data
        }
    }
    fmt.Printf("最后结果: %v\n", ret)
}

5. Worker Pool(带最大并发数控制)

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task func()

type WorkPool struct {
    maxWorkers int
    taskQueue  chan Task
    wg         sync.WaitGroup
    stopOnce   sync.Once
    stop       chan struct{}
}

func NewWorkPool(maxWorker int, queueSize int) *WorkPool {
    return &WorkPool{
        maxWorkers: maxWorker,
        taskQueue:  make(chan Task, queueSize),
        stop:       make(chan struct{}),
    }
}

func (p *WorkPool) worker() {
    defer p.wg.Done()
    for {
        select {
        case task, ok := <-p.taskQueue:
            if !ok {
                return // 任务队列已关闭,worker 退出
            }
            task()
        case <-p.stop:
            return // 收到停止信号,退出
        }
    }
}

func (p *WorkPool) Start() {
    for i := 0; i < p.maxWorkers; i++ {
        p.wg.Add(1)
        go p.worker()
    }
}

func (p *WorkPool) Submit(task Task) bool {
    select {
    case <-p.stop:
        return false
    default:
    }
    select {
    case p.taskQueue <- task:
        return true
    case <-p.stop:
        return false
    }
}

// StopGracefully 优雅关闭:不再接收新任务,等待所有已提交任务完成
func (p *WorkPool) StopGracefully() {
    p.stopOnce.Do(func() {
        close(p.taskQueue)
    })
    p.wg.Wait()
}

func main() {
    pool := NewWorkPool(2, 6) // 2个协程,队列长度6
    pool.Start()

    for i := 0; i < 10; i++ {
        taskID := i
        submit := pool.Submit(func() {
            fmt.Printf("task %d 开始\n", taskID)
            time.Sleep(500 * time.Millisecond)
            fmt.Printf("任务 %d 执行完毕\n", taskID)
        })
        if !submit {
            fmt.Printf("任务 %d 提交失败,池已停止\n", taskID)
        }
    }

    pool.StopGracefully()
    fmt.Println("pool stopped")
}

6. sync.Mutex 保护计数器

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.RWMutex
    value int
}

func (c *Counter) Add() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    c := &Counter{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add()
        }()
    }

    wg.Wait()
    fmt.Println(c.Value())
}

7. sync.RWMutex 实现读多写少的缓存

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.RWMutex
    value int
}

func (c *Counter) Add() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    c := &Counter{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add()
        }()
    }

    wg.Wait()
    fmt.Println(c.Value())
}

SQL 的执行顺序

作者: 分类: mysql 时间: 2026-05-23 评论: 暂无评论

原始表

FROM (获取数据源)

ON (筛选连接前的行)

JOIN (生成连接表)

WHERE (过滤行)

GROUP BY (分组)

HAVING (过滤分组)

SELECT (计算列、聚合、别名)

DISTINCT (去重)

ORDER BY (排序)

LIMIT (限制行数)

最终结果

FROM – 确定数据源表

ON – 筛选连接前的行(对 JOIN 而言)

JOIN – 执行连接操作,生成虚拟表

WHERE – 对连接后的行进行过滤

GROUP BY – 将行分组

HAVING – 对分组后的结果进行过滤

SELECT – 选择列、计算表达式、聚合函数

DISTINCT – 去除重复行

ORDER BY – 排序

LIMIT / OFFSET – 限制返回行数

注意:ON 只对 JOIN 有效,WHERE 则在连接完成后整体过滤。SELECT 中的列别名通常无法在 WHERE / GROUP BY / HAVING 中使用,因为它们在 SELECT 之后才生成。

WHERE和ON区别:ON 的过滤发生在连接前,而 WHERE 发生在连接后。

WHERE 和 HAVING 的区别:
WHERE 过滤行,在分组前;HAVING 过滤分组,在 GROUP BY 后,可使用聚合函数。

Channel有缓冲和无缓冲有什么区别?

作者: 分类: go 时间: 2026-05-13 评论: 暂无评论

无缓冲Channel本质是同步通信——发送和接收必须同时准备好,否则阻塞。它是一种“握手”机制,常用于两个goroutine之间做精准的同步信号。有缓冲Channel是异步通信——生产者可以一口气往缓冲区写,消费者可以慢慢读,解耦了生产和消费的节奏。但缓冲区满了依然会阻塞,千万别把缓冲当“存储用”。

示例:

无缓冲:ch := make(chan int),发送方和接收方必须配对才能继续;

有缓冲:ch := make(chan int, 3),可连续发送3次不阻塞,第4次才阻塞。

关于select:select会随机选择一个可执行的case(不是顺序的),这是Go语言故意为之,避免饥饿问题。

mysql读写分离

作者: 分类: php 时间: 2026-05-11 评论: 暂无评论

主从同步延迟是核心痛点
一、为什么会不同步?(延迟成因)
主从复制本质是异步:主库提交事务后立即返回客户端,从库通过 IO_THREAD 拉取 binlog 并重放(SQL_THREAD)。

延迟来源:

从库硬件差(磁盘慢、CPU弱)

主库写入压力大,binlog 产生速度超过从库重放速度

从库承担了大量复杂查询,占用资源

网络延迟

大事务(如一次删除百万行)在从库重放时阻塞

二、保证同步的方法(追求强一致)
方案 原理 优点 缺点
强制读主库 对一致性要求高的操作(如转账后立即查询余额),让代码判断路由到主库 100% 准确,无延迟 增加主库压力,无法发挥读写分离的扩展性
半同步复制 主库至少等待一个从库确认收到 binlog 后才返回客户端 基本不丢数据,延迟可控(等待一个 ack) 如果从库网络抖动,会阻塞主库写入;MySQL 5.7+ 支持 AFTER_SYNC 模式
组复制(MGR) 基于 Paxos,所有节点同步写入,强一致 真正强一致,自动故障切换 性能下降(写放大),只适合写少读多场景
等待从库位点 写入后,查询前检查从库的 Seconds_Behind_Master 或 binlog 位点,等它追上 应用层可控 增加复杂度,可能超时或死等
实际生产最常用:关键链路强制读主 + 普通查询走从库。例如:注册后立即登录 → 读主库;发布文章后作者自己刷页面 → 读主库;其他用户浏览 → 读从库。

三、同步出现延迟后如何处理?(补偿与兜底)
既然延迟无法 100% 消除,必须有应对方案:

  1. 监控与告警
    持续监控 Seconds_Behind_Master(注意它超过 MAX_ALLOWED_PACKET 会重置为 0,不够准确)。

更可靠:监控 Master_Log_File 和 Read_Master_Log_Pos vs Relay_Master_Log_File 和 Exec_Master_Log_Pos,计算位点差距。

设置阈值(如延迟 > 5秒)触发告警,自动切换读写策略。

  1. 业务容忍设计
    多数场景(评论、点赞、非实时排行榜)允许短暂不一致,用户刷新后自然可见。

前端可做“乐观展示”:用户提交动作后,前端先本地更新 UI,后台异步确认。即使从库暂时没数据,用户感受不到。

  1. 延迟补偿机制
    读取降级:检测到延迟超过阈值时,将该请求临时切到主库(需要负载均衡器或数据库中间件支持)。

缓存兜底:写操作后更新 Redis(设置较短 TTL),读请求优先查缓存,缓存未命中再查从库。

异步修复:定时任务扫描主从差异表,修复不一致数据。

  1. 从库追赶加速
    多线程并行复制(MySQL 5.7+ 的 slave_parallel_workers,8.0 默认 logical_clock 模式),大幅提升重放速度。

提升从库硬件(SSD、更多 CPU)。

避免从库执行复杂 ANALYZE、大查询(可临时关闭从库查询服务,专心同步)。

拆解大事务:DELETE ... LIMIT 1000 循环提交。

  1. 最终一致性的兜底预案
    如果延迟长期无法解决,主动进行主从切换:将原从库提升为主库,并修复数据。配合 pt-table-checksum + pt-table-sync 工具修复不一致。

ES实际需求设计展示

作者: 分类: php 时间: 2026-05-08 评论: 暂无评论

APP搜索设计案例demo

use Elasticsearch\ClientBuilder;

public static function singleton()
{
    if (empty(self::$client)) {
        $hosts        = self::getHost();
        $client       = ClientBuilder::create()->setHosts($hosts)->build();
        self::$client = $client;
    }
    return self::$client;
}

[数据同步核心代码]

public static function addAppV2($appInfo)
    {
        $isIos   = intval(stripos($appInfo['platform'], 'IOS') !== false);
        $isAz    = intval(stripos($appInfo['platform'], '安卓') !== false);
        $isH5    = intval(stripos($appInfo['platform'], 'H5') !== false);
        $isYgj   = intval(stripos($appInfo['platform'], 'YGJ') !== false);
        $isYyx   = intval(stripos($appInfo['platform'], 'PCYYX') !== false);
        $isAzH5  = intval($isAz || $isH5);
        $isIosH5 = intval($isIos || $isH5);

        $param = [
            'index' => self::APP_INDEX_V2,
            'id'    => $appInfo['id'],
            'body'  => [
                'id'              => $appInfo['id'],
                'state'           => $appInfo['state'],
                'is_ios'          => $isIos,
                'is_az'           => $isAz,
                'is_ios_h5'       => $isIosH5,
                'is_az_h5'        => $isAzH5,
                'is_ygj'          => $isYgj,
                'is_h5'           => $isH5,
                'is_pc_yyx'       => $isYyx,
                'classid'         => $appInfo['classid'],
                'app_id'          => $appInfo['app_id'],
                'is_az_h5_pc_yyx' => intval($isYyx || $isAzH5),
                'game_name'       => $appInfo['main_title'],
                'subtitle'        => $appInfo['subtitle'] ?? '',
                'title'           => $appInfo['new_title'],
                'mix_type'        => $appInfo['mix_type'] ?? 100,
            ]
        ];

        $addIndex = self::singleton()->index($param);
        return $addIndex;
    }

[搜索核心代码]
同步游戏数据到es数据库,通过function_score 设置权重

游戏名中含有 关键字 完全匹配权重设置10000
搜索词包含在标题中,且比例 ≥ 60% → 高权重
标题包含在搜索词中,且比例 ≤ 200% → 高权重

下载加权 5
日活加权 15
支付加权80

private function searchGameIdsV21()
    {
        $keyword = $this->keyword;

        $must = $mustNot = $scoreFun = [];

        $should[] = [
            'match' => [
                'game_name.keyword' => [
                    'query'     => $keyword,
                    'fuzziness' => floor(mb_strlen($keyword) / 4), // 表示容错字符
                    'boost'     => 1
                ]
            ]
        ];
        $should[] = [
            'match' => [
                'title.keyword' => [
                    'query'     => $keyword,
                    'fuzziness' => floor(mb_strlen($keyword) / 3),
                    'boost'     => 1
                ]
            ]
        ];
        $should[] = [
            'match' => [
                'cates.keyword' => [
                    'query'     => $keyword,
                    'fuzziness' => 1,
                    'boost'     => 1
                ]
            ]
        ];
        $should[] = [
            'multi_match' => [
                'query'  => $keyword,
                'fields' => [
                    'game_name', 'subtitle'
                ],
                'boost'  => 1,
            ]
        ];

        // 增加包含全量游戏名的权重
        $scoreFun[] = [
            'filter' => [
                'term' => [
                    'game_name.keyword' => $keyword,
                ]
            ],
            'weight' => 10000
        ];
        $scoreFun[] = [
            'filter' => [
                'term' => [
                    'title.keyword' => $keyword,
                ]
            ],
            'weight' => 10000
        ];
        $scoreFun[] = [
            'filter' => [
                'script' => [
                    'script' => [
                        'source' => "String title = doc['title.keyword'].value;title.contains(params.query) && (params.query.length() / (double) Math.round(doc['title_length'].value)) >= 0.6 || params.query.contains(title) && (params.query.length() / (double) Math.round(doc['title_length'].value)) <= 2",
                        'params' => [
                            'query' => $keyword
                        ]
                    ]
                ]
            ],
            'weight' => 10000
        ];
        $scoreFun[] = [
            'filter' => [
                'script' => [
                    'script' => [
                        'source' => "String title = doc['game_name.keyword'].value; title.contains(params.query) &&  (params.query.length() / (double) Math.round(doc['name_length'].value)) >= 0.6 ||  params.query.contains(title) && (params.query.length() / (double) Math.round(doc['name_length'].value)) <= 4",
                        'params' => [
                            'query' => $keyword
                        ]
                    ]
                ]
            ],
            'weight' => 10000
        ];

        // 近1日的累计实付*80%+昨日日活*15%+下载量*5%
        $scoreFun[] = [
            'script_score' => [
                'script' => [
                    'source' => "1 + Math.log1p(doc['true_down'].value)"
                ]
            ],
            'weight'       => 5,
        ];
        $scoreFun[] = [
            'script_score' => [
                'script' => [
                    'source' => "1 + Math.log1p(doc['yesterday_hy'].value)"
                ]
            ],
            'weight'       => 15,
        ];
        $scoreFun[] = [
            'script_score' => [
                'script' => [
                    'source' => "1 + Math.log1p(Math.ceil(doc['real_pay'].value))"
                ]
            ],
            'weight'       => 80,
        ];

        if (EcloudByService::isZkyCps() || AFrom::isADevice()) {
            $filter[] = [
                'match' => [
                    'is_az_h5_pc_yyx' => 1
                ]
            ];
        } else {
            $filter[] = [
                'match' => [
                    'is_ios_h5' => 1
                ]
            ];
            // iOS过滤模拟器游戏
            $mustNot[] = [
                'term' => [
                    'classid' => APP_CLASS_SIMULATOR,
                ]
            ];
        }
        $mustNot[] = [
            'term' => [
                'is_pc_yyx' => 1,
            ]
        ];

       
        // 只展示预约、运营中
        $mustNot[] = [
            'terms' => [
                'state' => [
                    XYApp::STATE_CLOSED,
                    XYApp::STATE_DISABLED,
                    XYApp::STATE_CLOSING,
                    XYApp::STATE_HX1,
                    XYApp::STATE_HX2,
                ]
            ]
        ];
        $param = [
            'index' => ElasticSearch::APP_INDEX_V3,
            'body'  => [
                'query' => [
                    'function_score' => [
                        'query'      => [
                            'bool' => []
                        ],
                        'functions'  => $scoreFun,
                        "score_mode" => "sum",
                        "boost_mode" => "sum"
                    ],
                ],

                'from' => ($this->page - 1) * $this->listRows,
                'size' => $this->listRows,
                'sort' => [
                    [
                        '_score' => ['order' => 'desc']
                    ]
                ],
            ],
        ];
        if ($must) {
            $param['body']['query']['function_score']['query']['bool']['must'] = $must;
        }
        if ($mustNot) {
            $param['body']['query']['function_score']['query']['bool']['must_not'] = $mustNot;
        }
        if ($filter) {
            $param['body']['query']['function_score']['query']['bool']['filter'] = $filter;
        }
        $param['body']['query']['function_score']['query']['bool']['should']               = $should;
        $param['body']['query']['function_score']['query']['bool']['minimum_should_match'] = 1;

        $esData   =self::singleton()->search($param);
Top ↑