Flink 源码剖析|Watermark 与各内置 Watermark 生成器(WatermarkGenerator)

上游文档:

  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记
  • Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    Watermark

    Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。

    首先,让我们来看一下 Watermark 类的源码。

    源码:org.apache.flink.api.common.eventtime.Watermark【Github】

    package org.apache.flink.api.common.eventtime;
    import org.apache.flink.annotation.Public;
    import java.io.Serializable;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    @Public
    public final class Watermark implements Serializable { private static final long serialVersionUID = 1L;
        /** Thread local formatter for stringifying the timestamps. */
        private static final ThreadLocal TS_FORMATTER =
                ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
        // ------------------------------------------------------------------------
        /** The watermark that signifies end-of-event-time. */
        public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
        // ------------------------------------------------------------------------
        /** The timestamp of the watermark in milliseconds. */
        private final long timestamp;
        /** Creates a new watermark with the given timestamp in milliseconds. */
        public Watermark(long timestamp) { this.timestamp = timestamp;
        }
        /** Returns the timestamp associated with this Watermark. */
        public long getTimestamp() { return timestamp;
        }
        /**
         * Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned
         * format is "yyyy-MM-dd HH:mm:ss.SSS".
         */
        public String getFormattedTimestamp() { return TS_FORMATTER.get().format(new Date(timestamp));
        }
        // ------------------------------------------------------------------------
        @Override
        public boolean equals(Object o) { return this == o
                    || o != null
                            && o.getClass() == Watermark.class
                            && ((Watermark) o).timestamp == this.timestamp;
        }
        @Override
        public int hashCode() { return Long.hashCode(timestamp);
        }
        @Override
        public String toString() { return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
        }
    }
    

    可以看到,Watermark 类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:

    • 使用时间戳构造实例化 Watermark 对象,Watermark 对象在实例化后,不能修改其存储的时间戳
    • 提供 long getTimestamp() 和 String getFormattedTimestamp() 两种查询 Watermark 对象时间戳的方法

      WatermarkGenerator

      接着,我们来看 watermark 的生成接口 WatermarkGenerator 的源码。

      源码:org.apache.flink.api.common.eventtime.WatermarkGenerator【Github】

      package org.apache.flink.api.common.eventtime;
      import org.apache.flink.annotation.Public;
      import org.apache.flink.api.common.ExecutionConfig;
      /**
       * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
       * fixed interval).
       *
       * 

      Note: This WatermarkGenerator subsumes the previous distinction between the {@code * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}. */ @Public public interface WatermarkGenerator { /** * Called for every event, allows the watermark generator to examine and remember the event * timestamps, or to emit a watermark based on the event itself. */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * Called periodically, and might emit a new watermark, or not. * *

      The interval in which this method is called and Watermarks are generated depends on {@link * ExecutionConfig#getAutoWatermarkInterval()}. */ void onPeriodicEmit(WatermarkOutput output); }

      WatermarkGernator 接口,既可以基于消息,也可以基于周期。WatermarkGenerator 接口有两个方法:

      • void onEvent(T event, long eventTimestamp, WatermarkOutput output):这个方法会在每个消息到达时被调用一次,其参数 event 为消息本身,参数 eventTimestamp 为消息的事件时间,output 为接收生成的 watermark 的对象。
      • void (WatermarkOutput output):这个方法会被周期性地调用,其参数 output 为接收生成的 watermark 的对象。

        在实现 WatermarkGenerator 接口时,既可以在 onEvent 方法中生成 watermark,也可以在 onPeriodicEmit 方法中生成 watermark。因此,基于 WatermarkGenerator 接口,可以实现 标记生成周期性生成 两种 watermark 生成器。

        下面,我们来看 Flink 内置的几个 watermark 生成器。

        Flink 内置的 watermark 生成器

        在这里,我们仅介绍 Flink 的 flink-core 项目中如下内置的 watermark 生成器:

        • NoWatermarksGenerator:不生成 watermark 的生成器
        • BoundedOutOfOrdernessWatermarks:固定延迟时间的周期性 watermark 生成器
        • AscendingTimestampsWatermarks:零延迟时间的周期性 watermark 生成器

          不生成 watermark 的生成器:NoWatermarksGenerator

          最简单的,不生成任何 watermark 的生成器。在实现上,在 onEvent 方法和 onPeriodicEmit 方法中均不生成 watermark。

          源码:org.apache.flink.api.common.eventtime.NoWatermarksGenerator【Github】

          package org.apache.flink.api.common.eventtime;
          import org.apache.flink.annotation.Public;
          @Public
          public final class NoWatermarksGenerator implements WatermarkGenerator { @Override
              public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {}
              @Override
              public void onPeriodicEmit(WatermarkOutput output) {}
          }
          

          固定延迟时间的周期性 watermark 生成器:BoundedOutOfOrdernessWatermarks

          当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器 BoundedOutOfOrdernessWatermarks 实现了这种功能。

          源码:org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks【Github】

          package org.apache.flink.api.common.eventtime;
          import org.apache.flink.annotation.Public;
          import java.time.Duration;
          import static org.apache.flink.util.Preconditions.checkArgument;
          import static org.apache.flink.util.Preconditions.checkNotNull;
          @Public
          public class BoundedOutOfOrdernessWatermarks implements WatermarkGenerator { /** The maximum timestamp encountered so far. */
              private long maxTimestamp;
              /** The maximum out-of-orderness that this watermark generator assumes. */
              private final long outOfOrdernessMillis;
              /**
               * Creates a new watermark generator with the given out-of-orderness bound.
               *
               * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
               */
              public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
                  checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
                  this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
                  // start so that our lowest watermark would be Long.MIN_VALUE.
                  this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
              }
              // ------------------------------------------------------------------------
              @Override
              public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
              }
              @Override
              public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
              }
          }
          

          可以看到,在 BoundedOutOfOrdernessWatermarks 类中:

          • 使用固定的延迟时间 maxOutOfOrderness 来实例化
          • 使用示例属性 maxTimestamp 存储当前所有消息的最大事件时间,当每个消息到达时,onEvent 方法被调用,并更新 maxTimestamp 属性
          • 周期性地生成 watermark,当 onPeriodicEmit 方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark

            零延迟时间的周期性 watermark 生成器:AscendingTimestampsWatermarks

            当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。

            此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器 AscendingTimestampsWatermarks 实现了这个功能。

            源码:org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks【Github】

            package org.apache.flink.api.common.eventtime;
            import org.apache.flink.annotation.Public;
            import java.time.Duration;
            @Public
            public class AscendingTimestampsWatermarks extends BoundedOutOfOrdernessWatermarks { /** Creates a new watermark generator with for ascending timestamps. */
                public AscendingTimestampsWatermarks() { super(Duration.ofMillis(0));
                }
            }
            

            在实现上,AscendingTimestampsWatermarks 继承了 BoundedOutOfOrdernessWatermarks,并将延迟指定为 0。