package main
import (
"context"
"github.com/segmentio/kafka-go"
"gotribe-admin/pkg/util"
"log"
)
func main() {
type DoOrderEvent struct {
ShopID int `json:"shop_id"`
Uid int `json:"uid"`
}
data := DoOrderEvent{
ShopID: 10086,
Uid: 44,
}
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "order_event",
MaxAttempts: 3,
Async: true, //异步发送消息 提高效率
BatchSize: 10, //批量发送消息
//WriteTimeout: 3,
RequiredAcks: kafka.RequireAll, // 表示所有副本节点都确认消息后才返回
// RequireNone (0) fire-and-forget, do not wait for acknowledgements from the
// RequireOne (1) wait for the leader to acknowledge the writes
// RequireAll (-1) wait for the full ISR to acknowledge the writes
//Completion: ,表示消息发送完成后的回调函数
AllowAutoTopicCreation: true, //否允许自动创建主题
}
err := w.WriteMessages(context.Background(), kafka.Message{
Key: []byte(util.UUID()),
Value: []byte(util.Struct2Json(data)),
WriterData: data,
})
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 关闭生产者
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
//启用消费者
go func() {
kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{})
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "order_event",
GroupID: "consumer_order_sec",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer r.Close()
for {
//m, err := r.ReadMessage(c) //阻塞获取消息 如果是消费组消费 自动提交偏移量
m, err := r.FetchMessage(c) //获取消息 如果是消费组消费 不自动提交偏移量 需要手动提交 Consumer Lag
if err != nil {
log.Fatal("failed to read message:", err)
}
log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(c, m); err != nil {
log.Fatal("bad commit message: ", err)
}
}
}()
select {}
}