如何让Kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性?(1)

最后

面试前一定少不了刷题,为了方便大家复习,我分享一波个人整理的面试大全宝典

  • Java核心知识整理

    Java核心知识

    • Spring全家桶(实战系列)
      • 其他电子书资料

        Step3:刷题

        既然是要面试,那么就少不了刷题,实际上春节回家后,哪儿也去不了,我自己是刷了不少面试题的,所以在面试过程中才能够做到心中有数,基本上会清楚面试过程中会问到哪些知识点,高频题又有哪些,所以刷题是面试前期准备过程中非常重要的一点。

        以下是我私藏的面试题库:

        本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

        需要这份系统化的资料的朋友,可以点击这里获取

        一、幂等性

        1.场景

        所谓幂等性,就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,二使用Kafka的幂等性功能就可以避免这种情况。

        幂等性是有条件的:

        • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
        • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个Topic-Partition 时,这中间的状态并没有同步。

          Producer 使用幂等性的示例非常简单,与正常情况下 Producer 使用相比变化不大,只需要把Producer 的配置 enable.idempotence 设置为 true 即可,如下所示:

          Properties props = new Properties();

          props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, “true”);

          props.put(“acks”, “all”); // 当 enable.idempotence 为 true,这里默认为 all

          props.put(“bootstrap.servers”, “localhost:9092”);

          props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

          props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

          KafkaProducer producer = new KafkaProducer(props);

          producer.send(new ProducerRecord(topic, “test”);

          二、事务

          1.场景

          幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,**事务可以保证对多个分区写入操作的原子性。**操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。

          为了实现事务,应用程序必须提供唯一的transactionalId,这个参数通过客户端程序来进行设定。

          见代码库:com.heima.kafka.chapter7.ProducerTransactionSend

          properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

          2.前期准备

          事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设置为false,则会抛出异常。

          KafkaProducer提供了5个与事务相关的方法,详细如下:

          //初始化事务,前提是配置了transactionalId

          public void initTransactions()

          //开启事务

          public void beginTransaction()

          //为消费者提供事务内的位移提交操作

          public void sendOffsetsToTransaction(Map offsets, String consumerGroupId)

          //提交事务

          public void commitTransaction()

          //终止事务,类似于回滚

          public void abortTransaction()

          3.案例解析

          见代码库:com.heima.kafka.chapter7.ProducerTransactionSend

          消息发送端

          /**

          • Kafka Producer事务的使用

            */

            public class ProducerTransactionSend {

            public static final String topic = “topic-transaction”;

            public static final String brokerList = “localhost:9092”;

            public static final String transactionId = “transactionId”;

            public static void main(String[] args) {

            Properties properties = new Properties();

            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

            KafkaProducer producer = new KafkaProducer<> (properties);

            producer.initTransactions();

            producer.beginTransaction();

            try {

            //处理业务逻辑并创建ProducerRecord

            ProducerRecord record1 = new ProducerRecord<>(topic, “msg1”);

            producer.send(record1);

            ProducerRecord record2 = new ProducerRecord<>(topic, “msg2”);

            producer.send(record2);

            ProducerRecord record3 = new ProducerRecord<>(topic, “msg3”);

            producer.send(record3);

            //处理一些其它逻辑

            producer.commitTransaction();

            } catch (ProducerFencedException e) {

            producer.abortTransaction();

            }

            producer.close();

            }

            }

            模拟事务回滚案例

            try {

            //处理业务逻辑并创建ProducerRecord

            ProducerRecord record1 = new ProducerRecord<>(topic, “msg1”);

            producer.send(record1);

            //模拟事务回滚案例

            System.out.println(1/0);

            ProducerRecord record2 = new ProducerRecord<>(topic, “msg2”);

            producer.send(record2);

            ProducerRecord record3 = new ProducerRecord<>(topic, “msg3”);

            producer.send(record3);

            //处理一些其它逻辑

            producer.commitTransaction();

            } catch (ProducerFencedException e) {

            producer.abortTransaction();

            }

            从上面案例中,msg1发送成功之后,出现了异常事务进行了回滚,则msg1消费端也收不到消息。

            三、控制器

            在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

            Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

            1.ZooInspector管理

            • 使用zookeeper图形化的客户端工具(ZooInspector)提供的jar来进行管理,启动如下:
              1. 定位到jar所在目录
              2. 运行jar文件 java -jar zookeeper-dev-ZooInspector.jar
              3. 连接Zookeeper

              {“version”:1,“brokerid”:0,“timestamp”:“1529210278988”}

              其中version在目前版本中固定为1,brokerid表示称为控制器的broker的id编号,timestamp表示竞选称为控制器时的时间戳。

              在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取**/controller节点**的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

              Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为**“控制器的纪元”**。

              controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,没选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。

              具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

              1. 监听partition相关的变化。
              2. 监听topic相关的变化。
              3. 监听broker相关的变化。
              4. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。

              四、可靠性保证

              1. 可靠性保证:确保系统在各种不同的环境下能够发生一致的行为
              2. Kafka的保证
              • 保证分区消息的顺序

                Docker步步实践

                目录文档:

                ①Docker简介

                ②基本概念

                ③安装Docker

                ④使用镜像:

                ⑤操作容器:

                ⑥访问仓库:

                ⑦数据管理:

                ⑧使用网络:

                ⑨高级网络配置:

                ⑩安全:

                ⑪底层实现:

                ⑫其他项目:

                本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

                需要这份系统化的资料的朋友,可以点击这里获取

                7-1715532627591)]

                本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

                需要这份系统化的资料的朋友,可以点击这里获取