package main
import (
"context"
"github.com/segmentio/kafka-go"
"gotribe-admin/pkg/util"
"log"
)
func main() {
type DoOrderEvent struct {
ShopID int `json:"shop_id"`
Uid int `json:"uid"`
}
data := DoOrderEvent{
ShopID: 10086,
Uid: 44,
}
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "order_event",
MaxAttempts: 3,
Async: true, //异步发送消息 提高效率
BatchSize: 10, //批量发送消息
//WriteTimeout: 3,
RequiredAcks: kafka.RequireAll, // 表示所有副本节点都确认消息后才返回
// RequireNone (0) fire-and-forget, do not wait for acknowledgements from the
// RequireOne (1) wait for the leader to acknowledge the writes
// RequireAll (-1) wait for the full ISR to acknowledge the writes
//Completion: ,表示消息发送完成后的回调函数
AllowAutoTopicCreation: true, //否允许自动创建主题
}
err := w.WriteMessages(context.Background(), kafka.Message{
Key: []byte(util.UUID()),
Value: []byte(util.Struct2Json(data)),
WriterData: data,
})
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 关闭生产者
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
//启用消费者
go func() {
kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{})
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "order_event",
GroupID: "consumer_order_sec",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer r.Close()
for {
//m, err := r.ReadMessage(c) //阻塞获取消息 如果是消费组消费 自动提交偏移量
m, err := r.FetchMessage(c) //获取消息 如果是消费组消费 不自动提交偏移量 需要手动提交 Consumer Lag
if err != nil {
log.Fatal("failed to read message:", err)
}
log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(c, m); err != nil {
log.Fatal("bad commit message: ", err)
}
}
}()
select {}
}
golang kafka 使用方法,秒杀案例(segmentio/kafka-go)
时间: 2025-01-06
评论: 暂无评论
go学习笔记2
时间: 2024-12-25
评论: 暂无评论
var policies []model.RoleAct
policies6 := &[]model.RoleAct{}
fmt.Printf("初始化 policies %+v", policies) //分配了零值+内存地址
fmt.Printf("初始化 policies %+v", policies6) //分配了零值+内存地址
var policies2 *[]model.RoleAct //nil没有被初始化 没有分配内存地址
fmt.Printf("没有初始化 policies2 %+v\n", policies2)
policies2 = &[]model.RoleAct{}
fmt.Printf("初始化 policies2 %+v\n", *policies2) //解引用看到 分配了零值+内存地址
//为什么要强调这个,因为有些方法 是必须给内存地址的,比如
//grom 中find方法必须给内存地址的
list:=&[]model.RoleAct{}
DB.Find(list).Error
go学习笔记1
时间: 2024-10-05
评论: 暂无评论
1.打印多维数组
var data= [3][2]string{}
fmt.Printf("%#v\n", data))
[3][2]string{[2]string{"", ""}, [2]string{"", ""}, [2]string{"", ""}}
2.字母转int strconv.Atoi() alphabetic to int
int转字母 strconv.Itoa() int to alphabetic
3.time 常用方法
time.Now().Unix() 获取时间戳
time.Now().AddDate(years int, months int, days int) 增加时间
time.Now().Format("2006-01-02 15:04:05") //格式化时间
4.以下返回值是一样的
func getX2X3(a int) (int, int) {
return a * 2, a * 3
}
func getX2X32(a int) (a2 int, a3 int) {
a2 = a * 2
a3 = a * 3
return
}
golang new
时间: 2023-02-01
评论: 暂无评论
在 golang 中 new 是另外一种创建变量的方式。通过 new(T) 可以创建 T 类型的变量(这里 T 表示类型),初始值为 T 类型的 零值 , 返回值为其地址 (地址类型是 *T)。
package main
import "fmt"
func newInt1() *int {
return new(int)
}
func newInt2() *int {
var a int
return &a
}
func main() {
p := newInt1()
q := newInt2()
fmt.Println(p, q) // 0xc00001c0b8 0xc00001c0c0
}
go的切片
时间: 2022-05-29
评论: 暂无评论
1.看一段代码
func main() {
s1 := [5]int{0, 1, 2, 3, 4}
s2 := s1
s2[0] = 999
fmt.Println(s1) //[0 1 2 3 4]
s3 := []int{0, 1, 2, 3, 4}
s4 := s3
s4[0] = 999 //[999 1 2 3 4] s3数组是值类型,s3 切片本身是个引用类型
fmt.Println(s3)
}