golang kafka 使用方法,秒杀案例(segmentio/kafka-go)

作者: 时间: 2025-01-06 评论: 暂无评论
package main

import (
    "context"
    "github.com/segmentio/kafka-go"
    "gotribe-admin/pkg/util"
    "log"
)

func main() {
    type DoOrderEvent struct {
        ShopID int `json:"shop_id"`
        Uid    int `json:"uid"`
    }
    data := DoOrderEvent{
        ShopID: 10086,
        Uid:    44,
    }
    w := &kafka.Writer{
        Addr:        kafka.TCP("localhost:9092"),
        Topic:       "order_event",
        MaxAttempts: 3,
        Async:       true, //异步发送消息 提高效率
        BatchSize:   10,   //批量发送消息
        //WriteTimeout:  3,

        RequiredAcks: kafka.RequireAll, // 表示所有副本节点都确认消息后才返回
        // RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the
        //  RequireOne  (1)  wait for the leader to acknowledge the writes
        //  RequireAll  (-1) wait for the full ISR to acknowledge the writes

        //Completion: ,表示消息发送完成后的回调函数
        AllowAutoTopicCreation: true, //否允许自动创建主题

    }
    err := w.WriteMessages(context.Background(), kafka.Message{
        Key:        []byte(util.UUID()),
        Value:      []byte(util.Struct2Json(data)),
        WriterData: data,
    })
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    // 关闭生产者
    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }

    //启用消费者
    go func() {
        kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{})
        r := kafka.NewReader(kafka.ReaderConfig{
            Brokers:  []string{"localhost:9092"},
            Topic:    "order_event",
            GroupID:  "consumer_order_sec",
            MinBytes: 10e3, // 10KB
            MaxBytes: 10e6, // 10MB
        })
        defer r.Close()
        for {
            //m, err := r.ReadMessage(c) //阻塞获取消息 如果是消费组消费 自动提交偏移量
            m, err := r.FetchMessage(c) //获取消息 如果是消费组消费 不自动提交偏移量  需要手动提交 Consumer Lag
            if err != nil {
                log.Fatal("failed to read message:", err)
            }
            log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
            if err := r.CommitMessages(c, m); err != nil {
                log.Fatal("bad commit message: ", err)
            }

        }

    }()
    select {}

}

go学习笔记2

作者: 时间: 2024-12-25 评论: 暂无评论
var policies []model.RoleAct
policies6 := &[]model.RoleAct{}
fmt.Printf("初始化 policies %+v", policies) //分配了零值+内存地址
fmt.Printf("初始化 policies %+v", policies6) //分配了零值+内存地址

var policies2 *[]model.RoleAct //nil没有被初始化 没有分配内存地址
fmt.Printf("没有初始化 policies2 %+v\n", policies2)
policies2 = &[]model.RoleAct{}
fmt.Printf("初始化 policies2 %+v\n", *policies2) //解引用看到  分配了零值+内存地址
//为什么要强调这个,因为有些方法 是必须给内存地址的,比如
//grom 中find方法必须给内存地址的
list:=&[]model.RoleAct{}
DB.Find(list).Error

go学习笔记1

作者: 时间: 2024-10-05 评论: 暂无评论

1.打印多维数组

var data= [3][2]string{}
fmt.Printf("%#v\n", data))
[3][2]string{[2]string{"", ""}, [2]string{"", ""}, [2]string{"", ""}}

2.字母转int strconv.Atoi() alphabetic to int
int转字母 strconv.Itoa() int to alphabetic

3.time 常用方法

time.Now().Unix() 获取时间戳 
time.Now().AddDate(years int, months int, days int) 增加时间
time.Now().Format("2006-01-02 15:04:05")  //格式化时间

4.以下返回值是一样的

func getX2X3(a int) (int, int) {
    return a * 2, a * 3
}
func getX2X32(a int) (a2 int, a3 int) {
    a2 = a * 2
    a3 = a * 3
    return
}

golang new

作者: 时间: 2023-02-01 评论: 暂无评论

在 golang 中 new 是另外一种创建变量的方式。通过 new(T) 可以创建 T 类型的变量(这里 T 表示类型),初始值为 T 类型的 零值返回值为其地址 (地址类型是 *T)。

package main

import "fmt"

func newInt1() *int {

return new(int)

}

func newInt2() *int {

var a int
return &a

}

func main() {

p := newInt1()
q := newInt2()
fmt.Println(p, q) // 0xc00001c0b8 0xc00001c0c0

}

go的切片

作者: 时间: 2022-05-29 评论: 暂无评论

1.看一段代码
func main() {

    s1 := [5]int{0, 1, 2, 3, 4}
    s2 := s1
    s2[0] = 999
    fmt.Println(s1)   //[0 1 2 3 4]

    s3 := []int{0, 1, 2, 3, 4}
    s4 := s3
    s4[0] = 999    //[999 1 2 3 4]   s3数组是值类型,s3 切片本身是个引用类型
    fmt.Println(s3)
}