Kafka 组件

kafka 消息队列组件我们依赖的开源包是 github.com/Shopify/sarama

按照组件的设计,我们定义了自己的 kafka 结构对其进行了组合,在保留其原生的功能之外,以便扩展。

// yago/coms/kafka/kafka.go
type Kafka struct {
    connect string
    config  *cluster.Config
}

所以你可以查看 sarama 官方文档 来获取所有支持的 api。

本文中仅介绍部分常用的 api 以及扩展的 api。

1. 配置 kafka 组件

[kafka]
# 多个 broker 用逗号分隔
cluster = "127.0.0.1:9092"
topic = "demo"

我们在模版 app.toml 中默认配置开启了 kafka 组件,可根据实际情况进行调整。

2. 使用 kafka 组件

  • 同步生产消息
k := kafka.Ins()
p, err := k.SyncProducer()
if err != nil {
    log.Fatalf("init sync producer error: %s", err)
}
defer p.close()

for i := 0; i < 10; i++ {
    _, _, err = p.Produce("demo", fmt.Sprintf("sync msg: %d", i))
    if err != nil {
        log.Fatalf("sync produce error: %s", err)
    }
}
  • 异步生产消息
k := kafka.Ins()
p, err := k.AsyncProducer()
if err != nil {
    log.Fatalf("init async producer error: %s", err)
}
defer p.close()

for i := 0; i < 10; i++ {
    p.Produce("demo", fmt.Sprintf("async msg: %d", i))
}
  • 消费消息
k := kafka.Ins()
// 可以指定多个topic
consumer, err := k.NewConsumer("zjl", "demo", "demo1")
if err != nil {
    t.Errorf("new consumer error: %s", err)
}
// 如果匿名函数返回false,则跳过ack
err = consumer.Consume(func(topic string, bytes []byte) bool {
    if strings.Contains(string(bytes), "5") {
        return false
    }
    fmt.Println(topic, string(bytes))
    return true
})
if err != nil {
    t.Errorf("consume error: %s", err)
}

results matching ""

    No results matching ""