Flink|《Flink 官方文档 - DataStream API - 事件时间 - 生成 Watermark》学习笔记

学习文档:Flink 官方文档 - DataStream API - 事件时间 - 生成 Watermark

学习笔记如下:


  • 事件时间:可以通过使用 TimestampAssigner API,从数据流中的每个元素获取的事件时间戳,通常是从元素中的某个字段去访问 / 提取时间戳
  • watermark:可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式

    定义 WatermarkStrategy

    如果需要自定义 WatermarkStrategy,则需要同时实现 TimestampAssigner 和 WatermarkGenerator:

    public interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier{ /**
         * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
         */
        @Override
        TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context);
        /**
         * 根据策略实例化一个 watermark 生成器。
         */
        @Override
        WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
    }
    

    通常,我们不用实现此接口,而是可以使用 WatermarkStrategy 工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定:

    WatermarkStrategy
            .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((event, timestamp) -> event.f0);
    

    TimestampAssigner 的设置是可选的,大多数情况下,不用特别指定。例如,当使用 Kafka 或 Kinesis 数据源时,可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。

    使用 WatermarkStrategy

    • 在数据源上使用:数据源可以利用 watermark 生成逻辑中有关分片 / 分区(shards / partitions / splits)的信息,watermark 可以更精确;但必须使用特定数据源接口。
    • 在非数据源的 operator 之后:仅当无法直接在数据源上设置策略时,才应该使用这种方式。

      在使用 WatermarkStrategy 时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳将覆盖原有的时间戳和 watermark。

      空闲数据源的处理

      空闲数据源:如果数据源中的某一个分区 / 分片在一段时间内未发送事件数据,那么这类数据源称为空闲输入或空闲源。

      对于空闲数据源, WatermarkGenerator 也不会获得新数据去生成 watermark。在这种情况下,如果其他分区仍然发送事件数据时就会出现问题。因为下游算子 watermark 的计算方式是获取所有不同的上游并行数据源 watermark 的最小值,所以其 watermark 不会发生变化。

      对于这种情况,可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。

      WatermarkStrategy
              .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
              .withIdleness(Duration.ofMinutes(1));
      

      watermark 对齐

      如果某一个分区 / 分片在一段时间内处理记录非常快,导致 watermark 的生成速度显著地高于其他分区。这对于下游使用 watermark 的算子来说,将遇到问题。

      在这种情况下,下游 operator 的 watermark 仍然是会不断前进的。但是,watermark 可能会被速度较慢的输入流拖住,导致下游算子可能需要缓存来自速度更快的数据流的大量数据,这将导致下游算子的状态进入不可控的增长。

      为了避免这种情况,我们可以打开 watermark alignment,这将不会有分区的 watermark 相较于其他分区的增速过快:

      WatermarkStrategy
              .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
              .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
      
      • 参数 1:对应 source 的组的标签,这个组中的 source 将共享这个 Watermarkstrategy;
      • 参数 2:在这个组中的所有 source 的 watermark 的最大 drift
      • 参数 3:当前最大 watermark 的更新频率

        Flink 实现 watermark 对齐的方法:通过暂停消费速度过快的 source,同时仍然消费速度较慢的 source

        自定义 WatermarkGenerator

        /**
         * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
         *
         * 

        注意: WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。 */ @Public public interface WatermarkGenerator { /** * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。 */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * 周期性的调用,也许会生成新的 watermark,也许不会。 * *

        调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。 */ void onPeriodicEmit(WatermarkOutput output); }

        watermark 的生成方式:

        • 周期性生成:通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。
        • 标记生成:查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。
          自定义周期性 Watermark 生成器

          可以通过 ExecutionConfig.setAutoWatermarkInterval(...) 指定生成 watermark 的时间间隔(每 n 毫秒),即 Flink 每过多久调用一次 WatermarkGenerator 的 onPeriodicEmit() 方法。

          样例:数据源存在一定程度的乱序;但是假定对于所有时间戳为 t 的元素,最早到达的元素和最晚到达的元素之间最大相差 n 毫秒。

          public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator { private final long maxOutOfOrderness = 3500; // 3.5 秒
              private long currentMaxTimestamp;
              @Override
              public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
              }
              @Override
              public void onPeriodicEmit(WatermarkOutput output) { // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
                  output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
              }
          }
          

          样例:数据源存在一定程度的延迟;但是假定所有元素都会在有限的延迟 n 秒后到达。

          /**
           * 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。
           */
          public class TimeLagWatermarkGenerator implements WatermarkGenerator { private final long maxTimeLag = 5000; // 5 秒
              @Override
              public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { // 处理时间场景下不需要实现
              }
              @Override
              public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
              }
          }
          

          自定义标记 Watermark 生成器

          标记 watermark 生成器通过观察流事件并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。

          样例:

          public class PunctuatedAssigner implements WatermarkGenerator { @Override
             public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { if (event.hasWatermarkMarker()) { output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
                 }
             }
             @Override
             public void onPeriodicEmit(WatermarkOutput output) { // onEvent 中已经实现
             }
          }
          

          通过标记 watermark 生成器,虽然可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程度性能。

          Watermark 策略与 Kafka 连接器

          当使用 Kafka 作为数据源时,每个 Kafka 分区可能会有一个单独的事件时间模式,这个时间模式可能是递增的(例如 Kafka 的消息时间戳),也可能是有界无序的(Kafka 消息中的字段)。但是,当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件事件模式,这是问题是 Kafka 消费所有固有的。

          针对这种情况,Flink 提供了可识别 Kafka 分区的 watermark 生成机制,可以针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

          例如,如果每个 Kafka 分区中的事件时间戳严格递增(比如使用 Kafka 的消息时间戳),则使用单调递增时间戳分配器按分区生成的 watermark 将生成完美的全局 watermark。

          当不使用 TimestampAssigner 时,会使用 Flink 记录自身的消息时间戳。

          KafkaSource kafkaSource = KafkaSource.builder()
              .setBootstrapServers(brokers)
              .setTopics("my-topic")
              .setGroupId("my-group")
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();
          DataStream stream = env.fromSource(
              kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");
          

          在上图中,左侧的每个橙色圆柱体为一个 Kafka 分区,每个分区都有自己的 watermark,然后再右侧的 window 算子中,使用 shuffle 的机制,operator 选择自己接收到的所有上游数据流中最小的 watermark 作为自己当前确认的 watermark。

          算子处理 Watermark 的方式

          operator 在将 watermark 转发到下游之前,首先需要对该 watermark 触发的事件进行完整的处理,包括:

          • 该 watermark 触发的所有窗口数据

            仅当由此 watermark 触发计算进而生成的所有数据都被转发到下游之后,该 watermark 才会被发送到下游。也就是说,当 watermark 出现在算子的输出流时,说明由该 watermark 出现而产生的所有数据元素都已经在此 watermark 之前被发送到输出流了。

            上述规则也适用于存在多个输入流的 operator。但是,在这种情况下,算子当前的 watermark 会取其多个输入流中的最小值。

            实现原理:OneInputStreamOperator#processWatermark、TwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2


            Flink 旧版 API(已可以弃用):AssignerWithPeriodicWatermarks、AssignerWithPunctuatedWatermarks

            Flink 新版 API:WatermarkStrategy,TimestampAssigner 和 WatermarkGenerator