golang kafka 使用方法,秒杀案例(segmentio/kafka-go)

作者: 分类: go 时间: 2025-01-06 评论: 暂无评论
package main

import (
    "context"
    "github.com/segmentio/kafka-go"
    "gotribe-admin/pkg/util"
    "log"
)

func main() {
    type DoOrderEvent struct {
        ShopID int `json:"shop_id"`
        Uid    int `json:"uid"`
    }
    data := DoOrderEvent{
        ShopID: 10086,
        Uid:    44,
    }
    w := &kafka.Writer{
        Addr:        kafka.TCP("localhost:9092"),
        Topic:       "order_event",
        MaxAttempts: 3,
        Async:       true, //异步发送消息 提高效率
        BatchSize:   10,   //批量发送消息
        //WriteTimeout:  3,

        RequiredAcks: kafka.RequireAll, // 表示所有副本节点都确认消息后才返回
        // RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the
        //  RequireOne  (1)  wait for the leader to acknowledge the writes
        //  RequireAll  (-1) wait for the full ISR to acknowledge the writes

        //Completion: ,表示消息发送完成后的回调函数
        AllowAutoTopicCreation: true, //否允许自动创建主题

    }
    err := w.WriteMessages(context.Background(), kafka.Message{
        Key:        []byte(util.UUID()),
        Value:      []byte(util.Struct2Json(data)),
        WriterData: data,
    })
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    // 关闭生产者
    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

    //启用消费者
    go func() {
        kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{})
        r := kafka.NewReader(kafka.ReaderConfig{
            Brokers:  []string{"localhost:9092"},
            Topic:    "order_event",
            GroupID:  "consumer_order_sec",
            MinBytes: 10e3, // 10KB
            MaxBytes: 10e6, // 10MB
        })
        defer r.Close()
        for {
            //m, err := r.ReadMessage(c) //阻塞获取消息 如果是消费组消费 自动提交偏移量
            m, err := r.FetchMessage(c) //获取消息 如果是消费组消费 不自动提交偏移量  需要手动提交 Consumer Lag
            if err != nil {
                log.Fatal("failed to read message:", err)
            }
            log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
            if err := r.CommitMessages(c, m); err != nil {
                log.Fatal("bad commit message: ", err)
            }

        }

    }()
    select {}

}
标签: none

订阅本站(RSS)