Flink 源码剖析|3. UDF 接口与富函数

3 UDF 接口与富函数

3.1 使用 UDF 函数

Flink 作业的数据流中的主要操作大部分都需要 UDF(user defined functions,用户自定义函数)。

3.1.1 使用普通 UDF 接口

实现 UDF 最基本的方法是实现 Flink 提供的接口。在 Java 具体实现时,可以使用实现接口、匿名类或 lambda 语法实现。

样例|实现 MapFunction 接口,将字符串转化为整数,并在 DataStream API 中使用

// 实现 MapFunction 接口
class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}
// 在 DataStream API 中使用实现接口的类
data.map(new MyMapFunction());

样例|在 DataStream API 中直接使用匿名类

data.map(new MapFunction () { public Integer map(String value) { return Integer.parseInt(value); }
});

样例|在 DataStream API 中使用 lambda 表达式语法

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

对于 UDF 接口的详细介绍,详见 3.2。

3.1.2 使用富函数

几乎所有 Function 接口的子接口,都有其富函数(Rich Function)版本;在富函数中,可以在获取运行状态的上下文,从而支持使用状态,从而支持实现更复杂的功能。

要使用富函数,只需要将实现 Function 子接口改为继承富函数版本的抽象类即可,在使用中,与使用非富函数一样,直接传给 DataStream API 即可。

样例|将非富函数版本的 MyMapFunction 替换为富函数版本

// 非 rich 版本
class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}
// rich 版本
class MyMapFunction extends RichMapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}
// 使用 UDF
data.map(new MyMapFunction());

样例|使用匿名类的方式定义富函数

data.map (new RichMapFunction() { public Integer map(String value) { return Integer.parseInt(value); }
});

对于富函数的详细介绍,详见 3.3。

3.2 Function 接口的源码

在 3.1.1 例中的 MapFunction 接口是 Function 接口的子接口。Function 接口是所有 UDF 函数的父接口,在 Function 接口中没有任何方法,以便允许继承的扩展接口能够使用 Java 8 的 lambda 语法实现。

源码|Github|org.apache.flink.api.common.functions.Function

@Public
public interface Function extends java.io.Serializable {}

3.2.1 处理普通数据流

3.2.1.1 MapFunction(1 个输入,1 个输出)

输入 1 个 T 类型的记录,输出 1 个 O 类型的记录。

源码|Github|org.apache.flink.api.common.functions.MapFunction

@Public
@FunctionalInterface
public interface MapFunction extends Function, Serializable { O map(T value) throws Exception;
}
3.2.1.2 FlatMapFunction(1 个输入,任意输出)

输入 1 个 T 类型的记录,输出 0 个、1 个或多个 O 类型的记录。

可以将调用 out.collect() 方法向输出流写入元素。

源码|Github|org.apache.flink.api.common.functions.FlatMapFunction

@Public
@FunctionalInterface
public interface FlatMapFunction extends Function, Serializable { void flatMap(T value, Collector out) throws Exception;
}
3.2.1.3 FilterFunction(1 个输入,0 或 1 个输出)

输入 1 个 T 类型的记录,输出 0 条或 1 条 T 类型的记录。

Filter 为每一条元素执行一个返回布尔值的函数 filter(T value),并保留函数返回值为 true 的元素。

源码|Github|org.apache.flink.api.common.functions.FilterFunction

@Public
@FunctionalInterface
public interface FilterFunction extends Function, Serializable { boolean filter(T value) throws Exception;
}
3.2.1.4 ReduceFunction(多个输入,1 个输出)

输入 2 个 T 类型的记录,输出 1 个 T 类型的记录。

可以在数据流上滚动执行 reduce,不断将当前元素与最后一次 reduce 得到的值进行合并得到新值。

源码|Github|org.apache.flink.api.common.functions.ReduceFunction

@Public
@FunctionalInterface
public interface ReduceFunction extends Function, Serializable { T reduce(T value1, T value2) throws Exception;
}

3.2.2 处理窗口

3.2.2.1 WindowFunction(监控窗口)

适用于键控窗口(keyed / grouped windows)。输入 1 个或多个 IN 类型的记录,输出 0 个、1 个或多个 OUT 类型的元素。

WindowFunction 接口只有 1 个 apply() 方法,其中包含如下 4 个参数:当前键控窗口的 KEY 类型键值,W 类型的窗口,窗口中的所有 IN 类型元素的迭代器,以及接收 OUT 类型的输出流元素的迭代器。

源码|Github|org.apache.flink.streaming.api.functions.windowing.WindowFunction

@Public
public interface WindowFunction extends Function, Serializable { void apply(KEY key, W window, Iterable input, Collector out) throws Exception;
}
3.2.2.2 AllWindowFunction(全局窗口)

