Flink KafkaSource 启用动态分区检查

Flink KafkaSource 启用动态分区检查

在不同版本的Flink中,动态分区检查(Dynamic Partitions Check)启用方式可能会有一些变化。以下是不同版本变化的情况总结:

1. Flink版本<= 1.11 分区发现

Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(即,当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。

默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。

限制:当从 Flink 1.3.x 之前的 Flink 版本的 savepoint 恢复 consumer 时,分区发现无法在恢复运行时启用。如果启用了,那么还原将会失败并且出现异常。在这种情况下,为了使用分区发现,请首先在 Flink 1.3.x 中使用 savepoint,然后再从 savepoint 中恢复。


2. Flink版本:1.12 ~ 1.14 分区发现

Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(即,当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。

默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。


3. Flink版本:1.15~1.17 动态分区检查

为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查,请将 partition.discovery.interval.ms 设置为非负值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

分区检查功能默认不开启。需要显式地设置分区检查间隔才能启用此功能。


4. Flink版本:1.18 动态分区检查

为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查,请将 partition.discovery.interval.ms 设置为正值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

分区检查间隔默认为5分钟。需要显式地设置分区检查间隔为非正数才能关闭此功能。


5. Flink版本:1.19~1.20 动态分区检查

为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查,请将 partition.discovery.interval.ms 设置为非负值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

分区检查功能默认不开启。需要显式地设置分区检查间隔才能启用此功能。