Kafka单机版安装与基本命令

目录

一、Kafka单机版安装

1.解压&改名

2.修改配置文件

3.配置环境变量

4.启动kafka之前一定要先启动zookeeper

5.kafka一键启停脚本

二、Kafka基本命令(重要)

1.topic常用命令

(1)查看topic

(2)创建topic

(3)修改topic

(4)删除topic

2.生产者

3.消费者

4.为什么kafka的bootstrap后面写几台机器都可以

5.消费者偏移量

6.消费者组

7.配置管理 kafka-config

8.ISR队列


一、Kafka单机版安装

1.解压&改名

[root@lxm147 ~]# tar -zxf /opt/install/kafka_2.12-2.8.0.tgz -C /opt/soft/
[root@lxm147 ~]# mv /opt/soft/kafka_2.12-2.8.0 /opt/soft/kafka212

2.修改配置文件

[root@lxm147 ~]# cd /opt/soft/kafka212/config/
[root@lxm147 ~]# vim ./server.properties
21 broker.id=0
36 advertised.listeners=PLAINTEXT://192.168.180.147:9092
60 log.dirs=/opt/soft/kafka212/data    消息存放目录
103 log.retention.hours=1680           消息存放时间:小时
123 zookeeper.connect=192.168.180.147:2181  连接zookeeper
137 delete.topic.enable=true      设置可以对topic删除,默认不能删除
[root@lxm147 ~]# mkdir /opt/soft/kafka212/data
[root@lxm147 ~]# echo "0">/opt/soft/kafka212/data/myid

3.配置环境变量

[root@lxm147 config]# vim /etc/profile
# KAFKA_HOEM
export KAFKA_HOME=/opt/soft/kafka212
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

4.启动kafka之前一定要先启动zookeeper

# 启动zookeeper
[root@lxm147 config]# zkServer.sh start
[root@lxm147 config]# zkServer.sh status # 这条命令可以查看zookeeper是否成功启动
# 启动kafka的3种方式:
kafka-server-start.sh /opt/soft/kafka212/config/server.properties 
kafka-server-start.sh -daemon /opt/soft/kafka212/config/server.properties
nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &  #后台启动
# 查看启动进程:
[root@lxm147 config]# jps
2177 QuorumPeerMain
2247 Kafka

注意:集群版的安装与单机版类似

1.配置文件/opt/soft/kafka212/config/server.properties下的broker节点数不能一样

2.记得将配置文件传输到每一个节点

3.每一个节点的环境变量也要进行配置和刷新

4.可以将kafka的命令配置为全局可调用,具体操作方法可以去网上查找

5.集群版将环境变量写在/etc/profile.d/my_env.sh文件中,否则可能会启动Kafka集群失败

5.kafka一键启停脚本

#!/bin/bash
case $1 in
"start"){
 for i in node141 node142 node143
 do
 echo " --------启动 $i Kafka-------"
 ssh $i "/opt/soft/kafka/bin/kafka-server-start.sh -daemon /opt/soft/kafka/config/server.properties"
 done
};;
"stop"){
 for i in node141 node142 node143
 do
 echo " --------停止 $i Kafka-------"
 ssh $i "/opt/soft/kafka/bin/kafka-server-stop.sh"
 done
};;
esac

二、Kafka基本命令(重要)

1.topic常用命令

(1)查看topic
# 查看消息队列
kafka-topics.sh --zookeeper lxm147:2181 --list
# 查看队列详情
kafka-topics.sh --zookeeper lxm147:2181 --describe --topic bigdata
# 查询指定队列消息数量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic bigdata
(2)创建topic
# 创建topic并指定分区数
kafka-topics.sh --zookeeper lxm147:2181 --create --topic bigdata --partitions 1 --replication-factor 1
​
# 创建topic时手动指定分区的存放位置
kafka-topics.sh --zookeeper node141:2181 --create --topic tpc-1 --replica-assignment 0:1:3,1:2:6
该 topic,将有如下 partition:
partition0 ,所在节点: broker0(leader)、broker1、broker3
partition1 ,所在节点: broker1(leader)、broker2、broker6
kafka-topics.sh --zookeeper node141:2181 --create --topic abcy --replica-assignment 0:1,2:0
kafka-topics.sh --zookeeper node141:2181 --describe --topic abcy
Topic: abcy	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: abcy	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: abcy	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
(3)修改topic
# 增加分区
kafka-topics.sh --zookeeper node141:2181 --alter --topic abcx --partitions 3
kafka-topics.sh --zookeeper node141:2181 --describe --topic abcx
Topic: abcx	PartitionCount: 3	ReplicationFactor: 2	Configs: 
	Topic: abcx	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: abcx	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: abcx	Partition: 2	Leader: 2	Replicas: 2,0	Isr: 2,0
# 修改配置flush.ms=100
kafka-topics.sh --zookeeper node141:2181 --alter --topic abcx --config flush.ms=100
kafka-topics.sh --zookeeper node141:2181 --describe --topic abcx
Topic: abcx	PartitionCount: 3	ReplicationFactor: 2	Configs: flush.ms=100
	Topic: abcx	Partition: 0	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: abcx	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: abcx	Partition: 2	Leader: 2	Replicas: 2,0	Isr: 2,0

其他修改配置方法参见kafka官方文档

(4)删除topic
# 删除topic
kafka-topics.sh --zookeeper lxm147:2181 --delete --topic bigdata

2.生产者

# 生产消息
[root@node141 ~]# kafka-console-producer.sh --topic abcx --broker-list node141:9092
>hello,world
>hello,java      
>

3.消费者

当消费者去消费一个不存在的主题时,消费者可以自动创建该主题。但是自动创建的topic的分区数和副本数都默认为1。因此topic尽量不要自动创建,提前手动创建好。

# 消费消息时,注意指定偏移量 --from-beginning 从最头开始消费
[root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --from-beginning --topic abcx
hello,java
hello,world

当消费 Kafka 消息时,如果没有明确指定偏移量(offset):对于新的消费者组,从最新的消息开始消费;对于已存在的消费者组,从上次提交的偏移量处继续消费。

需要注意的是,消费者的起始偏移量可以通过配置参数进行自定义,例如可以通过设置 auto.offset.reset 参数来修改消费者组的默认偏移量策略。此外,还可以使用 Kafka 提供的API来手动控制消费者的偏移量,以实现更精确的消费行为。

消费者在消费数据时,是先把一个分区的数据消费完后,再去消费下一个分区。

生产者写数据,只能写给分区的leader副本;消费者读数据,也只能从分区的leader副本来读。

4.为什么kafka的bootstrap后面写几台机器都可以

        Kafka的bootstrap servers配置是用来初始化Kafka客户端的。在配置中,您可以指定一个或多个Kafka broker的地址。这些地址用于客户端在启动时与集群建立初始连接。一旦客户端与集群建立连接,它就可以从集群中的任何broker接收元数据,包括其他broker的地址。

        因此,可以在bootstrap servers配置中指定多台机器,这样客户端在启动时可以有多个连接选项。这是有用的,因为如果一个broker不可用,客户端可以使用其他可用的broker。这增加了客户端的容错能力和可靠性。

        在实际使用中,通常建议将所有broker的地址都添加到bootstrap servers配置中,这样客户端可以均匀地分散连接到集群中的各个broker,从而实现负载均衡。

5.消费者偏移量

        消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量,起始偏移量的指定策略有 4 种:

(1)earliest:从最早的消息开始消费

(2)latest:从最新的消息开始消费

(3)指定的 offset( 分区号:偏移量):从指定的位置开始消费

(4)从之前所记录的偏移量开始消费

        kafka 的 topic 中的消息,是有序号的(序号叫消息偏移量),而且消息的偏移量是在各个 partition 中独立维护的,在各个分区内,都是从 0 开始递增编号。消息(数据)的offset在topich中并不会有全局的递增号。

        这里再强调一次:当消费 Kafka 消息时,如果没有明确指定偏移量(offset):

  1. 如果是第一次消费该主题的消息,消费者将从最早的可用偏移量开始消费。这意味着它将从主题的起始位置开始,逐条消费所有消息。

  2. 如果之前已经消费过该主题的消息,并且有提交过消费偏移量,则消费者将从上次提交的偏移量+1的位置开始消费。这将使其继续在上次离开的地方继续消费。

  3. 如果既没有记录过消费偏移量,也无法找到主题的起始偏移量,消费者可以根据配置使用以下默认策略:

    • 对于新的消费组(group),从最早的可用偏移量开始消费。
    • 对于已存在的消费组,从最新的可用偏移量开始消费。

需要注意的是,以上行为可能受到消费者组(consumer group)以及 Kafka 配置的影响。具体的行为和偏移量控制也可以通过代码进行自定义配置。

因此,在指定消费topic的偏移量时,必须要指定分区!

kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --offset 2 --partition 0

偏移量是定期提交的,默认是5秒,默认保留7天,每一个消费者组记录的偏移量都会记录在__consumer_offsets主题中的一个固定分区中,__consumer_offsets默认50个分区,可以在server.properties文件中可以配置。

消费者组的消费位移发往__consumer_offsets的哪一个分区呢?

(groupId的hashCode % 50) 的值 就是消费位移发往的分区号

消费者不是把自己所负责的分区的消息位移记录在它的本地磁盘,在0.11.x之前,消费者将消费到的位置(消费位移)记录在zookeeper上,但是,0.11.x之后是记录在kafka内部的一个topic中——即__consumer_offsets,里面是二进制形式。

bin/kafka-console-consumer.sh --bootstrap-server node141:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
[d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159239114, expireTimestamp=None)
[d30,abcx,0]::OffsetAndMetadata(offset=12, leaderEpoch=Optional[16], metadata=, commitTimestamp=1694159239114, expireTimestamp=None)
[d30,abcx,1]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159239114, expireTimestamp=None)
[d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159244118, expireTimestamp=None)
[d30,abcx,0]::OffsetAndMetadata(offset=12, leaderEpoch=Optional[16], metadata=, commitTimestamp=1694159244118, expireTimestamp=None)
[d30,abcx,1]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159244118, expireTimestamp=None)
[d30,abcx,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[30], metadata=, commitTimestamp=1694159249121, expireTimestamp=None)

