Flink处理函数(2)—— 按键分区处理函数

 按键分区处理函数(KeyedProcessFunction):先进行分区,然后定义处理操作

1.定时器(Timer)和定时服务(TimerService)

  • 定时器(timers)是处理函数中进行时间相关操作的主要机制
  • 定时服务(TimerService)提供了注册定时器的功能

    TimerService 是 Flink 关于时间和定时器的基础服务接口:

    // 获取当前的处理时间
    long currentProcessingTime();
    // 获取当前的水位线(事件时间)
    long currentWatermark();
    // 注册处理时间定时器,当处理时间超过 time 时触发
    void registerProcessingTimeTimer(long time);
    // 注册事件时间定时器,当水位线超过 time 时触发
    void registerEventTimeTimer(long time);
    // 删除触发时间为 time 的处理时间定时器
    void deleteProcessingTimeTimer(long time);
    // 删除触发时间为 time 的事件时间定时器
    void deleteEventTimeTimer(long time);

    六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器

    尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

    对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳保存起来,排队等待执行;可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

    基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的;利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次:

    long coalescedTime = time / 1000 * 1000; //时间戳(定时器默认的区分精度是毫秒)
    ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器

    2.KeyedProcessFunction 的使用

    基础用法:

    stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

    这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类;

    源码解析


    KeyedProcessFunction源码如下:

    public abstract class KeyedProcessFunction extends AbstractRichFunction {
        private static final long serialVersionUID = 1L;
        /**
         * Process one element from the input stream.
         *
         * 

    This function can output zero or more elements using the {@link Collector} parameter and * also update internal state or set timers using the {@link Context} parameter. * * @param value The input value. * @param ctx A {@link Context} that allows querying the timestamp of the element and getting a * {@link TimerService} for registering timers and querying the time. The context is only * valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ public abstract void processElement(I value, Context ctx, Collector out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. * * @param timestamp The timestamp of the firing timer. * @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link * TimeDomain}, and the key of the firing timer and getting a {@link TimerService} for * registering timers and querying the time. The context is only valid during the invocation * of this method, do not store it. * @param out The collector for returning result values. * @throws Exception This method may throw exceptions. Throwing an exception will cause the * operation to fail and may trigger recovery. */ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {} /** * Information available in an invocation of {@link #processElement(Object, Context, Collector)} * or {@link #onTimer(long, OnTimerContext, Collector)}. */ public abstract class Context { /** * Timestamp of the element currently being processed or timestamp of a firing timer. * *

    This might be {@code null}, for example if the time characteristic of your program is * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. */ public abstract Long timestamp(); /** A {@link TimerService} for querying time and registering timers. */ public abstract TimerService timerService(); /** * Emits a record to the side output identified by the {@link OutputTag}. * * @param outputTag the {@code OutputTag} that identifies the side output to emit to. * @param value The record to emit. */ public abstract void output(OutputTag outputTag, X value); /** Get key of the element being processed. */ public abstract K getCurrentKey(); } /** * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. */ public abstract class OnTimerContext extends Context { /** The {@link TimeDomain} of the firing timer. */ public abstract TimeDomain timeDomain(); /** Get key of the firing timer. */ @Override public abstract K getCurrentKey(); } }

    可以看到和ProcessFunction类似,都有一个processElement()和onTimer()方法,并且定义了一个Context抽象类;不同点在于类型参数多了一个K,也就是key的类型;

    代码示例

    ①处理时间语义

    public class ProcessingTimeTimerTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 处理时间语义,不需要分配时间戳和watermark
            SingleOutputStreamOperator stream = env.addSource(new ClickSource());
            // 要用定时器,必须基于KeyedStream
            stream.keyBy(data -> true)
                    .process(new KeyedProcessFunction() {
                        @Override
                        public void processElement(Event value, Context ctx, Collector out) throws Exception {
                            Long currTs = ctx.timerService().currentProcessingTime();
                            out.collect("数据到达,到达时间:" + new Timestamp(currTs));
                            // 注册一个10秒后的定时器
                            ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                        }
                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
                            out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                        }
                    })
                    .print();
            env.execute();
        }
    }

    通过ctx.timerService().currentProcessingTime()获取当前处理时间;

    通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器;

    运行结果如下:

    由于定时器是处理时间的定时器,不用考虑水位线延时问题,因此10s后能够准时触发定时操作;


    ②事件时间语义:

    public class EventTimeTimerTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator stream = env.addSource(new CustomSource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner() {
                                @Override
                                public long extractTimestamp(Event element, long recordTimestamp) {
                                    return element.timestamp;
                                }
                            }));
            // 基于KeyedStream定义事件时间定时器
            stream.keyBy(data -> true)
                    .process(new KeyedProcessFunction() {
                        @Override
                        public void processElement(Event value, Context ctx, Collector out) throws Exception {
                            out.collect("数据到达,时间戳为:" + ctx.timestamp());
                            out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线-------");
                            // 注册一个10秒后的定时器
                            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                        }
                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
                            out.collect("定时器触发,触发时间:" + timestamp);
                        }
                    })
                    .print();
            env.execute();
        }
        // 自定义测试数据源
        public static class CustomSource implements SourceFunction {
            @Override
            public void run(SourceContext ctx) throws Exception {
                // 直接发出测试数据
                ctx.collect(new Event("Mary", "./home", 1000L));
                // 为了更加明显,中间停顿5秒钟
                Thread.sleep(5000L);
                // 发出10秒后的数据
                ctx.collect(new Event("Mary", "./home", 11000L));
                Thread.sleep(5000L);
                // 发出10秒+1ms后的数据
                ctx.collect(new Event("Alice", "./cart", 11001L));
                Thread.sleep(5000L);
            }
            @Override
            public void cancel() { }
        }
    }
    

    运行结果如下:

    运行结果解释:

    ①第一条数据到来时,时间戳为1000,但由于水位线的生成是周期性的(默认200ms),因此水位线不会立即发送改变,仍然是Long.MIN_VALUE,之后只要到了水位线生成的时间周期,就会依据当前最大的时间戳来生成水位线(默认减1)

    ②第二条数据到来时,显然水位线已经推进到了999,但仍然不会立即改变;

    ③在事件时间语义下,定时器触发的条件就是水位线推进到设定的时间;第一条数据到来之后,设定的定时器时间为11000,而当时间戳为11000的数据到来时,水位线还停留在999的位置,因此不会立即触发定时器;之后水位线会推进到10999(11000-1),同样无法触发定时器;

    ④第三条数据到来时,时间戳为11001,此时水位线推进到了10999,等到水位线周期性更新后,推进到11000(11001-1),这样第一个定时器就会触发

    ⑤然后等待5s后,没有新的数据到来,整个程序结束,将要退出,此时会将水位线推进到Long.MAX_VALUE,所以所有没有触发的定时器统一触发;

     学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili