目录
一、Kafka单机版安装
1.解压&改名
2.修改配置文件
3.配置环境变量
4.启动kafka之前一定要先启动zookeeper
5.kafka一键启停脚本
二、Kafka基本命令(重要)
1.topic常用命令
(1)查看topic
(2)创建topic
(3)修改topic
(4)删除topic
2.生产者
3.消费者
4.为什么kafka的bootstrap后面写几台机器都可以
5.消费者偏移量
6.消费者组
7.配置管理 kafka-config
8.ISR队列
一、Kafka单机版安装
1.解压&改名
[root@lxm147 ~]# tar -zxf /opt/install/kafka_2.12-2.8.0.tgz -C /opt/soft/ [root@lxm147 ~]# mv /opt/soft/kafka_2.12-2.8.0 /opt/soft/kafka212
2.修改配置文件
[root@lxm147 ~]# cd /opt/soft/kafka212/config/ [root@lxm147 ~]# vim ./server.properties 21 broker.id=0 36 advertised.listeners=PLAINTEXT://192.168.180.147:9092 60 log.dirs=/opt/soft/kafka212/data 消息存放目录 103 log.retention.hours=1680 消息存放时间:小时 123 zookeeper.connect=192.168.180.147:2181 连接zookeeper 137 delete.topic.enable=true 设置可以对topic删除,默认不能删除 [root@lxm147 ~]# mkdir /opt/soft/kafka212/data [root@lxm147 ~]# echo "0">/opt/soft/kafka212/data/myid
3.配置环境变量
[root@lxm147 config]# vim /etc/profile # KAFKA_HOEM export KAFKA_HOME=/opt/soft/kafka212 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile
4.启动kafka之前一定要先启动zookeeper
# 启动zookeeper [root@lxm147 config]# zkServer.sh start [root@lxm147 config]# zkServer.sh status # 这条命令可以查看zookeeper是否成功启动 # 启动kafka的3种方式: kafka-server-start.sh /opt/soft/kafka212/config/server.properties kafka-server-start.sh -daemon /opt/soft/kafka212/config/server.properties nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties & #后台启动 # 查看启动进程: [root@lxm147 config]# jps 2177 QuorumPeerMain 2247 Kafka
注意:集群版的安装与单机版类似
1.配置文件/opt/soft/kafka212/config/server.properties下的broker节点数不能一样
2.记得将配置文件传输到每一个节点
3.每一个节点的环境变量也要进行配置和刷新
4.可以将kafka的命令配置为全局可调用,具体操作方法可以去网上查找
5.集群版将环境变量写在/etc/profile.d/my_env.sh文件中,否则可能会启动Kafka集群失败
5.kafka一键启停脚本
#!/bin/bash case $1 in "start"){ for i in node141 node142 node143 do echo " --------启动 $i Kafka-------" ssh $i "/opt/soft/kafka/bin/kafka-server-start.sh -daemon /opt/soft/kafka/config/server.properties" done };; "stop"){ for i in node141 node142 node143 do echo " --------停止 $i Kafka-------" ssh $i "/opt/soft/kafka/bin/kafka-server-stop.sh" done };; esac
二、Kafka基本命令(重要)
1.topic常用命令
(1)查看topic
# 查看消息队列 kafka-topics.sh --zookeeper lxm147:2181 --list # 查看队列详情 kafka-topics.sh --zookeeper lxm147:2181 --describe --topic bigdata # 查询指定队列消息数量 kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic bigdata
(2)创建topic
# 创建topic并指定分区数 kafka-topics.sh --zookeeper lxm147:2181 --create --topic bigdata --partitions 1 --replication-factor 1 # 创建topic时手动指定分区的存放位置 kafka-topics.sh --zookeeper node141:2181 --create --topic tpc-1 --replica-assignment 0:1:3,1:2:6 该 topic,将有如下 partition: partition0 ,所在节点: broker0(leader)、broker1、broker3 partition1 ,所在节点: broker1(leader)、broker2、broker6 kafka-topics.sh --zookeeper node141:2181 --create --topic abcy --replica-assignment 0:1,2:0 kafka-topics.sh --zookeeper node141:2181 --describe --topic abcy Topic: abcy PartitionCount: 2 ReplicationFactor: 2 Configs: Topic: abcy Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: abcy Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
(3)修改topic
# 增加分区 kafka-topics.sh --zookeeper node141:2181 --alter --topic abcx --partitions 3 kafka-topics.sh --zookeeper node141:2181 --describe --topic abcx Topic: abcx PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: abcx Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: abcx Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: abcx Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0 # 修改配置flush.ms=100 kafka-topics.sh --zookeeper node141:2181 --alter --topic abcx --config flush.ms=100 kafka-topics.sh --zookeeper node141:2181 --describe --topic abcx Topic: abcx PartitionCount: 3 ReplicationFactor: 2 Configs: flush.ms=100 Topic: abcx Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: abcx Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: abcx Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
其他修改配置方法参见kafka官方文档
(4)删除topic
# 删除topic kafka-topics.sh --zookeeper lxm147:2181 --delete --topic bigdata
2.生产者
# 生产消息 [root@node141 ~]# kafka-console-producer.sh --topic abcx --broker-list node141:9092 >hello,world >hello,java >
3.消费者
当消费者去消费一个不存在的主题时,消费者可以自动创建该主题。但是自动创建的topic的分区数和副本数都默认为1。因此topic尽量不要自动创建,提前手动创建好。
# 消费消息时,注意指定偏移量 --from-beginning 从最头开始消费 [root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --from-beginning --topic abcx hello,java hello,world
当消费 Kafka 消息时,如果没有明确指定偏移量(offset):对于新的消费者组,从最新的消息开始消费;对于已存在的消费者组,从上次提交的偏移量处继续消费。
需要注意的是,消费者的起始偏移量可以通过配置参数进行自定义,例如可以通过设置 auto.offset.reset 参数来修改消费者组的默认偏移量策略。此外,还可以使用 Kafka 提供的API来手动控制消费者的偏移量,以实现更精确的消费行为。
消费者在消费数据时,是先把一个分区的数据消费完后,再去消费下一个分区。
生产者写数据,只能写给分区的leader副本;消费者读数据,也只能从分区的leader副本来读。
4.为什么kafka的bootstrap后面写几台机器都可以
Kafka的bootstrap servers配置是用来初始化Kafka客户端的。在配置中,您可以指定一个或多个Kafka broker的地址。这些地址用于客户端在启动时与集群建立初始连接。一旦客户端与集群建立连接,它就可以从集群中的任何broker接收元数据,包括其他broker的地址。
因此,可以在bootstrap servers配置中指定多台机器,这样客户端在启动时可以有多个连接选项。这是有用的,因为如果一个broker不可用,客户端可以使用其他可用的broker。这增加了客户端的容错能力和可靠性。
在实际使用中,通常建议将所有broker的地址都添加到bootstrap servers配置中,这样客户端可以均匀地分散连接到集群中的各个broker,从而实现负载均衡。
5.消费者偏移量
消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量,起始偏移量的指定策略有 4 种:
(1)earliest:从最早的消息开始消费
(2)latest:从最新的消息开始消费
(3)指定的 offset( 分区号:偏移量):从指定的位置开始消费
(4)从之前所记录的偏移量开始消费
kafka 的 topic 中的消息,是有序号的(序号叫消息偏移量),而且消息的偏移量是在各个 partition 中独立维护的,在各个分区内,都是从 0 开始递增编号。消息(数据)的offset在topich中并不会有全局的递增号。
这里再强调一次:当消费 Kafka 消息时,如果没有明确指定偏移量(offset):
如果是第一次消费该主题的消息,消费者将从最早的可用偏移量开始消费。这意味着它将从主题的起始位置开始,逐条消费所有消息。
如果之前已经消费过该主题的消息,并且有提交过消费偏移量,则消费者将从上次提交的偏移量+1的位置开始消费。这将使其继续在上次离开的地方继续消费。
如果既没有记录过消费偏移量,也无法找到主题的起始偏移量,消费者可以根据配置使用以下默认策略:
- 对于新的消费组(group),从最早的可用偏移量开始消费。
- 对于已存在的消费组,从最新的可用偏移量开始消费。
需要注意的是,以上行为可能受到消费者组(consumer group)以及 Kafka 配置的影响。具体的行为和偏移量控制也可以通过代码进行自定义配置。
因此,在指定消费topic的偏移量时,必须要指定分区!
kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --offset 2 --partition 0
偏移量是定期提交的,默认是5秒,默认保留7天,每一个消费者组记录的偏移量都会记录在__consumer_offsets主题中的一个固定分区中,__consumer_offsets默认50个分区,可以在server.properties文件中可以配置。
消费者组的消费位移发往__consumer_offsets的哪一个分区呢?
(groupId的hashCode % 50) 的值 就是消费位移发往的分区号
消费者不是把自己所负责的分区的消息位移记录在它的本地磁盘,在0.11.x之前,消费者将消费到的位置(消费位移)记录在zookeeper上,但是,0.11.x之后是记录在kafka内部的一个topic中——即__consumer_offsets,里面是二进制形式。
bin/kafka-console-consumer.sh --bootstrap-server node141:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" [d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159239114, expireTimestamp=None) [d30,abcx,0]::OffsetAndMetadata(offset=12, leaderEpoch=Optional[16], metadata=, commitTimestamp=1694159239114, expireTimestamp=None) [d30,abcx,1]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159239114, expireTimestamp=None) [d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159244118, expireTimestamp=None) [d30,abcx,0]::OffsetAndMetadata(offset=12, leaderEpoch=Optional[16], metadata=, commitTimestamp=1694159244118, expireTimestamp=None) [d30,abcx,1]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159244118, expireTimestamp=None) [d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159249121, expireTimestamp=None)
consumer去记录偏移量的时候,不是读到一批数据就记录一次,也不是记录一次后再去读数据,而是周期性地、定期地去提交当前的位移。
6.消费者组
如果topic中数据量太大,但是需要多个并行处理任务去消费topic中的数据,就需要消费者组。
消费者组最小分配单位是partition,同一个partition一定只会被组内某一个消费者来负责读取。
在kafka底层逻辑中,任何消费者都有自己所属的组。
组和组之间没有任何关系,都可以消费到目标topic的所有数据。但是组内的各个消费者,就只能读到自己所分配到的partitions。
要把多个消费者组成一个组: 就是让这些消费者的 groupId一致即可。
# 开启两个消费者,让两个消费者的组id一致 [root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --from-beginning --group d30 [root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --from-beginning --group d30
kafka中的消费组,可以动态增减消费者而且消费组中的消费者数量发生任意变动,都会重新分配分区消费任务。
关于重复消费:消费者已经读到了一条数据,但是没来得及去更新消费唯一,崩溃了,然后重启该消费者,就会出现重复消费。如果发生上述场景,重复的数据往往不止一条,因为一次是去读一批,然后更新偏移量。
kafka的消费者读取数据是消费者主动向broker主动拉取,而不是broker服务器向消费者推送。
kafka的消费者,记录新的消费位移,不是去修改上一次所记录的位移,而是追加新记录。kafka之所以将自己的数据存储目录,称作:log目录,是因为,它底层存储数据的特性,类似于“日志”:数据只能不断追加,不能修改。偏移量也有保存时长的,否则会把磁盘撑爆。kafka中的topic中的数据只能追加,不能条件删除,只能对数据做截断。
消费者组在启动消费的时候,是可以显式地指定其实偏移量的,也就是说,可以忽略掉之前所记录的偏移量,并不一定非要取不同的消费者组名。
7.配置管理 kafka-config
kafka-configs.sh 脚本是专门用来进行动态参数配置操作的,这里的操作是运行状态修改原有的配置, 如此可以达到动态变更的目的; 动态配置的参数,会被存储在 zookeeper 上,因而是持久生效的。会逐渐取代二、1.(3)修改topic部分介绍的命令。
可用参数的查阅地址: https://kafka.apache.org/documentation/#configuration
8.ISR队列
《KAFKA中的ISR是什么》