适用于全局窗口。输入 1 个或多个 IN 类型的记录,输出 0 个、1 个或多个 OUT 类型的元素。

AllWindowFunction 接口也只有 1 个 apply() 方法,其中包含 3 个参数,与 WindowFunction 中的含义一致。

源码|Github|org.apache.flink.streaming.api.functions.windowing.AllWindowFunction

@Public
public interface AllWindowFunction extends Function, Serializable { void apply(W window, Iterable values, Collector out) throws Exception;
}

3.2.3 处理多个数据流

3.2.3.1 JoinFunction(以成功关联的记录为单位)

输入两个数据流中的各 1 个记录,其类型分别为 IN1 和 IN2,输出 1 个 OUT 类型的元素。

使用 JoinFunction 关联 2 个数据流,当且仅当成功关联时,JOIN 方法才会被调用。JoinFunction 接口只有 1 个 apply() 方法,其中参数 first 为第 1 个数据流中的元素,参数 second 为第 2 个数据流中的元素,这 2 个元素是相互成功关联的。

源码|Github|org.apache.flink.api.common.functions.JoinFunction

@Public
@FunctionalInterface
public interface JoinFunction extends Function, Serializable { OUT join(IN1 first, IN2 second) throws Exception;
}
3.2.3.2 CoGroupFunction(处理窗口为单位)

适用于键控窗口。输入 当前窗口中 第 1 个数据流中所有 IN1 类型元素的迭代器和第 2 个数据流中所有 IN2 类型元素的迭代器,输出 0 个、1 个或多个 OUT 类型的元素。

源码|Github|org.apache.flink.api.common.functions.CoGroupFunction

@Public
@FunctionalInterface
public interface CoGroupFunction extends Function, Serializable { void coGroup(Iterable first, Iterable second, Collector out) throws Exception;
}
3.2.3.3 CoMapFunction(处理 2 个数据流,1 个输入,1 个输出)

适用于 “连接” 后的数据流。在 “连接” 后的数据流中,2 个数据流保留各自的类型,但允许两个流的处理逻辑之间共享状态。

输入来自第 1 个数据流的 1 个 IN1 类型元素或输入来自第 2 个数据流的 1 个 IN2 类型元素,输出 1 个 OUT 类型元素。

源码|Github|org.apache.flink.streaming.api.functions.co.CoMapFunction

@Public
public interface CoMapFunction extends Function, Serializable { OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}

当接收到来自第 1 个数据流的记录时,map1(IN1 value) 方法将被调用;当接收到来自第 2 个数据流的记录时,map2(IN2 value) 方法将被调用。

3.2.3.4 CoFlatMapFunction(处理 1 个数据流,1 个输出,任意输出)

与 CoMapFunction 类似,适用于 “连接” 后的数据流。与 CoMapFunction 区别类似于 FlatMapFunction 与 MapFunction 的区别。

源码|Github|org.apache.flink.streaming.api.functions.co.CoFlatMapFunction

@Public
public interface CoFlatMapFunction extends Function, Serializable { void flatMap1(IN1 value, Collector out) throws Exception;
    void flatMap2(IN2 value, Collector out) throws Exception;
}

3.3 富函数的源码

上图为仅包含 Mapfunction、FlatMapFunction、FilterFunction 和 RedeuceFunction 作为样例的 Rich Function 的 UML 图。每个 Function 子接口的 Rich 版本,都是实现了该接口的抽象类,同时,这些抽象类都继承了 AbstractRichFunction。

下面我们逐个了解 RichFunction 接口、AbstractRichFunction 抽象类以及 Function 子接口的 rich 版本。

3.3.1 RichFunction 接口

