Kafka详细教程(一)

总体目录

1、什么是消息队列

消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列 。来看一下下面的代码

 // 1.创建一个保存字符串的队列
        Queue queue = new LinkedList<>();
        // 2. 往消息队列中放入消息
        queue.offer("hello");
        // 3. 从消息队列中取出消息把那个打印
        System.out.println(queue.poll());

上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的

消息队列指的就是将数据放置到一个队列中, 从队列一端进入, 然后从另一端流出的过程

2、消息队列的应用场景

主要四个场景:

  • 应用耦合

    多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;

  • 异步处理

    多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间

  • 限流削峰

    广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

  • 消息驱动的系统

    系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理 下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用

    异步处理

    「具体场景」:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个操作的处理方式有两种:串行及并行。

    1 ) 串行方式: 新注册信息生成后 , 先发送注册邮件, 再发送验证短信 注意 : 在这种方式下,需要最终发送验证短信后再返回给客户端

    2) 并行处理:新注册信息写入后,由发短信和发邮件并行处理

    「注意」: 在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

    应用耦合

    「具体场景:」 用户使用 QQ 相册上传一张图片,人脸识别系统会对该图片进行人脸识别,一般的做法是,服务器接收到图片后,图片上传系统立即调用人脸识别系统,调用完成后再返回成功,如下图所示: 如果引入消息队列 , 在来看整体的执行效率

    该方法有如下缺点:

    • 人脸识别系统被调失败,导致图片上传失败;
    • 延迟高,需要人脸识别系统处理完成后,再返回给客户端,即使用户并不需要立即知道结果;
    • 图片上传系统与人脸识别系统之间互相调用,需要做耦合;若使用消息队列:

      此时图片上传系统并不需要关心人脸识别系统是否对这些图片信息的处理、以及何时对这些图片信息进行处理。事实上,由于用户并不需要立即知道人脸识别结果,人脸识别系统可以选择不同的调度策略,按照闲时、忙时、正常时 间,对队列中的图片信息进行处理。

      限流削峰

      「具体场景:」 购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。

      该方法有如下优点:请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲 , 极大地减少了业务处理系统的压力;队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;

      消息驱动系统

      「具体场景:」 用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用 户的人脸索引( 加快查询 ) 。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。

      该方法有如下优点:避免了直接调用下一个系统导致当前系统失败;每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间 段按不同处理速度处理;

      **问题:**如果一个业务被拆分成多个消息执行,万一有个过程,执行失败,怎么回滚是一个大的问题

      3、消息队列的两种方式

      点对点模式

      点对点模式下包括三个角色

      • 消息队列
      • 发送者 (生产者)
      • 接收者(消费者)

        消息发送者生产消息发送到 queue 中,然后消息接收者从 queue 中取出并且消费消息。消息被消费以后, queue 中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

        「点对点模式特点:」

        • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
        • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
        • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

          发布/订阅模式

          发布 / 订阅模式下包括三个角色:

          • 角色主题(Topic)
          • 发布者(Publisher)
          • 订阅者(Subscriber)

            发布者将消息发送到 Topic, 系统将这些消息传递给多个订阅者。发布 / 订阅模式特点:

            • 每个消息可以有多个订阅者;
            • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
            • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

              4、常见的消息队列的产品

              1、RabbitMQ RabbitMQ 2007 年发布,是一个在 AMQP ( 高级消息队列协议 ) 基础上完成的,可复用的企业消息系统,是当前最主 流的消息中间件之一。

              2、ActiveMQ: ActiveMQ 是由 Apache 出品, ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。它非常快速 ,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能, 目前市场的活跃 度比较低, 在 java 领域正在被 RabbitMQ 替代

              3、RocketMQ RocketMQ 出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka ,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理 等

              4、kafka Apache Kafka 是一个分布式消息发布订阅系统。它最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log) ,之后成为 Apache 项目的一部分。Kafka 系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

              5、Kafka的基本介绍

              官网:「http://kafka.apache.org/」 kafka 是最初由 linkedin 公司开发的,使用 scala 语言编写, kafka 是一个分布式,分区的,多副本的,多订阅者的日 志系统(分布式MQ 系统),可以用于搜索日志,监控日志,访问日志等 Kafka is a distributed,partitioned,replicated commit logservice 。它提供了类似于 JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS 规范的完整实现。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer, 消息 接受者成为Consumer, 此外 kafka 集群有多个 kafka 实例组成,每个实例 (server) 成为 broker 。无论是 kafka 集群,还是producer和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息

              「kakfa的特点:」

              • 可靠性: 分布式, 分区 , 复制 和容错等
              • 可扩展性: kakfa消息传递系统轻松缩放, 无需停机
              • 耐用性: kafka使用分布式提交日志, 这个意味着消息会尽可能快速的保存在磁盘上, 因此它是持久的
              • 性能: kafka对于发布和订阅消息都具有高吞吐量, 即使存储了许多TB的消息, 他也爆出稳定的性能-kafka非常快: 保证零停机和零数据丢失

                apache kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个 端点传递到另一个端点,kafka 适合离线和在线消息消费。kafka 消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在 zookeeper 同步服务之上。它与 apache 和 spark 非常好的集成,应用于实时流式数据分析。

                「kafka的主要应用场景:」

                • 指标分析 : kafka 通常用于操作监控数据 , 这设计聚合来自分布式应用程序和统计信息 , 以产生操作的数据集中反馈
                • 日志聚合解决方法 : kafka 可用于跨组织从多个服务器收集日志 , 并使他们一标准的合适提供给多个服务器
                • 流式处理 : 流式的处理框架 (spark, storm , flink) 从主题中读取数据 , 对其进行处理 , 并将处理后的结果数据写入新的主题, 供用户和应用程序使用 , kafka 的强耐久性在流处理的上下文中也非常的有用

                  「版本说明:」Kafka版本为2.4.1,是2020年3月12日发布的版本。可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。

                  6、Kafka特点总结

                  • kafka是大数据中一款消息队列的中间件产品, 最早是有领英开发的, 后期将其贡献给了apache 成为apache的顶级项目
                  • kafka是采用Scala语言编写 kafka并不是对JMS规范完整实现 仅实现一部分 , kafka集群依赖于zookeeper
                  • kafka可以对接离线业务或者实时业务, 可以很好的和apache其他的软件进行集成, 可以做流式数据分析(实时分

                    「小结:」

                    • 高可靠性 : 数据不容易丢失, 数据分布式存储, 集群某个节点宕机也不会影响
                    • 高可扩展性 : 动态的进行添加或者减少集群的节点
                    • 高耐用性 : 数据持久化的磁盘上
                    • 高性能 : 数据具有高吞吐量
                    • 非常快: 零停机和零数据丢失 (存在重复消费问题)

                      7、Kafka架构

                      「专业术语」

                      (1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

                      (2)Consumer:消息消费者,向 Kafka broker 取消息的客户端。

                      (3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消

                      费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不

                      影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

                      (4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个

                      broker 可以容纳多个 topic。

                      (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。

                      (6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服

                      务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

                      (7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个

                      Follower。

                      (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数

                      据的对象都是 Leader。

                      (9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和

                      Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

                      1、为方便扩展,并提高吞吐量,一个topic分为多个partition

                      2、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费

                      3、为提高可用性,为每个partition增加若干副本,类似NameNode HA

                      4、ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK

                      8、搭建kafka集群

                      集群规划

                      集群部署

                      0)官方下载地址:http://kafka.apache.org/downloads.html

                      1)解压安装包

                       tar -zxvf kafka_2.12-3.0.0.tgz -C 
                      /opt/module/
                      

                      2)修改解压后的文件名称

                      mv kafka_2.12-3.0.0/ kafka

                      3)进入到/opt/module/kafka 目录,修改配置文件

                      cd config/
                      vim server.properties
                      

                      输入以下内容:

                      #broker 的全局唯一编号,不能重复,只能是数字。
                      broker.id=0
                      #处理网络请求的线程数量
                      num.network.threads=3
                      #用来处理磁盘 IO 的线程数量
                      num.io.threads=8
                      #发送套接字的缓冲区大小
                      socket.send.buffer.bytes=102400
                      #接收套接字的缓冲区大小
                      socket.receive.buffer.bytes=102400
                      #请求套接字的缓冲区大小
                      socket.request.max.bytes=104857600
                      #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
                      配置多个磁盘路径,路径与路径之间可以用","分隔
                      log.dirs=/opt/module/kafka/datas
                      #topic 在当前 broker 上的分区个数
                      num.partitions=1
                      #用来恢复和清理 data 下数据的线程数量
                      num.recovery.threads.per.data.dir=1
                      # 每个 topic 创建时的副本数,默认时 1 个副本
                      offsets.topic.replication.factor=1
                      #segment 文件保留的最长时间,超时将被删除
                      log.retention.hours=168
                      #每个 segment 文件的大小,默认最大 1G
                      log.segment.bytes=1073741824
                      # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
                      log.retention.check.interval.ms=300000
                      #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
                      zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/ka
                      fka
                      

                      4)分发安装包

                      xsync kafka/

                      5)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties

                      中的 broker.id=1、broker.id=2

                      注:broker.id 不得重复,整个集群中唯一。

                      vim kafka/config/server.properties

                      修改:

                      kafka/config/server.properties
                      修改:
                      # The id of the broker. This must be set to a unique integer for 
                      each broker.
                      broker.id=1
                      vim kafka/config/server.properties
                      修改:
                      # The id of the broker. This must be set to a unique integer for 
                      each broker.
                      broker.id=2
                      

                      6)配置环境变量

                      (1)在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置

                      sudo vim /etc/profile.d/my_env.sh
                      增加如下内容:
                      #KAFKA_HOME
                      export KAFKA_HOME=/opt/module/kafka
                      export PATH=$PATH:$KAFKA_HOME/bin
                      

                      (2)刷新一下环境变量。

                      source /etc/profile
                      

                      (3)分发环境变量文件到其他节点,并 source。

                      sudo /home/atguigu/bin/xsync 
                      /etc/profile.d/my_env.sh
                       source /etc/profile
                       source /etc/profile
                      

                      7)启动集群

                      (1)先启动 Zookeeper 集群,然后启动 Kafka。

                      zk.sh start 

                      (2)依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka。

                       bin/kafka-server-start.sh -daemon config/server.properties
                      bin/kafka-server-start.sh -daemon config/server.properties
                       bin/kafka-server-start.sh -daemon config/server.properties
                      注意:配置文件的路径要能够到 server.properties。
                      

                      8)关闭集群

                      bin/kafka-server-stop.sh

                      9、目录结构

                      10、Kafka一键启动/关闭脚本

                      为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

                      1、在节点1中创建 /export/onekey 目录

                      cd /export/onekey
                      

                      2、准备slave配置文件,用于保存要启动哪几个节点上的kafka

                      node1
                      node2
                      node3
                      

                      3、编写start-kafka.sh脚本

                      vim start-kafka.sh
                       
                      cat /export/onekey/slave | while read line
                       
                      do
                       
                      { echo $line
                       
                       ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
                       
                      }&
                       
                      wait
                       
                      done
                      4、编写stop-kafka.sh脚本
                      vim stop-kafka.sh
                       
                      cat /export/onekey/slave | while read line
                       
                      do
                       
                      { echo $line
                       
                       ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
                       
                      }&
                       
                      wait
                       
                      done
                      

                      5、给start-kafka.sh、stop-kafka.sh配置执行权限

                      chmod u+x start-kafka.sh
                       
                      chmod u+x stop-kafka.sh
                      

                      6、执行一键启动、一键关闭

                      ./start-kafka.sh
                       
                      ./stop-kafka.sh
                      

                      11、Kafka的shell命令使用

                      1、 创建topic

                      创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

                      # 创建名为test的主题
                      bin/kafka-topics.sh --create --bootstrap-server node1:9092 --topic test
                      # 查看目前Kafka中的主题
                      bin/kafka-topics.sh --list --bootstrap-server node1:9092
                      

                      2、生产消息到kafka

                      使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

                      bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
                      

                      3、从kafka中消费消息

                      使用下面的命令来消费 test 主题中的消息。

                      bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test
                      

                      4、查看主题的命令

                      查看 kafka 当中存在的主题

                      bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
                      

                      5、运行describe的命令

                      运行 describe 查看 topic 的相关详细信息

                      [root@node01 bin]# ./kafka-topics.sh --describe --zookeeper node01:2181 --topic demo
                      Topic:demo      PartitionCount:3        ReplicationFactor:1     Configs:
                              Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
                              Topic: demo     Partition: 1    Leader: 1       Replicas: 1     Isr: 1
                              Topic: demo     Partition: 2    Leader: 2       Replicas: 2     Isr: 2
                      

                      6、 增加topic分区数

                      任意 kafka 服务器执行以下命令可以增加 topic 分区数

                      bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
                      

                      7、删除topic

                      目前删除 topic 在默认情况下知识打上一个删除的标记,在重新启动 kafka 后才删除。如果需要立即删除,则需要 在server.properties 中配置:

                       delete.topic.enable=true
                      

                      delete.topic.enable=true

                      bin/kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
                      

                      12、Kafka 生产者

                      1、生产者消息发送流程

                      在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程

                      中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给RecordAccumulator,

                      Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

                      2、 生产者重要参数列表

                      3、异步发送API

                      3.1、普通异步发送

                      需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker

                      导入依赖

                        org.apache.kafka kafka-clients 3.0.0 

                      生产者代码

                      public class CustomProducer { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                       // key,value 序列化(必须):key.serializer,value.serializer
                      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new 
                      KafkaProducer(properties);
                       // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 5; i++) { kafkaProducer.send(new 
                      ProducerRecord<>("first","atguigu " + i));
                       }
                       // 5. 关闭资源
                       kafkaProducer.close();
                       }
                      }
                      

                      消费消息

                      bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
                      

                      3.2、回调函数的异步发送

                      回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败

                      public class CustomProducerCallback { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息
                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                       // key,value 序列化(必须):key.serializer,value.serializer
                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new KafkaProducer(properties);
                       // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 5; i++) { // 添加回调
                       kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() { // 该方法在 Producer 收到 ack 时调用,为异步调用
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { // 没有异常,输出信息到控制台
                       System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
                       } else { // 出现异常打印
                       exception.printStackTrace();
                       }
                       }
                       });
                       // 延迟一会会看到数据发往不同分区
                       Thread.sleep(2);
                       }
                       // 5. 关闭资源
                       kafkaProducer.close();
                       }
                      }
                      

                      4、同步发送API

                      只需在异步发送的基础上,再调用一下 get()方法即可。

                      public class CustomProducerSync { public static void main(String[] args) throws InterruptedException, ExecutionException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息
                      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
                       // key,value 序列化(必须):key.serializer,value.serializer
                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new KafkaProducer(properties);
                       // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 10; i++) { // 异步发送 默认
                      // kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
                       // 同步发送
                       kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
                       }
                       // 5. 关闭资源
                       kafkaProducer.close();
                       }
                      }
                      

                      5、生产者消息分区

                      默认的分区器 DefaultPartitioner

                      代码示例

                      将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中

                      public class CustomProducerCallbackPartitions { public static void main(String[] args) { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息
                       
                      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
                       // key,value 序列化(必须):key.serializer,value.serializer
                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       
                      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
                       for (int i = 0; i < 5; i++) { // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
                       kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() { @Override
                       public void onCompletion(RecordMetadata metadata, Exception e) { if (e == null){ System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition()
                       );
                       }else { e.printStackTrace();
                       }
                       }
                       });
                       }
                       kafkaProducer.close();
                       }
                      }
                      

                      6、生产经验——生产者如何提高吞吐量

                      代码

                      public class CustomProducerParameters { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                       
                       // key,value 序列化(必须):key.serializer,value.serializer
                      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                       // batch.size:批次大小,默认 16K
                       properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
                       // linger.ms:等待时间,默认 0
                       properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
                       // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
                       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
                       // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
                      properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new KafkaProducer(properties);
                       // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
                       }
                       // 5. 关闭资源
                       kafkaProducer.close();
                       }
                      }
                      

                      开启消费者,可以看到消息被消费

                      bin/kafka-console-consumer.sh --ootstrap-server hadoop102:9092 --topic first
                      atguigu 0
                      atguigu 1
                      atguigu 2
                      atguigu 3
                      atguigu 4
                      

                      7、生产经验——消息数据可靠性

                      7.1、消息发送原理

                      7.2、ack应答原理

                      代码示例

                      public class CustomProducerAck { public static void main(String[] args) throws 
                      InterruptedException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                      "hadoop102:9092");
                       
                       // key,value 序列化(必须):key.serializer,value.serializer
                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      StringSerializer.class.getName());
                       
                      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                      StringSerializer.class.getName());
                       // 设置 acks
                       properties.put(ProducerConfig.ACKS_CONFIG, "all");
                       // 重试次数 retries,默认是 int 最大值,2147483647
                       properties.put(ProducerConfig.RETRIES_CONFIG, 3);
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new 
                      KafkaProducer(properties);
                       // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 5; i++) { kafkaProducer.send(new 
                      ProducerRecord<>("first","atguigu " + i));
                       }
                       // 5. 关闭资源
                       kafkaProducer.close();
                       }
                      }
                      

                      8、生产经验——数据去重

                      8.1、数据传递语义

                      8.2、幂等性

                      如何使用幂等

                      开启参数 enable.idempotence 默认为 true,false 关闭。

                      8.3、Kafka事务

                      Kafka 的事务一共有如下 5 个 API

                      // 1 初始化事务
                      void initTransactions();
                      // 2 开启事务
                      void beginTransaction() throws ProducerFencedException;
                      // 3 在事务内提交已经消费的偏移量(主要用于消费者)
                      void sendOffsetsToTransaction(Map offsets,String consumerGroupId) throws ProducerFencedException;
                      // 4 提交事务
                      void commitTransaction() throws ProducerFencedException;
                      // 5 放弃事务(类似于回滚事务的操作)
                      void abortTransaction() throws ProducerFencedException;
                      

                      单个 Producer,使用事务保证消息的仅一次发送

                      public class CustomProducerTransactions { public static void main(String[] args) throws 
                      InterruptedException { // 1. 创建 kafka 生产者的配置对象
                       Properties properties = new Properties();
                       // 2. 给 kafka 配置对象添加配置信息
                       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                       // key,value 序列化
                       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
                      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                       // 设置事务 id(必须),事务 id 任意起名
                       properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
                       // 3. 创建 kafka 生产者对象
                       KafkaProducer kafkaProducer = new KafkaProducer(properties);
                       // 初始化事务
                       kafkaProducer.initTransactions();
                       // 开启事务
                       kafkaProducer.beginTransaction();
                       try { // 4. 调用 send 方法,发送消息
                       for (int i = 0; i < 5; i++) { // 发送消息
                       kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
                       }
                      // int i = 1 / 0;
                       // 提交事务
                       kafkaProducer.commitTransaction();
                       } catch (Exception e) { // 终止事务
                       kafkaProducer.abortTransaction();
                       } finally { // 5. 关闭资源
                       kafkaProducer.close();
                       }
                       }
                      }
                      

                      9、生产经验——数据有序

                      10、生产经验——数据乱序

                      13、Kafka Broker

                      1、zookeeper存储的kafka信息

                      启动 zk
                       bin/zkCli.sh
                       
                      通过 ls 命令可以查看 kafka 相关信息
                      ls /kafka
                      

                      2、kafka中的Controller和状态机

                      Controller 是从 Broker 中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生故障时,由 Controller 负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR(In-Sync Replica)集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用kafka-topics.sh脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

                      Kafka 中 Contorller 的选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller这个临时(EPHEMERAL)节点。

                      选举过程

                      Broker 启动的时候尝试去读取/controller节点的brokerid的值,如果brokerid的值不等于-1,则表明已经有其他的 Broker 成功成为 Controller 节点,当前 Broker 主动放弃竞选;如果不存在/controller节点,或者 brokerid 数值异常,当前 Broker 尝试去创建/controller这个节点,此时也有可能其他 broker 同时去尝试创建这个节点,只有创建成功的那个 broker 才会成为控制器,而创建失败的 broker 则表示竞选失败。每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId。

                      实现

                      Controller 读取 Zookeeper 中的节点数据,初始化上下文(Controller Context),并管理节点变化,变更上下文,同时也需要将这些变更信息同步到其他普通的 broker 节点中。Controller 通过定时任务,或者监听器模式获取 zookeeper 信息,事件监听会更新更新上下文信息,如图所示,Controller 内部也采用生产者-消费者实现模式,Controller 将 zookeeper 的变动通过事件的方式发送给事件队列,队列就是一个LinkedBlockingQueue,事件消费者线程组通过消费消费事件,将相应的事件同步到各 Broker 节点。这种队列 FIFO 的模式保证了消息的有序性。

                      controller职责

                      Controller 被选举出来,作为整个 Broker 集群的管理者,管理所有的集群信息和元数据信息。它的职责包括下面几部分:

                      • 处理 Broker 节点的上线和下线,包括自然下线、宕机和网络不可达导致的集群变动,Controller 需要及时更新集群元数据,并将集群变化通知到所有的 Broker 集群节点;

                      • 创建 Topic 或者 Topic 扩容分区,Controller 需要负责分区副本的分配工作,并主导 Topic 分区副本的 Leader 选举。

                      • 管理集群中所有的副本和分区的状态机,监听状态机变化事件,并作出相应的处理。Kafka 分区和副本数据采用状态机的方式管理,分区和副本的变化都在状态机内会引起状态机状态的变更,从而触发相应的变化事件。

                        什么是状态机?

                        Controller 管理着集群中所有副本和分区的状态机。大家不要被状态机这个词唬住了。理解状态机很简单。先理解模型,即这是什么关于什么模型,然后就是模型的状态有哪些,模型状态之间如何转换,转换时发送相应的变化事件。

                        Kafka 的分区和副本状态机很简单。我们先理解,这分别是管理 Kafka Topic 的分区和副本的。它们的状态也很简单,就是 CRUD,具体说来如下:

                        分区状态机

                        PartitionStateChange,管理 Topic 的分区,它有以下 4 种状态:

                        • NonExistentPartition:该状态表示分区没有被创建过或创建后被删除了。
                        • NewPartition:分区刚创建后,处于这个状态。此状态下分区已经分配了副本,但是还没有选举 leader,也没有 ISR 列表。
                        • OnlinePartition:一旦这个分区的 leader 被选举出来,将处于这个状态。
                        • OfflinePartition:当分区的 leader 宕机,转移到这个状态。

                          我们用一张图来直观的看看这些状态是如何变化的,以及在状态发生变化时 Controller 都有哪些操作:

                          副本状态机

                          ReplicaStateChange,副本状态,管理分区副本信息,它也有 4 种状态:

                          • NewReplica: 创建 topic 和分区分配后创建 replicas,此时,replica 只能获取到成为 follower 状态变化请求。
                          • OnlineReplica: 当 replica 成为 parition 的 assingned replicas 时,其状态变为 OnlineReplica, 即一个有效的 OnlineReplica。
                          • OfflineReplica: 当一个 replica 下线,进入此状态,这一般发生在 broker 宕机的情况下;
                          • NonExistentReplica: Replica 成功删除后,replica 进入 NonExistentReplica 状态。

                            副本状态间的变化如下图所示,Controller 在状态变化时会做出相应的操作:

                            3、Kafka Broker 工作流程

                            模拟 Kafka 上下线,Zookeeper 中数据变化

                            (1)查看/kafka/brokers/ids 路径上的节点。

                            [zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
                            [0, 1, 2]
                            

                            (2)查看/kafka/controller 路径上的数据。

                            [zk: localhost:2181(CONNECTED) 15] get /kafka/controller
                            {"version":1,"brokerid":0,"timestamp":"1637292471777"}
                            

                            (3)查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

                            [zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state
                            {"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}
                            

                            (4)停止 hadoop104 上的 kafka。

                            bin/kafka-server-stop.sh

                            (5)再次查看/kafka/brokers/ids 路径上的节点。

                            [zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
                            [0, 1]
                            

                            (6)再次查看/kafka/controller 路径上的数据。

                            [zk: localhost:2181(CONNECTED) 15] get /kafka/controller
                            {"version":1,"brokerid":0,"timestamp":"1637292471777"}
                            

                            (7)再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

                            [zk: localhost:2181(CONNECTED) 16] get 
                            /kafka/brokers/topics/first/partitions/0/state
                            {"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]}
                            

                            (8)启动 hadoop104 上的 kafka。

                            bin/kafka-server-start.sh -

                            daemon ./config/server.properties

                            (9)再次观察(1)、(2)、(3)步骤中的内容。

                            4、Broker的重要参数

                            5、生产经验——Broker节点服役和退役

                            5.1、服役新节点

                            准备新节点

                            (1)关闭 hadoop104,并右键执行克隆操作。
                            (2)开启 hadoop105,并修改 IP 地址。
                            vim /etc/sysconfig/network-scripts/ifcfgens33
                            DEVICE=ens33
                            TYPE=Ethernet
                            ONBOOT=yes
                            BOOTPROTO=static
                            NAME="ens33"
                            IPADDR=192.168.10.105
                            PREFIX=24
                            GATEWAY=192.168.10.2
                            DNS1=192.168.10.2
                            (3)在 hadoop105 上,修改主机名称为 hadoop105。
                             vim /etc/hostname
                            hadoop105
                            (4)重新启动 hadoop104、hadoop105。
                            (5)修改 haodoop105 中 kafka 的 broker.id 为 3。
                            (6)删除 hadoop105 中 kafka 下的 datas 和 logs。
                            rm -rf datas/* logs/*
                            (7)启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群。
                            zk.sh start
                            kf.sh start
                            (8)单独启动 hadoop105 中的 kafka。
                             bin/kafka-server-start.sh -daemon ./config/server.properties
                            

                            执行负载均衡操作

                            (1)创建一个要均衡的主题。
                            [atguigu@hadoop102 kafka]$ vim topics-to-move.json
                            { "topics": [
                             {"topic": "first"}
                             ],
                             "version": 1
                            }
                            (2)生成一个负载均衡的计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
                            Current partition replica assignment
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","
                            any","any"]}]}
                            Proposed partition reassignment configuration
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
                            any","any"]}]}
                            (3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。
                            [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
                            输入如下内容:
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
                            any","any"]}]}
                            (4)执行副本存储计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
                            bootstrap-server hadoop102:9092 --reassignment-json-file 
                            increase-replication-factor.json --execute
                            (5)验证副本存储计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
                            bootstrap-server hadoop102:9092 --reassignment-json-file 
                            increase-replication-factor.json --verify
                            Status of partition reassignment:
                            Reassignment of partition first-0 is complete.
                            Reassignment of partition first-1 is complete.
                            Reassignment of partition first-2 is complete.
                            Clearing broker-level throttles on brokers 0,1,2,3
                            Clearing topic-level throttles on topic first
                            

                            5.2、退役旧节点

                            1)执行负载均衡操作
                            先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
                            (1)创建一个要均衡的主题。
                            [atguigu@hadoop102 kafka]$ vim topics-to-move.json
                            { "topics": [
                             {"topic": "first"}
                             ],
                             "version": 1
                            }
                            (2)创建执行计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
                            bootstrap-server hadoop102:9092 --topics-to-move-json-file 
                            topics-to-move.json --broker-list "0,1,2" --generate
                            Current partition replica assignment
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","
                            any","any"]}]}
                            Proposed partition reassignment configuration
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","
                            any","any"]}]}
                            (3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。
                            [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
                            {"version":1,"partitions":[{"topic":"first","partition":0,"replic
                            as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","par
                            tition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"to
                            pic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","
                            any","any"]}]}
                            (4)执行副本存储计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
                            bootstrap-server hadoop102:9092 --reassignment-json-file 
                            increase-replication-factor.json --execute
                            (5)验证副本存储计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
                            bootstrap-server hadoop102:9092 --reassignment-json-file 
                            increase-replication-factor.json --verify
                            Status of partition reassignment:
                            Reassignment of partition first-0 is complete.
                            Reassignment of partition first-1 is complete.
                            Reassignment of partition first-2 is complete.
                            Clearing broker-level throttles on brokers 0,1,2,3
                            Clearing topic-level throttles on topic first
                            2)执行停止命令
                            在 hadoop105 上执行停止命令即可。
                            [atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
                            

                            6、Kafka副本

                            6.1、副本基本信息

                            (1)Kafka 副本作用:提高数据可靠性。

                            (2)Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

                            (3)Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。

                            (4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

                            AR = ISR + OSR

                            ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

                            OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

                            6.2、Leader 选举流程

                            Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

                            Controller 的信息同步工作是依赖于 Zookeeper 的。

                            (1)创建一个新的 topic,4 个分区,4 个副本
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
                            hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
                            Created topic atguigu1.
                            (2)查看 Leader 分布情况
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
                            Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
                            Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
                            (3)停止掉 hadoop105 的 kafka 进程,并查看 Leader 分区情况
                            [atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
                            Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
                            Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
                            (4)停止掉 hadoop104 的 kafka 进程,并查看 Leader 分区情况
                            [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
                            Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
                            Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
                            (5)启动 hadoop105 的 kafka 进程,并查看 Leader 分区情况
                            [atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
                            Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
                            Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
                            (6)启动 hadoop104 的 kafka 进程,并查看 Leader 分区情况
                            [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
                            Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
                            Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
                            (7)停止掉 hadoop103 的 kafka 进程,并查看 Leader 分区情况
                            [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe 
                            --topic atguigu1
                            Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
                            Configs: segment.bytes=1073741824
                            Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
                            Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
                            Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
                            Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
                            

                            6.3、分区副本分配

                            如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?

                            1)创建 16 分区,3 个副本

                            (1)创建一个新的 topic,名称为 second。

                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second

                            (2)查看分区和副本情况。

                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server

                            hadoop102:9092 --describe --topic second

                            Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

                            Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

                            Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0

                            Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

                            Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3

                            Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0

                            Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

                            Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

                            Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1

                            Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

                            Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3

                            Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

                            Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

                            Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

                            Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0

                            Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

                            6.4、生产经验——Leader Partition 负载平衡

                            6.5、生产经验——增加副本因子

                            在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行

                            1)创建 topic
                            [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
                            hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
                            2)手动增加副本存储
                            (1)创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。
                            [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
                            输入如下内容:
                            {"version":1,"partitions":[{"topic":"four","partition":0,"replica
                            s":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"t
                            opic":"four","partition":2,"replicas":[0,1,2]}]}
                            (2)执行副本存储计划。
                            [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute