深入学习Kafka数据消费大致流程(如何创建并使用Kafka消费者)

异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

见代码:com.heima.kafka.chapter3.OffsetCommitAsyncCallback

但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

异步回调

try {

while (running.get()) {

ConsumerRecords records = consumer.poll(1000);

for (ConsumerRecord record : records) {

//do some logical processing.

}

// 异步回调

consumer.commitAsync(new OffsetCommitCallback() {

@Override

public void onComplete(Map offsets,Exception exception) {

if (exception == null) {

System.out.println(offsets);

} else {

log.error(“fail to commit offsets {}”, offsets, exception);

}

}

});

}

} finally {

consumer.close();

}

5.指定位移消费

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。

seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费。

见代码库:com.heima.kafka.chapter3.SeekDemo

/**

  • 指定位移消费

    */

    public class SeekDemo extends ConsumerClientConfig {

    public static void main(String[] args) {

    Properties props = initConfig();

    KafkaConsumer consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(topic));

    // timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待

    consumer.poll(Duration.ofMillis(2000));

    // 获取消费者所分配到的分区

    Set assignment = consumer.assignment();

    System.out.println(assignment);

    for (TopicPartition tp : assignment) {

    // 参数partition表示分区,offset表示指定从分区的哪个位置开始消费

    consumer.seek(tp, 10);

    }

    consumer.seek(new TopicPartition(topic,0),10);

    while (true) {

    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

    //consume the record.

    for (ConsumerRecord record : records) {

    System.out.println(record.offset() + “:” + record.value());

    }

    }

    }

    增加判断是否分配到了分区,见代码库:com.heima.kafka.chapter3.SeekDemoAssignment

    public static void main(String[] args) {

    Properties props = initConfig();

    KafkaConsumer consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(topic));

    long start = System.currentTimeMillis();

    Set assignment = new HashSet<>();

    while (assignment.size() == 0) {

    consumer.poll(Duration.ofMillis(100));

    assignment = consumer.assignment();

    }

    long end = System.currentTimeMillis();

    System.out.println(end - start);

    System.out.println(assignment);

    for (TopicPartition tp : assignment) {

    consumer.seek(tp, 10);

    }

    while (true) {

    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

    //consume the record.

    for (ConsumerRecord record : records) {

    System.out.println(record.offset() + “:” + record.value());

    }

    }

    }

    • 指定从分区末尾开始消费 ,见代码库:com.heima.kafka.chapter3.SeekToEnd

      // 指定从分区末尾开始消费

      Map offsets = consumer.endOffsets(assignment);

      for (TopicPartition tp : assignment) {

      consumer.seek(tp, offsets.get(tp));

      }

      • 演示位移越界操作,修改代码如下:

        for (TopicPartition tp : assignment) {

        //consumer.seek(tp, offsets.get(tp));

        consumer.seek(tp, offsets.get(tp) + 1);

        }

        会通过auto.offset.reset参数的默认值将位置重置,效果如下:

        INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 1 is out of range for partition heima-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)

        INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 10 is out of range for partition heima-1, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)

        INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)

        INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-1 to offset 9. (org.apache.kafka.clients.consumer.internals.Fetcher:583)

        6.再均衡监听器

        再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。

        见代码库:com.heima.kafka.chapter3.CommitSyncInRebalance

        public static void main(String[] args) {

        Properties props = initConfig();

        KafkaConsumer consumer = new KafkaConsumer<>(props);

        Map currentOffsets = new HashMap<>();

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {

        @Override

        public void onPartitionsRevoked(Collection partitions) {

        // 劲量避免重复消费

        consumer.commitSync(currentOffsets);

        }

        @Override

        public void onPartitionsAssigned(Collection partitions) {

        //do nothing.

        }

        });

        try {while (isRunning.get()) {

        ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord record : records) {

        System.out.println(record.offset() + “:” + record.value());

        // 异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的 onPartitionsRevoked回调执行commitSync方法同步提交位移。

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));

        }

        consumer.commitAsync(currentOffsets, null);

        }

        } finally {

        consumer.close();

        }

        }

        7.消费者拦截器

        之前章节讲了生产者拦截器,对应的消费者也有相应的拦截器概念,消费者拦截器主要是在消费到消息或者在提交消费位移时进行的一些定制化的操作。

        使用场景

        对消费消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。

        见代码库:com.heima.kafka.chapter3.ConsumerInterceptorTTL

        public ConsumerRecords onConsume(ConsumerRecords records) {

        System.out.println(“before:” + records);

        long now = System.currentTimeMillis();

        Map>> newRecords = new HashMap<>();

        for (TopicPartition tp : records.partitions()) {

        List> tpRecords = records.records(tp);

        List> newTpRecords = new ArrayList<> ();

        for (ConsumerRecord record : tpRecords) {

        if (now - record.timestamp() < EXPIRE_INTERVAL) {

        newTpRecords.add(record);

        }

        }

        if (!newTpRecords.isEmpty()) {

        newRecords.put(tp, newTpRecords);

        }

        }

        return new ConsumerRecords<>(newRecords);

        }

        实现自定义拦截器之后,需要在KafkaConsumer中配置指定这个拦截器,如下

        // 指定消费者拦截器

        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class .getName());

        效果演示

        发送端同时发送两条消息,其中一条修改timestamp的值来使其变得超时,如下:

        最后

        我还通过一些渠道整理了一些大厂真实面试主要有:蚂蚁金服、拼多多、阿里云、百度、唯品会、携程、丰巢科技、乐信、软通动力、OPPO、银盛支付、中国平安等初,中级,高级Java面试题集合,附带超详细答案,希望能帮助到大家。

        还有专门针对JVM、SPringBoot、SpringCloud、数据库、Linux、缓存、消息中间件、源码等相关面试题。

        O、银盛支付、中国平安等初,中级,高级Java面试题集合,附带超详细答案,希望能帮助到大家。**

        [外链图片转存中…(img-bYTbLYgz-1714738977997)]

        还有专门针对JVM、SPringBoot、SpringCloud、数据库、Linux、缓存、消息中间件、源码等相关面试题。

        [外链图片转存中…(img-8VNXNw0n-1714738977998)]

        本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录