RichFunction 接口是用于所有 Rich UDF 的基础接口,其中定义了函数的生命周期方法以及访问函数执行上下文的方法。具体地,包含如下方法:

  • open(Configuration parameters):初始化 Function 的方法。这个方法将在类似于 map、flatMap 等主方法(working methods)之前被调用,适合于执行一次性的初始化工作,其参数 parameters 中包含着配置信息。
  • open(OpenContext openContext):默认方法,与 open(Configuration parameters) 类似,但是其配置信息包含于 openContext 中。默认使用空的 Configuration 作为参数调用 open(Configuration parameters)。
  • close():关闭 Function 的方法。这个方法将在最后一次调用主方法之后被调用,适合于清理运行垃圾。
  • getRuntimeContext():获取 UDF 当前运行的上下文信息对象 RuntimeContext,例如当前 Function 的并行度,当前 subtask 的 index,当前 task 名称等。
  • getIterationRuntimeContext():当且仅当当前 Function 是迭代(iteration)的一部分时可用,返回多个 RuntimeContext 的迭代器。
  • setRuntimeContext(RuntimeContext t):设置 UDF 的上下文信息,仅被框架在创建 Function 的并行实例时调用。

    对于 RuntimeContext 的介绍详见 3.4。

    源码|Github|org.apache.flink.api.common.functions.RichFunction(无注释)

    @Public
    public interface RichFunction extends Function { @Deprecated
        void open(Configuration parameters) throws Exception;
        @PublicEvolving
        default void open(OpenContext openContext) throws Exception { open(new Configuration());
        }
        void close() throws Exception;
        RuntimeContext getRuntimeContext();
        IterationRuntimeContext getIterationRuntimeContext();
        void setRuntimeContext(RuntimeContext t);
    }
    

    接口中需要实现的方法包括:open(Configuration parameters)、close()、getRuntimeContext()、getIterationRuntimeContext() 和 setRuntimeContext(RuntimeContext t)。

    3.3.2 AbstractRichFunction 抽象类

    AbstractRichFunction 抽象类实现了 RichFunction 接口,添加了存储上下文信息的、不参与系列化的私有对象属性 runtimeContext,实现了接口中所有 5 个非默认方法:

    • open(Configuration parameters):不执行任何操作
    • close():不执行任何操作
    • getRuntimeContext() 和 getIterationRuntimeContext():检查表行返回对象属性 runtimeContext
    • setRuntimeContext(RuntimeContext t):设置对象属性 runtimeContext

      源码|Github|org.apache.flink.api.common.functions.AbstractRichFunction(部分)

      @Public
      public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext;
          @Override
          public void setRuntimeContext(RuntimeContext t) { this.runtimeContext = t;
          }
          @Override
          public RuntimeContext getRuntimeContext() { if (this.runtimeContext != null) { return this.runtimeContext;
              } else { throw new IllegalStateException("The runtime context has not been initialized.");
              }
          }
          @Override
          public IterationRuntimeContext getIterationRuntimeContext() { if (this.runtimeContext == null) { throw new IllegalStateException("The runtime context has not been initialized.");
              } else if (this.runtimeContext instanceof IterationRuntimeContext) { return (IterationRuntimeContext) this.runtimeContext;
              } else { throw new IllegalStateException("This stub is not part of an iteration step function.");
              }
          }
          @Override
          public void open(Configuration parameters) throws Exception {}
          @Override
          public void close() throws Exception {}
      }
      

      3.3.3 Function 子接口的 Rich 版本

      因为 Function 子接口的 Rich 版本实现了非 Rich 版本的接口,并继承了 AbstractRichFunction 抽象类,所以通常不需要添加额外的方法,只需要将非 Rich 版本接口中的方法改写为抽象方法即可。例如 MapFunction:

      源码|Github|org.apache.flink.api.common.functions.RichMapFunction

      @Public
      public abstract class RichMapFunction extends AbstractRichFunction
              implements MapFunction { private static final long serialVersionUID = 1L;
          @Override
          public abstract OUT map(IN value) throws Exception;
      }
      

      3.4 RuntimeContext

      每个并行的实例都会包含一个 RuntimeContext。RuntimeContext 接口包含函数执行的上下文信息,主要用于富函数,提供了如下功能:

      • 访问静态上下文信息(例如当前并行度)
      • 添加及访问累加器
      • 访问外部资源信息
      • 访问广播变量和分布式缓存
      • 访问并编辑状态

        下面,我们逐类介绍 RuntimeContext 接口的方法。

        3.4.1 访问静态上下文信息

        RuntimeContext 的包括如下获取静态上下文信息的方法:

        • getJobId():获取当前作业的 ID。
        • getTaskName():获取执行 UDF 的 task 名称。
        • getMetricGroup():获取当前 subtask 的指标组。
        • getNumberOfParallelSubtasks():获取执行 UDF 的 task 的并行度。
        • getMaxNumberOfParallelSubtasks():获取执行 UDF 的 task 的最大并行度。
        • getIndexOfThisSubtask():获取执行 UDF 的 subtask 的编号(编号从 0 开始)。
        • getAttemptNumber():获取执行 UDF 的 subtask 的尝试次数(第一次尝试的次数是 0)。
        • getTaskNameWithSubtasks():获取执行 UDF 的 subtask 的名称。这个名称如下 {任务名称} ({subtask的编号}/{并行度})#{尝试次数},例如 MyTask (3/6)#1,其中 {任务名称} 为 getTaskName() 的返回值,{subtask的编号} 为 getIndexOfThisSubtask() + 1,{并行度} 为 getNumberOfParallelSubtasks() 的返回值,{尝试次数} 为 getAttemptNumber() 的返回值。
        • getExecutionConfig():获取当前作业的执行配置。
        • createSerializer(TypeInformation typeInformation):获取指定类型的序列化器。
        • getGlobalJobParameters():获取作业的全局参数。
        • isObjectReuseEnabled():获取对象重用是否开启。
        • getUserCodeClassLoader():获取用户类(不在系统 classpath 的类)的加载器。
        • registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook):在用户类加载器中注册自定义 hook。

          大概可以归纳为如下几种类型

          • 作业、task、subtask 的基本信息及尝试次数
          • 指标组信息
          • 包括序列化器和类加载器在内的配置信息

            源码|Github|org.apache.flink.api.common.functions.RuntimeContext(部分)

            JobID getJobId();
            String getTaskName();
            @PublicEvolving
            OperatorMetricGroup getMetricGroup();
            int getNumberOfParallelSubtasks();
            @PublicEvolving
            int getMaxNumberOfParallelSubtasks();
            int getIndexOfThisSubtask();
            int getAttemptNumber();
            String getTaskNameWithSubtasks();
            @Deprecated
            ExecutionConfig getExecutionConfig();
            @PublicEvolving
             TypeSerializer createSerializer(TypeInformation typeInformation);
            @PublicEvolving
            Map getGlobalJobParameters();
            @PublicEvolving
            boolean isObjectReuseEnabled();
            ClassLoader getUserCodeClassLoader();
            @PublicEvolving
            void registerUserCodeClassLoaderReleaseHookIfAbsent(
                    String releaseHookName, Runnable releaseHook);
            

            3.4.2 添加及访问累加器

            RuntimeContext 的包括如下累加器相关的方法:

            • addAccumulator(String name, Accumulator accumulator):添加累加器
            • Accumulator getAccumulator(String name):获取基础类型的累加器
            • getIntCounter(String name):获取 IntCounter 类型的累加器
            • getLongCounter(String name):获取 LongCounter 类型的累加器
            • getDoubleCounter(String name):获取 DoubleCounter 类型的累加器
            • getHistogram(String name):获取直方图类型的累加器

              源码|Github|org.apache.flink.api.common.functions.RuntimeContext(部分)

               void addAccumulator(String name, Accumulator accumulator);
               Accumulator getAccumulator(String name);
              @PublicEvolving
              IntCounter getIntCounter(String name);
              @PublicEvolving
              LongCounter getLongCounter(String name);
              @PublicEvolving
              DoubleCounter getDoubleCounter(String name);
              @PublicEvolving
              Histogram getHistogram(String name);
              

              3.4.3 访问外部资源

              RuntimeContext 提供了 getExternalResourceInfos(String resourceName) 外部资源信息。

              源码|Github|org.apache.flink.api.common.functions.RuntimeContext#getExternalResourceInfos

              @PublicEvolving
              Set getExternalResourceInfos(String resourceName);
              

              3.4.4 访问广播变量

              RuntimeContext 提供了如下访问广播变量的方法:

              • hasBroadcastVariable(String name):检查是否包含名称为 name 的广播变量
              • getBroadcastVariable(String name):返回名称为 name 的广播变量
              • getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer initializer):返回名称为 name 的广播变量,并使用 initializer 初始化。
              • getDistributedCache():访问分布式缓存。

                源码|Github|org.apache.flink.api.common.functions.RuntimeContext(部分)

                @PublicEvolving
                boolean hasBroadcastVariable(String name);
                 List getBroadcastVariable(String name);
                 C getBroadcastVariableWithInitializer(
                        String name, BroadcastVariableInitializer initializer);
                DistributedCache getDistributedCache();
                

                3.4.5 访问并编辑状态

                RuntimeContext 提供了如下方法用于获取不同类型的状态对象,在获取到状态对象后,可以对状态对象进行编辑:

                • getState(ValueStateDescriptor stateProperties):获取 ValueState 类型的状态对象
                • getListState(ListStateDescriptor stateProperties):获取 ListState 类型的状态对象
                • getReducingState(ReducingStateDescriptor stateProperties):获取 ReducingState 类型的状态对象
                • getAggregatingState(AggregatingStateDescriptor stateProperties):获取 AggregatingState 类型的状态对象
                • getMapState(MapStateDescriptor stateProperties):获取 MapState 类型的状态对象

                  源码|Github|org.apache.flink.api.common.functions.RuntimeContext(部分)

                  @PublicEvolving
                   ValueState getState(ValueStateDescriptor stateProperties);
                  @PublicEvolving
                   ListState getListState(ListStateDescriptor stateProperties);
                  @PublicEvolving
                   ReducingState getReducingState(ReducingStateDescriptor stateProperties);
                  @PublicEvolving
                   AggregatingState getAggregatingState(
                          AggregatingStateDescriptor stateProperties);
                  @PublicEvolving
                   MapState getMapState(MapStateDescriptor stateProperties);
                  

                  参考文献

                  • 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》