kafka(三)——librdkafka编译与使用(c++)

linux下编译

  • 源码下载
    git clone https://github.com/edenhill/librdkafka
    
    • 配置、编译和安装
      # 进入目录
      cd librdkafka/
      # 配置
      ./configure
      # 编译
      make
      # 安装
      make install
      
      • 头文件和库目录
        # 头文件
        /usr/local/include/librdkafka
        rdkafkacpp.h
        rdkafka.h
        rdkafka_mock.h
        # 库
        /usr/local/lib
        librdkafka++.a
        librdkafka.a
        librdkafka++.so
        librdkafka.so
        librdkafka++.so.1
        librdkafka.so.1
        librdkafka-static.a
        

        windows下编译

        编译环境

        visual studio 2019

        依赖库

        依赖库直接下载源码编译即可。

        • openssl(使用的是1.1.0版本)
          • zlib(使用的静态库)
            • libcurl(使用的动态库)
              • zstd(使用的静态库)

                配置

                • 附加包含目录配置
                  • 附加库目录配置
                    • 附加依赖项配置

                      编译

                      生成c和c++动态库。

                      生产者

                      参数说明

                      参数描述
                      bootstrap.servers生产者连接集群所需的broker地址清单。
                      key.serializer和value.serializer指定发送消息的key和value的序列化类型。
                      buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
                      batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
                      linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
                      acks0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。 默认值是-1,-1和all是等价的。
                      max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
                      retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
                      retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
                      enable.idempotence是否开启幂等性,默认true,开启幂等性。
                      compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

                      示例

                      KafkaProducer.h

                      #ifndef _KAFKA_PRODUCER_H_
                      #define _KAFKA_PRODUCER_H_
                      #include "rdkafkacpp.h"
                      #include // 生产者投递报告回调
                      class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb 
                      {
                      public:
                      	void dr_cb(RdKafka::Message& message)
                      	{	
                      		if (message.err())   // err
                      		{
                                  printf("Message delivery failed:%s\n",message.errstr().c_str());
                      		} 
                      		else                
                      		{  
                                  printf("Message delivered to topic,topicName:%s,partition:%d\n",
                                      message.topic_name().c_str(),
                      				message.partition());
                      		}
                      	}
                      };
                      // 生产者事件回调函数
                      class ProducerEventCb : public RdKafka::EventCb 
                      {
                      public:
                      	void event_cb(RdKafka::Event &event) 
                      	{
                      		switch (event.type()) 
                      		{
                      		case RdKafka::Event::EVENT_ERROR:
                                  printf("RdKafka::Event::EVENT_ERROR: %s\n",
                                        RdKafka::err2str(event.err()).c_str());
                      			break;
                      		case RdKafka::Event::EVENT_STATS: 
                                  printf("RdKafka::Event::EVENT_STATS, event:%s\n",
                                        event.str().c_str());
                      			break;
                      		case RdKafka::Event::EVENT_LOG: 
                                  printf("RdKafka::Event::EVENT_LOG, fac:%s\n",
                                        event.fac().c_str());
                      			break;
                      		case RdKafka::Event::EVENT_THROTTLE:
                                  printf("RdKafka::Event::EVENT_THROTTLE, broker_name:%s\n",
                                        event.broker_name().c_str());
                      			break;
                      		}
                      	}
                      };
                      // 生产者自定义分区策略回调:partitioner_cb
                      class HashPartitionerCb : public RdKafka::PartitionerCb 
                      {
                      public:
                      	// @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL
                      	// @return 返回分区,(0, partition_cnt)
                      	int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                      		int32_t partition_cnt, void *msg_opaque) 
                      	{
                      		char msg[128] = {0};
                      		// 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cnt
                      		int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;
                      		// 输出:[topic][key][partition_cnt][partition_id],例 [test][6419][2][1]
                      		sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s], partition_cnt:[%d], partition_id:[%d]",
                      			topic->name().c_str(), key->c_str(), partition_cnt, partition_id);
                      		printf("msg: %s\n", msg);
                      		return partition_id;
                      	}
                      private:
                      	// 自定义哈希函数 
                      	static inline unsigned int generate_hash(const char *str, size_t len) 
                      	{
                      		unsigned int hash = 5381;
                      		for (size_t i = 0; i < len; i++)
                      			hash = ((hash << 5) + hash) + str[i];
                      		return hash;
                      	}
                      };
                      class CKafkaProducer 
                      {
                        public:
                          /**
                           * @brief CKafkaProducer
                           * @param brokers
                           * @param topic
                           * @param partition:默认分区数
                           */
                          explicit CKafkaProducer(const std::string &brokers, const std::string &topic, int partition);
                      	~CKafkaProducer();
                      	int Create();
                      	void Destroy();
                          /**
                           * @brief push Message to Kafka
                           * @param str, message data
                           */
                          void PushMessage(const std::string &str, const std::string &key);
                      private:
                      	std::string                m_brokers;          // Broker 列表,多个使用逗号分隔
                      	std::string                m_topicStr;         // Topic 名称
                      	int                        m_partition;        // 分区
                      	RdKafka::Conf*             m_config;           // Kafka Conf对象
                      	RdKafka::Conf*             m_topicConfig;      // Topic Conf对象
                      	RdKafka::Topic*            m_topic;            // Topic对象
                      	RdKafka::Producer*         m_producer;         // Producer对象
                      	RdKafka::DeliveryReportCb* m_dr_cb;            // 设置传递回调
                      	RdKafka::EventCb*          m_event_cb;         // 设置事件回调
                      	RdKafka::PartitionerCb*    m_partitioner_cb;   // 设置自定义分区回调
                      };
                      #endif // _KAFKA_PRODUCER_H_
                      

                      KafkaProducer.cpp

                      #include "KafkaProducer.h"
                      CKafkaProducer::CKafkaProducer(const std::string &brokers, const std::string &topic, int partition) 
                      : m_brokers(brokers)
                      , m_topicStr(topic)
                      , m_partition(partition)
                      , m_config(nullptr)
                      , m_topicConfig(nullptr)
                      , m_topic(nullptr)
                      , m_producer(nullptr)
                      , m_dr_cb(nullptr)
                      , m_event_cb(nullptr)
                      , m_partitioner_cb(nullptr)
                      {
                      }
                      MyKafkaProducer::~MyKafkaProducer()
                      {
                      	Destroy();
                      }
                      int MyKafkaProducer::Create()
                      {
                      	RdKafka::Conf::ConfResult errCode;           // 创建错误码
                      	std::string errorStr = "";                   // 返回错误信息   
                      	do 
                      	{
                      		// 创建配置对象
                      		// 1.1、创建 Kafka Conf 对象
                      		m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
                      		if (NULL == m_config) 
                      		{
                                  printf("Create RdKafka Conf failed.\n");
                      			break;
                      		}
                      		// 设置 Broker 属性       
                      		// (必要参数)指定 broker 地址列表。格式:host1:port1,host2:port2,...
                      		errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 设置生产者投递报告回调
                      		m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调
                      		errCode = m_config->set("dr_cb", m_dr_cb, errorStr);    // 异步方式发送数据
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(dr_cb) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 设置生产者事件回调
                      		m_event_cb = new ProducerEventCb; // 创建生产者事件回调
                      		errCode = m_config->set("event_cb", m_event_cb, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(event_cb) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 设置数据统计间隔
                      		errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(statistics.interval.ms) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 设置最大发送消息大小
                      		errCode = m_config->set("message.max.bytes", "10240000", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(message.max.bytes) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 2、创建 Topic Conf 对象
                      		m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
                      		if (NULL == m_topicConfig) 
                      		{
                                  printf("Create RdKafka Topic Conf failed.\n");
                      			break;
                      		}
                      		// 设置生产者自定义分区策略回调
                      		m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调
                      		errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errCode) 
                      		{
                                  printf("Conf set(partitioner_cb) failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 2、创建对象
                      		// 2.1、创建 Producer 对象,可以发布不同的主题
                      		m_producer = RdKafka::Producer::create(m_config, errorStr);
                      		if (NULL == m_producer) 
                      		{
                                  printf("Create Producer failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                      		// 2.2、创建 Topic 对象,可以创建多个不同的 topic 对象
                      		m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
                      		if (NULL == m_topic) 
                      		{
                                  printf("Create Topic failed, errorStr:%s.\n",
                                          errorStr.c_str());
                      			break;
                      		}
                              printf("Created producer success.\n");
                      		return 0;
                      	}while(0);
                      	Destroy();
                      	return -1;
                      }
                      void MyKafkaProducer::Destroy()
                      {
                      	while (nullptr !=m_producer && m_producer->outq_len() > 0) 
                      	{
                      		m_producer->flush(5000);
                      	}
                      	if(nullptr != m_config)
                      	{
                      		delete m_config;
                      		m_config = nullptr;
                      	}
                      	if(nullptr != m_topicConfig)
                      	{
                      		delete m_topicConfig;
                      		m_topicConfig = nullptr;
                      	}
                      	
                      	if(nullptr != m_topic)
                      	{
                      		delete m_topic;
                      		m_topic = nullptr;
                      	}
                      	if(nullptr != m_producer)
                      	{
                      		delete m_producer;
                      		m_producer = nullptr;
                      	}
                      	if(nullptr != m_dr_cb)
                      	{
                      		delete m_dr_cb;
                      		m_dr_cb = nullptr;
                      	}
                      	if(nullptr != m_event_cb)
                      	{
                      		delete m_event_cb;
                      		m_event_cb = nullptr;
                      	}
                      	if(nullptr != m_partitioner_cb)
                      	{
                      		delete m_partitioner_cb;
                      		m_partitioner_cb = nullptr;
                      	}
                      }
                      void MyKafkaProducer::PushMessage(const std::string &str, const std::string &key)
                      {
                      	int32_t len = (int32_t)str.length();
                      	void *payload = const_cast(static_cast(str.data()));
                      	// produce 方法,生产和发送单条消息到 Broker
                      	// 如果不加时间戳,内部会自动加上当前的时间戳
                      	RdKafka::ErrorCode errorCode = m_producer->produce(
                      		m_topic,                      // 指定发送到的主题
                      		RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过
                      		// partitioner_cb的回调选择合适的分区
                      		RdKafka::Producer::RK_MSG_COPY, // 消息拷贝
                      		payload,                        // 消息本身
                      		len,                            // 消息长度
                      		&key,                           // 消息key
                      		NULL
                      		);
                      	// 轮询处理
                      	m_producer->poll(0);
                      	if (RdKafka::ERR_NO_ERROR != errorCode) 
                      	{
                              printf("Produce failed,errorCode:%s\n",RdKafka::err2str(errorCode).c_str());
                      		// kafka 队列满,等待 100 ms
                      		if (RdKafka::ERR__QUEUE_FULL == errorCode) 
                      		{
                      			m_producer->poll(100);
                      		}
                      	}
                      }
                      

                      test.cpp

                      #include "KafkaProducer.h"
                      #include int main()
                      {
                      	std::string brokers = "127.0.0.1:9092";
                      	std::string topic = "first-topic-test";
                          auto producer = std::make_shared(brokers, topic, 1000);
                      	if(!producer.get())
                      		return -1;
                      	if(0 != producer->Create())
                      	{
                      		return -1;
                      	}
                      	std::string msg = "test kafka";
                      	std::string key = "xxx";         // 可选,涉及kafka保序策略
                      	producer->PushMessage(msg, key);
                      	producer->Destroy();
                      	delete producer;
                      	system("pause");
                      	return 0;
                      }
                      

                      消费者

                      参数说明

                      参数描述
                      bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
                      key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。
                      group.id标记消费者所属的消费者组。
                      enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
                      auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
                      auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
                      offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。
                      heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
                      session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
                      max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
                      fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
                      fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
                      fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
                      max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

                      示例

                      KafkaConsumer.h

                      #ifndef _KAFKA_CONSUMER_H_
                      #define _KAFKA_CONSUMER_H_
                      #include "rdkafkacpp.h"
                      #include #include // 设置事件回调
                      class ConsumerEventCb : public RdKafka::EventCb 
                      {
                      public:
                      	void event_cb(RdKafka::Event &event) 
                      	{
                      		switch (event.type()) 
                      		{
                      		case RdKafka::Event::EVENT_ERROR:
                      			break;
                      		case RdKafka::Event::EVENT_STATS:
                      			break;
                      		case RdKafka::Event::EVENT_LOG:
                      			break;
                      		case RdKafka::Event::EVENT_THROTTLE:
                      			break;
                      		default:
                      			break;
                      		}
                      	}
                      };
                      // 设置消费者组再平衡回调
                      // 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
                      class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
                      {
                      private:
                      	// 打印当前获取的分区
                      	static void printTopicPartition(const std::vector& partitions) 
                      	{
                      		for (unsigned int i = 0; i < partitions.size(); i++) 
                      		{
                                  printf("count:%d, topic:%s,partition:%d\n",
                                        i, 
                      			      partitions[i]->topic().c_str(),
                      				  partitions[i]->partition());
                      		}
                      	}
                      public:
                      	// 消费者组再平衡回调
                      	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                      		std::vector &partitions) 
                      	{
                              printf("RebalanceCb: %s\n",RdKafka::err2str(err).c_str());
                      		printTopicPartition(partitions);
                      		// 分区分配成功
                      		if (RdKafka::ERR__ASSIGN_PARTITIONS == err) 
                      		{
                      			// 消费者订阅这些分区
                      			consumer->assign(partitions);
                      			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
                      			m_partitionCount = (int)partitions.size();
                      		} 
                      		else   // 分区分配失败
                      		{
                      			// 消费者取消订阅所有的分区
                      			consumer->unassign();
                      			// 消费者订阅分区的数量为0
                      			m_partitionCount = 0;
                      		}
                      	}
                      private:
                      	int m_partitionCount;    // 消费者组本次订阅的分区数量
                      };
                      class CKafkaConsumer 
                      {
                      public:
                      	/**
                           * @brief CKafkaConsumer
                           * @param brokers
                      	 * @param groupID:消费者组名称
                           * @param topics
                           * @param partition:默认分区数
                           */
                      	explicit CKafkaConsumer(const std::string &brokers,
                      						   const std::string &groupID,
                      						   const std::vector &topics,
                      						   int partition);
                      	~CKafkaConsumer();
                      	int Create();
                      	void Destroy();
                      	void PullMessage();
                      public:
                      	void OnRecv();
                      private:
                      	void ConsumeMsg_(RdKafka::Message *msg, void *opaque);
                      private:
                          std::string m_brokers;
                          std::string m_groupID;
                          std::vector m_topicVector;
                          int m_partition;
                          RdKafka::Conf*             m_config;
                          RdKafka::Conf*             m_topicConfig;
                          RdKafka::KafkaConsumer*    m_consumer;
                          RdKafka::EventCb*          m_event_cb;
                          RdKafka::RebalanceCb*      m_rebalance_cb;
                      	std::thread m_thread;
                      	bool m_running;
                          typedef std::lock_guard RecursiveGuard;
                      	std::recursive_mutex mutex_; 
                      };
                      #endif // _KAFKA_CONSUMER_H_
                      

                      KafkaConsumer.cpp

                      #include "MyKafkaConsumer.h"
                      static int ConsumerWorker(void* param)
                      {
                      	CKafkaConsumer* consumer = (CKafkaConsumer*)param;
                      	if (consumer)
                      	{
                      		consumer->OnRecv();
                      		return 0;
                      	}
                      	return -1;
                      }
                      CKafkaConsumer::CKafkaConsumer(const std::string &brokers, const std::string &groupID, const std::vector &topics, int partition) 
                      : m_brokers(brokers)
                      , m_groupID(groupID)
                      , m_topicVector(topics)
                      , m_partition(partition)
                      , m_running(true)
                      {
                      }
                      CKafkaConsumer::~CKafkaConsumer()
                      {
                      	Destroy();
                      }
                      int CKafkaConsumer::Create()
                      {
                      	std::string errorStr;
                      	RdKafka::Conf::ConfResult errorCode;
                      	do 
                      	{
                      		// 1、创建配置对象
                      		// 1.1、构造 consumer conf 对象
                      		m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
                      		if(nullptr == m_config)
                      		{
                                  printf("Create RdKafka Conf failed.\n");
                      			break;
                      		}
                      		// 必要参数1:指定 broker 地址列表
                      		errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 必要参数2:设置消费者组 id
                      		errorCode = m_config->set("group.id", m_groupID, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(group.id) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 设置事件回调
                      		m_event_cb = new ConsumerEventCb;
                      		errorCode = m_config->set("event_cb", m_event_cb, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(event_cb) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 设置消费者组再平衡回调
                      		m_rebalance_cb = new ConsumerRebalanceCb;
                      		errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
                      		errorCode = m_config->set("enable.partition.eof", "false", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 每次最大拉取的数据大小
                      		errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 设置分区分配策略:range、roundrobin、cooperative-sticky
                      		errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 心跳探活超时时间---1s
                      		errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 心跳保活间隔
                      		errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 1.2、创建 topic conf 对象
                      		m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
                      		if (nullptr == m_topicConfig) 
                      		{
                                  printf("Create RdKafka Topic Conf failed.\n");
                      			break;
                      		}
                      		// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
                      		errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 默认 topic 配置,用于自动订阅 topics
                      		errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
                      		if (RdKafka::Conf::CONF_OK != errorCode) 
                      		{
                                  printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                      		// 2、创建 Consumer 对象
                      		m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
                      		if (nullptr == m_consumer) 
                      		{
                                  printf("Create KafkaConsumer failed, errorStr:%s.\n",
                                        errorStr.c_str());
                      			break;
                      		}
                              printf("Created consumer success, consumerName:%s.\n",
                                        m_consumer->name().c_str());
                      		return 0;
                      	} while (0);
                      	Destroy();
                      	return -1;
                      }
                      void CKafkaConsumer::Destroy()
                      {
                      	m_running = false;
                      	if (m_thread.joinable())
                      		m_thread.join();
                      	if(nullptr != m_consumer)
                      		m_consumer->close();
                      	if(nullptr != m_config)
                      	{
                      		delete m_config;
                      		m_config = nullptr;
                      	}
                      	if(nullptr != m_topicConfig)
                      	{
                      		delete m_topicConfig;
                      		m_topicConfig = nullptr;
                      	}
                      	if(nullptr != m_consumer)
                      	{
                      		delete m_consumer;
                      		m_consumer = nullptr;
                      	}
                      	
                      	if(nullptr != m_event_cb)
                      	{
                      		delete m_event_cb;
                      		m_event_cb = nullptr;
                      	}
                      	if(nullptr != m_rebalance_cb)
                      	{
                      		delete m_rebalance_cb;
                      		m_rebalance_cb = nullptr;
                      	}
                      }
                      void CKafkaConsumer::PullMessage()
                      {
                          m_thread = std::thread(ConsumerWorker, this);
                      }
                      void CKafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
                      {
                      	switch (msg->err()) 
                      	{
                      	case RdKafka::ERR__TIMED_OUT: // 超时
                      		break;
                      	case RdKafka::ERR_NO_ERROR:   // 有消息进来
                              printf("Recv Message. topic:%s, partition:[%d], key:%s, payload:%s\n",
                                  msg->topic_name().c_str(), 
                      			msg->partition(), 
                      			msg->key()->c_str(), 
                      			(char *)msg->payload());
                      		break;
                      	default:
                      		break;
                      	}
                      }
                      void CKafkaConsumer::OnRecv()
                      {
                      	if(nullptr == m_consumer)
                      		return;
                      	// 后续可扩展
                      	std::vector topicVector;
                      	{
                      		RecursiveGuard mtx(mutex_);
                      		topicVector = m_topicVector;
                      	}
                      	// 1、订阅主题
                      	RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicVector);
                      	if (RdKafka::ERR_NO_ERROR != errorCode) 
                      	{
                              printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());
                      		return;
                      	}
                      	// 2、拉取并消费消息
                      	while (m_running) 
                      	{
                      		RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
                      		if(nullptr != msg)
                      		{
                      			// 消费消息
                      			ConsumeMsg_(msg, nullptr);
                      			delete msg;
                                  msg = nullptr;
                      		}
                      	}
                      	// 同步提交,Consumer 关闭前调用,等待 broker 返回读取消息
                      	if(nullptr != m_consumer)
                      		m_consumer->commitSync(); 
                      }
                      

                      test.cpp

                      #include "KafkaConsumer.h"
                      #include int main()
                      {
                      	std::string brokers = "127.0.0.1:9092";
                      	std::string groupID = "test";
                      	std::vector topics;
                      	topics.push_back("first-topic-test");
                      	auto comsumer = std::make_shared(brokers, groupID, topics, 1000);
                      	if(!comsumer.get())
                      		return -1;
                      	if(0 != comsumer->Create())
                      		return -1;
                      	
                      	comsumer->PullMessage();
                          
                          system("pause");
                      	return 0;
                      }