consumer去记录偏移量的时候,不是读到一批数据就记录一次,也不是记录一次后再去读数据,而是周期性地、定期地去提交当前的位移。

 

6.消费者组

如果topic中数据量太大,但是需要多个并行处理任务去消费topic中的数据,就需要消费者组。

消费者组最小分配单位是partition,同一个partition一定只会被组内某一个消费者来负责读取。

在kafka底层逻辑中,任何消费者都有自己所属的组。

组和组之间没有任何关系,都可以消费到目标topic的所有数据。但是组内的各个消费者,就只能读到自己所分配到的partitions。

要把多个消费者组成一个组: 就是让这些消费者的 groupId一致即可。

# 开启两个消费者,让两个消费者的组id一致
[root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --from-beginning --group d30
[root@node141 ~]# kafka-console-consumer.sh --bootstrap-server node141:9092 --topic abcx --from-beginning --group d30

kafka中的消费组,可以动态增减消费者而且消费组中的消费者数量发生任意变动,都会重新分配分区消费任务。

关于重复消费:消费者已经读到了一条数据,但是没来得及去更新消费唯一,崩溃了,然后重启该消费者,就会出现重复消费。如果发生上述场景,重复的数据往往不止一条,因为一次是去读一批,然后更新偏移量。

kafka的消费者读取数据是消费者主动向broker主动拉取,而不是broker服务器向消费者推送。

kafka的消费者,记录新的消费位移,不是去修改上一次所记录的位移,而是追加新记录。kafka之所以将自己的数据存储目录,称作:log目录,是因为,它底层存储数据的特性,类似于“日志”:数据只能不断追加,不能修改。偏移量也有保存时长的,否则会把磁盘撑爆。kafka中的topic中的数据只能追加,不能条件删除,只能对数据做截断。

消费者组在启动消费的时候,是可以显式地指定其实偏移量的,也就是说,可以忽略掉之前所记录的偏移量,并不一定非要取不同的消费者组名。

7.配置管理 kafka-config

kafka-configs.sh 脚本是专门用来进行动态参数配置操作的,这里的操作是运行状态修改原有的配置, 如此可以达到动态变更的目的; 动态配置的参数,会被存储在 zookeeper 上,因而是持久生效的。会逐渐取代二、1.(3)修改topic部分介绍的命令。

可用参数的查阅地址: https://kafka.apache.org/documentation/#configuration

8.ISR队列

《KAFKA中的ISR是什么》