Go 如何通过 Kafka 客户端库 生产与消费消息

文章目录

  • 0.前置说明
    • 1. confluent-kafka-go
    • 2. sarama
    • 3. segmentio/kafka-go
    • 4. franz-go
      • 选择建议
      • 1.启动 kafka 集群
      • 2.安装 confluent-kafka-go 库
      • 3.创建生产者
        • 特殊文件说明
        • 如何查看.log文件内容
        • 4.创建消费者

          0.前置说明

          Go 语言中有一些流行的 Kafka 客户端库。以下是几个常用的库及其优劣与区别:

          1. confluent-kafka-go

          • 优点:

            • 高性能:基于 librdkafka,性能非常高。
            • 功能全面:支持 Kafka 的所有高级功能,如事务、压缩、认证等。
            • 社区支持:由 Confluent 维护,社区活跃,文档丰富。
            • 稳定性:广泛使用于生产环境,经过大量测试和验证。
            • 缺点:

              • 依赖性:依赖于 librdkafka,需要额外安装该库。
              • 复杂性:配置和使用相对复杂,特别是对于新手。

                2. sarama

                • 优点:

                  • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
                  • 社区活跃:由 Shopify 维护,社区支持良好,文档齐全。
                  • 灵活性:提供了丰富的配置选项,适用于各种使用场景。
                  • 缺点:

                    • 性能:相对于 confluent-kafka-go,性能稍逊一筹。
                    • 功能:不支持 Kafka 的一些高级功能,如事务。

                      3. segmentio/kafka-go

                      • 优点:

                        • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
                        • 简洁易用:API 设计简洁,易于上手。
                        • 灵活性:支持多种配置选项,适用于各种使用场景。
                        • 缺点:

                          • 性能:相对于 confluent-kafka-go,性能稍逊一筹。
                          • 功能:不支持 Kafka 的一些高级功能,如事务。

                            4. franz-go

                            • 优点:

                              • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
                              • 高性能:在纯 Go 实现中性能较为优越。
                              • 功能全面:支持 Kafka 的大部分功能,包括事务。
                              • 缺点:

                                • 社区支持:相对于 sarama 和 confluent-kafka-go,社区支持稍弱。
                                • 文档:文档相对较少,需要更多的社区贡献。

                                  选择建议

                                  • 高性能和高级功能需求:如果你需要高性能和 Kafka 的高级功能(如事务、压缩、认证等),confluent-kafka-go 是一个不错的选择。
                                  • 纯 Go 实现和易用性:如果你更倾向于使用纯 Go 实现的库,并且希望安装和使用更加简便,可以选择 sarama 或 segmentio/kafka-go。
                                  • 平衡性能和功能:如果你希望在纯 Go 实现中获得较好的性能和功能支持,可以考虑 franz-go。

                                    本文我们就以confluent-kafka-go库为例来编写代码。

                                    1.启动 kafka 集群

                                    不知道如何搭建集群请点击这里 ----》Kafka 集群部署(CentOS 单机模拟版)

                                    如果你懒得启动集群,那么直接跳过。

                                    1. 在cluster目录下运行集群启动脚本 cluster.sh;
                                    cd cluster
                                    ./cluster.sh
                                    
                                    1. 检查是否启动成功;
                                    ll zookeeper-data/
                                    total 4
                                    drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeper
                                    ll broker-data/
                                    total 12
                                    drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
                                    drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
                                    drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3
                                    

                                    2.安装 confluent-kafka-go 库

                                    1. 查看你的go工作目录
                                    echo $GOPATH
                                    
                                    1. 在GOPATH目录下的src目录下新建 produce 项目
                                    mkdir src/produce
                                    cd src/produce
                                    
                                    1. 在你的项目目录中运行 go mod init 命令来初始化一个新的 Go 模块
                                    go mod init produce
                                    
                                    1. 安装 confluent-kafka-go 库
                                    go get github.com/confluentinc/confluent-kafka-go/kafka
                                    

                                    3.创建生产者

                                    1. 新建文件 producer.go
                                    touch producer.go
                                    
                                    1. 编写代码
                                    package main
                                    import (
                                    	"fmt"
                                    	"log"
                                    	"github.com/confluentinc/confluent-kafka-go/kafka"
                                    )
                                    func main() {// 创建生产者实例
                                    	broker := "localhost:9091" // 集群地址
                                    	topic := "test"            // 主题名称
                                    	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 创建生产者实例
                                    	// 检查错误
                                    	if err != nil {log.Fatalf("Failed to create producer: %s", err)
                                    	}
                                    	defer producer.Close()
                                    	fmt.Printf("Created Producer %v\n", producer)
                                    	// 生产消息
                                    	message := "hello kafka"
                                    	for i := 0; i < 10; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任题名称
                                    			Value:          []byte(message + fmt.Sprintf("%d", i)),                             // 消息内容
                                    		}, nil)
                                    	}
                                    	if err != nil {log.Fatalf("Failed to produce message: %v", err)
                                    	}
                                    	// 等待消息发送完成
                                    	e := <-producer.Events() // 阻塞直到消息发送完成
                                    	switch ev := e.(type) {case *kafka.Message:
                                    		if ev.TopicPartition.Error != nil {log.Printf("Failed to deliver message: %v", ev.TopicPartition)
                                    		} else {fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)
                                    		}
                                    	}
                                    	// 冲刷缓冲区消息
                                    	producer.Flush(15 * 1000)
                                    }
                                    

                                    代码说明

                                    1. 创建生产者时需要指定集群地址以及主题信息,如果没有该主题则自动创建。
                                    2. 生产者会异步地将消息发送到 Kafka,因此你需要处理交付报告以确保消息成功发送。

                                    我们需要了解一下Go语言和Kafka之间的关系:Go是一种静态类型、编译型的编程语言,由Google开发并开源。它适用于构建高性能服务器端应用程序和网络服务。而Apache Kafka是一个分布式流处理平台,主要面向大规模数据传输和存储。

                                    在这个例子中,我们有一个生产者程序,它使用Kafka的客户端库来连接到Kafka集群,然后通过创建一个生产者实例来开始发送消息。当生产者准备好要发送的消息时,它就会调用Send()方法将其添加到缓冲区中。一旦缓冲区满了或者用户主动触发了Flush()方法,生产者就会把缓冲区里的所有消息一起发送给Kafka集群。

                                    1. 编译运行,生产者发送消息
                                    go build producer.go 
                                    ./producer 
                                    Created Producer rdkafka#producer-1
                                    Delivered message: hello kafka0 to test[0]@0
                                    
                                    1. 查看消息
                                    ll cluster/broker-data/broker-1
                                    total 20
                                    -rw-r--r-- 1 root root    0 May 27 10:20 cleaner-offset-checkpoint
                                    -rw-r--r-- 1 root root    4 May 27 11:36 log-start-offset-checkpoint
                                    -rw-r--r-- 1 root root   88 May 27 10:20 meta.properties
                                    -rw-r--r-- 1 root root   13 May 27 11:36 recovery-point-offset-checkpoint
                                    -rw-r--r-- 1 root root   14 May 27 11:36 replication-offset-checkpoint
                                    drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我们创建的主题 数字代表分区号
                                    ll cluster/broker-data/broker-1/test-0/
                                    total 12
                                    -rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
                                    -rw-r--r-- 1 root root      251 May 27 11:21 00000000000000000000.log
                                    -rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
                                    -rw-r--r-- 1 root root        8 May 27 11:21 leader-epoch-checkpoint
                                    -rw-r--r-- 1 root root       43 May 27 11:21 partition.metadata
                                    

                                    特殊文件说明

                                    Kafka 的数据文件存储在每个分区的目录中,这些文件包括 .index、.log、.timeindex、leader-epoch-checkpoint 和 partition.metadata 文件。每个文件都有其特定的用途,下面是对这些文件的详细解释:

                                    1. .log 文件:

                                      • 用途:存储实际的消息数据。
                                      • 描述:这是 Kafka 中最重要的文件,包含了生产者发送到 Kafka 的消息。每个 .log 文件代表一个日志段(log segment),文件名通常是该段的起始偏移量(offset)。
                                      • .index 文件:

                                        • 用途:存储消息偏移量到物理文件位置的映射。
                                        • 描述:这个文件是一个稀疏索引,允许 Kafka 快速查找特定偏移量的消息。通过这个索引,Kafka 可以避免从头开始扫描整个日志文件,从而提高查找效率。
                                        • .timeindex 文件:

                                          • 用途:存储消息时间戳到物理文件位置的映射。
                                          • 描述:这个文件允许 Kafka 根据时间戳快速查找消息。它是一个稀疏索引,类似于 .index 文件,但索引的是时间戳而不是偏移量。
                                          • leader-epoch-checkpoint 文件:

                                            • 用途:记录分区的领导者纪元(leader epoch)信息。
                                            • 描述:这个文件包含了每个纪元的起始偏移量。领导者纪元是 Kafka 用来跟踪分区领导者变化的机制。每次分区领导者发生变化时,纪元号会增加。这个文件帮助 Kafka 在领导者变更时进行数据恢复和一致性检查。
                                            • partition.metadata 文件:

                                              • 用途:存储分区的元数据信息。
                                              • 描述:这个文件包含了分区的一些基本信息,如分区的版本号等。它帮助 Kafka 管理和维护分区的元数据。

                                    这些文件共同作用,确保 Kafka 能够高效、可靠地存储和检索消息数据。

                                    如何查看.log文件内容

                                    • 执行指令
                                       ~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
                                      
                                      ~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
                                      Dumping ./00000000000000000000.log
                                      Log starting offset: 0
                                      baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
                                      | offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
                                      | offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
                                      | offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
                                      | offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
                                      | offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
                                      | offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
                                      | offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
                                      | offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
                                      | offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
                                      | offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9
                                      

                                      如上我们可以看到消息已经成功的发送。

                                      4.创建消费者

                                      1. 创建消费者项目
                                      mkdir src/consume
                                      cd src/consume
                                      
                                      1. 在你的项目目录中运行 go mod init 命令来初始化一个新的 Go 模块
                                      go mod init consume
                                      
                                      1. 安装 confluent-kafka-go 库
                                      go get github.com/confluentinc/confluent-kafka-go/kafka
                                      
                                      1. 新建文件
                                      touch consumer.go
                                      
                                      1. 编写代码
                                      package main
                                      import (
                                      	"fmt"
                                      	"log"
                                      	"github.com/confluentinc/confluent-kafka-go/kafka"
                                      )
                                      func main() {// 创建消费者实例
                                      	broker := "localhost:9091" // 集群地址
                                      	topic := "test"            // 主题名称
                                      	c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker,     // 集群地址
                                      		"group.id":          "my-group", // 消费者组
                                      		"auto.offset.reset": "earliest", // 设置偏移量 从头开始消费
                                      	})
                                      	// 检查错误
                                      	if err != nil {log.Printf("Failed to create consumer: %s\n", err)
                                      	}
                                      	defer c.Close()
                                      	// 描述订阅主题
                                      	c.SubscribeTopics([]string{topic}, nil)
                                      	fmt.Printf("Consuming topic %s\n", topic)
                                      	// 消费消息
                                      	for {msg, err := c.ReadMessage(-1) // 阻塞直到消息到达
                                      		if err == nil {fmt.Printf("Consumed message: %s\n", msg.Value)
                                      		} else {// 消费者错误
                                      			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
                                      		}
                                      	}
                                      }
                                      
                                      1. 编译并运行
                                      go build consumer.go 
                                      ./consumer 
                                      Consuming topic test
                                      Consumed message: hello kafka0
                                      Consumed message: hello kafka1
                                      Consumed message: hello kafka2
                                      Consumed message: hello kafka3
                                      Consumed message: hello kafka4
                                      Consumed message: hello kafka5
                                      Consumed message: hello kafka6
                                      Consumed message: hello kafka7
                                      Consumed message: hello kafka8
                                      Consumed message: hello kafka9
                                      

                                      可以看到已经成功的消费刚才生产的消息。