上游文档:
- Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 Watermark》学习笔记
- Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
- Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记
Watermark
Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。
首先,让我们来看一下 Watermark 类的源码。
源码:org.apache.flink.api.common.eventtime.Watermark【Github】
package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; @Public public final class Watermark implements Serializable { private static final long serialVersionUID = 1L; /** Thread local formatter for stringifying the timestamps. */ private static final ThreadLocal
TS_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); // ------------------------------------------------------------------------ /** The watermark that signifies end-of-event-time. */ public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE); // ------------------------------------------------------------------------ /** The timestamp of the watermark in milliseconds. */ private final long timestamp; /** Creates a new watermark with the given timestamp in milliseconds. */ public Watermark(long timestamp) { this.timestamp = timestamp; } /** Returns the timestamp associated with this Watermark. */ public long getTimestamp() { return timestamp; } /** * Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned * format is "yyyy-MM-dd HH:mm:ss.SSS". */ public String getFormattedTimestamp() { return TS_FORMATTER.get().format(new Date(timestamp)); } // ------------------------------------------------------------------------ @Override public boolean equals(Object o) { return this == o || o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp; } @Override public int hashCode() { return Long.hashCode(timestamp); } @Override public String toString() { return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')'; } } 可以看到,Watermark 类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:
- 使用时间戳构造实例化 Watermark 对象,Watermark 对象在实例化后,不能修改其存储的时间戳
- 提供 long getTimestamp() 和 String getFormattedTimestamp() 两种查询 Watermark 对象时间戳的方法
WatermarkGenerator
接着,我们来看 watermark 的生成接口 WatermarkGenerator 的源码。
源码:org.apache.flink.api.common.eventtime.WatermarkGenerator【Github】
package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; /** * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a * fixed interval). * *
Note: This WatermarkGenerator subsumes the previous distinction between the {@code * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}. */ @Public public interface WatermarkGenerator
{ /** * Called for every event, allows the watermark generator to examine and remember the event * timestamps, or to emit a watermark based on the event itself. */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * Called periodically, and might emit a new watermark, or not. * * The interval in which this method is called and Watermarks are generated depends on {@link * ExecutionConfig#getAutoWatermarkInterval()}. */ void onPeriodicEmit(WatermarkOutput output); }
WatermarkGernator 接口,既可以基于消息,也可以基于周期。WatermarkGenerator 接口有两个方法:
- void onEvent(T event, long eventTimestamp, WatermarkOutput output):这个方法会在每个消息到达时被调用一次,其参数 event 为消息本身,参数 eventTimestamp 为消息的事件时间,output 为接收生成的 watermark 的对象。
- void (WatermarkOutput output):这个方法会被周期性地调用,其参数 output 为接收生成的 watermark 的对象。
在实现 WatermarkGenerator 接口时,既可以在 onEvent 方法中生成 watermark,也可以在 onPeriodicEmit 方法中生成 watermark。因此,基于 WatermarkGenerator 接口,可以实现 标记生成 或 周期性生成 两种 watermark 生成器。
下面,我们来看 Flink 内置的几个 watermark 生成器。
Flink 内置的 watermark 生成器
在这里,我们仅介绍 Flink 的 flink-core 项目中如下内置的 watermark 生成器:
- NoWatermarksGenerator:不生成 watermark 的生成器
- BoundedOutOfOrdernessWatermarks:固定延迟时间的周期性 watermark 生成器
- AscendingTimestampsWatermarks:零延迟时间的周期性 watermark 生成器
不生成 watermark 的生成器:NoWatermarksGenerator
最简单的,不生成任何 watermark 的生成器。在实现上,在 onEvent 方法和 onPeriodicEmit 方法中均不生成 watermark。
源码:org.apache.flink.api.common.eventtime.NoWatermarksGenerator【Github】
package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; @Public public final class NoWatermarksGenerator
implements WatermarkGenerator { @Override public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {} @Override public void onPeriodicEmit(WatermarkOutput output) {} } 固定延迟时间的周期性 watermark 生成器:BoundedOutOfOrdernessWatermarks
当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器 BoundedOutOfOrdernessWatermarks 实现了这种功能。
源码:org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks【Github】
package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import java.time.Duration; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @Public public class BoundedOutOfOrdernessWatermarks
implements WatermarkGenerator { /** The maximum timestamp encountered so far. */ private long maxTimestamp; /** The maximum out-of-orderness that this watermark generator assumes. */ private final long outOfOrdernessMillis; /** * Creates a new watermark generator with the given out-of-orderness bound. * * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps. */ public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness"); checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative"); this.outOfOrdernessMillis = maxOutOfOrderness.toMillis(); // start so that our lowest watermark would be Long.MIN_VALUE. this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; } // ------------------------------------------------------------------------ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); } } 可以看到,在 BoundedOutOfOrdernessWatermarks 类中:
- 使用固定的延迟时间 maxOutOfOrderness 来实例化
- 使用示例属性 maxTimestamp 存储当前所有消息的最大事件时间,当每个消息到达时,onEvent 方法被调用,并更新 maxTimestamp 属性
- 周期性地生成 watermark,当 onPeriodicEmit 方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark
零延迟时间的周期性 watermark 生成器:AscendingTimestampsWatermarks
当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。
此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器 AscendingTimestampsWatermarks 实现了这个功能。
源码:org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks【Github】
package org.apache.flink.api.common.eventtime; import org.apache.flink.annotation.Public; import java.time.Duration; @Public public class AscendingTimestampsWatermarks
extends BoundedOutOfOrdernessWatermarks { /** Creates a new watermark generator with for ascending timestamps. */ public AscendingTimestampsWatermarks() { super(Duration.ofMillis(0)); } } 在实现上,AscendingTimestampsWatermarks 继承了 BoundedOutOfOrdernessWatermarks,并将延迟指定为 0。