学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions
学习笔记如下:
接口实现样例
用户可以通过实现接口来完成自定义 Functions。
实现接口并使用的样例:
class MyMapFunction implements MapFunction{ public Integer map(String value) { return Integer.parseInt(value); } } data.map(new MyMapFunction());
使用匿名类实现的样例:
data.map(new MapFunction() { public Integer map(String value) { return Integer.parseInt(value); } });
使用 Lambda 表达式实现(Java 8)样例:
data.filter(s -> s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);
Rick Functions
所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。
所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。
例如:可以将以下 MapFunction 代码
class MyMapFunction implements MapFunction{ public Integer map(String value) { return Integer.parseInt(value); } } 替换为
class MyMapFunction extends RichMapFunction{ public Integer map(String value) { return Integer.parseInt(value); } } 在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:
data.map(new MyMapFunction());
Rich Function 也可以使用匿名类的方式定义。例如:
data.map (new RichMapFunction() { public Integer map(String value) { return Integer.parseInt(value); } });
累加器使用样例
累加器可以使用 Accumulator.add(V value) 方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。
使用计数器的方法如下:
Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)
private IntCounter numLines = new IntCounter();
Step 2|在 rich function 的 open() 方法中注册累加器对象,也可以在次数定义名称
getRuntimeContext().addAccumulator("num-lines", this.numLines);
Step 3|在 function 的任何地方(包括 open() 和 close() 方法中)使用累加器
this.numLines.add(1);
Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。
需要注意的是,当前累加器的结果只有在整个作业结束后才可用。
计数器和累加器源码
计数器 IntCounter 类的源码如下:
// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java @PublicEvolving public class IntCounter implements SimpleAccumulator{ private static final long serialVersionUID = 1L; private int localValue = 0; public IntCounter() {} public IntCounter(int value) { this.localValue = value; } // ------------------------------------------------------------------------ // Accumulator // ------------------------------------------------------------------------ /** Consider using {@link #add(int)} instead for primitive int values */ @Override public void add(Integer value) { localValue += value; } @Override public Integer getLocalValue() { return localValue; } @Override public void merge(Accumulator other) { this.localValue += other.getLocalValue(); } @Override public void resetLocal() { this.localValue = 0; } @Override public IntCounter clone() { IntCounter result = new IntCounter(); result.localValue = localValue; return result; } // ------------------------------------------------------------------------ // Primitive Specializations // ------------------------------------------------------------------------ public void add(int value) { localValue += value; } public int getLocalValuePrimitive() { return this.localValue; } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @Override public String toString() { return "IntCounter " + this.localValue; } }
- 使用 localValue 属性来存储当前 function 中的累加器的值
- add 方法:实现累加操作
- merge 方法:实现多个累加器的合并
- clone() 方法:实现累加器的复制
计数器 IntCounter 类实现了 SimpleAccumulator 接口,SimpleAccumulator 接口的源码如下:
// flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java @Public public interface SimpleAccumulator
extends Accumulator {} SimpleAccumulator 接口继承了 Accumulator 接口,并定义添加的值的类型与最终结果的类型相同;Accumulator 接口的源码如下:
@Public public interface Accumulator
extends Serializable, Cloneable { /** @param value The value to add to the accumulator object */ void add(V value); /** @return local The local value from the current UDF context */ R getLocalValue(); /** Reset the local value. This only affects the current UDF context. */ void resetLocal(); /** * Used by system internally to merge the collected parts of an accumulator at the end of the * job. * * @param other Reference to accumulator to merge in. */ void merge(Accumulator other); /** * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot * throw a {@link java.lang.CloneNotSupportedException} * * @return The duplicated accumulator. */ Accumulator clone(); } - add 方法:实现累加器的累加操作
- getLocalValue() 方法:读取当前 UDF 中累加器的值
- resetLocal() 方法:重置当前 UDF 中累加器的值
- merge 方法:在 Flink 合并不同 UDF 的结果时使用
- clone() 方法:复制累加器对象,实现了 Clonable 接口
在 Accumulator 接口中,定义了添加的值的类型 V 和最终结果的类型 R。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。
在 SimpleAccumulator 接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。
自定义累加器
如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。