Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析

学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions

学习笔记如下:


接口实现样例

用户可以通过实现接口来完成自定义 Functions。

实现接口并使用的样例:

class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}
data.map(new MyMapFunction());

使用匿名类实现的样例:

data.map(new MapFunction () { public Integer map(String value) { return Integer.parseInt(value); }
});

使用 Lambda 表达式实现(Java 8)样例:

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rick Functions

所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获取运行状态的上下文,而且拥有一些生命周期方法,可以实现更复杂的功能。

所有需要用户自定义的 function 的转化操作都可以转化为 rich function,只需要继承 Rich 版本的 Function 即可。

例如:可以将以下 MapFunction 代码

class MyMapFunction implements MapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}

替换为

class MyMapFunction extends RichMapFunction { public Integer map(String value) { return Integer.parseInt(value); }
}

在使用自定义的 rich function 时,与非 Rich 版本一样,传给 map 算子即可:

data.map(new MyMapFunction());

Rich Function 也可以使用匿名类的方式定义。例如:

data.map (new RichMapFunction() { public Integer map(String value) { return Integer.parseInt(value); }
});

累加器使用样例

累加器可以使用 Accumulator.add(V value) 方法将其递增;在作业结束时,Flink 会汇总(merge)所有部分的结果并将其发送给客户端。

使用计数器的方法如下:

Step 1|在需要使用累加器的用户自定义 function 中创建一个累加器对象(此处以计数器为例)

private IntCounter numLines = new IntCounter();

Step 2|在 rich function 的 open() 方法中注册累加器对象,也可以在次数定义名称

getRuntimeContext().addAccumulator("num-lines", this.numLines);

Step 3|在 function 的任何地方(包括 open() 和 close() 方法中)使用累加器

this.numLines.add(1);

Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同操作 function 里使用同一个累加器,Flink 会在内部将所有具有相同名称的累加器合并起来。

需要注意的是,当前累加器的结果只有在整个作业结束后才可用。

计数器和累加器源码

计数器 IntCounter 类的源码如下:

// flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java
@PublicEvolving
public class IntCounter implements SimpleAccumulator { private static final long serialVersionUID = 1L;
    private int localValue = 0;
    public IntCounter() {}
    public IntCounter(int value) { this.localValue = value;
    }
    // ------------------------------------------------------------------------
    //  Accumulator
    // ------------------------------------------------------------------------
    /** Consider using {@link #add(int)} instead for primitive int values */
    @Override
    public void add(Integer value) { localValue += value;
    }
    @Override
    public Integer getLocalValue() { return localValue;
    }
    @Override
    public void merge(Accumulator other) { this.localValue += other.getLocalValue();
    }
    @Override
    public void resetLocal() { this.localValue = 0;
    }
    @Override
    public IntCounter clone() { IntCounter result = new IntCounter();
        result.localValue = localValue;
        return result;
    }
    // ------------------------------------------------------------------------
    //  Primitive Specializations
    // ------------------------------------------------------------------------
    public void add(int value) { localValue += value;
    }
    public int getLocalValuePrimitive() { return this.localValue;
    }
    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------
    @Override
    public String toString() { return "IntCounter " + this.localValue;
    }
}
  • 使用 localValue 属性来存储当前 function 中的累加器的值
  • add 方法:实现累加操作
  • merge 方法:实现多个累加器的合并
  • clone() 方法:实现累加器的复制

    计数器 IntCounter 类实现了 SimpleAccumulator 接口,SimpleAccumulator 接口的源码如下:

    // flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java
    @Public
    public interface SimpleAccumulator extends Accumulator {}
    

    SimpleAccumulator 接口继承了 Accumulator 接口,并定义添加的值的类型与最终结果的类型相同;Accumulator 接口的源码如下:

    @Public
    public interface Accumulator extends Serializable, Cloneable { /** @param value The value to add to the accumulator object */
        void add(V value);
        /** @return local The local value from the current UDF context */
        R getLocalValue();
        /** Reset the local value. This only affects the current UDF context. */
        void resetLocal();
        /**
         * Used by system internally to merge the collected parts of an accumulator at the end of the
         * job.
         *
         * @param other Reference to accumulator to merge in.
         */
        void merge(Accumulator other);
        /**
         * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
         * throw a {@link java.lang.CloneNotSupportedException}
         *
         * @return The duplicated accumulator.
         */
        Accumulator clone();
    }
    
    • add 方法:实现累加器的累加操作
    • getLocalValue() 方法:读取当前 UDF 中累加器的值
    • resetLocal() 方法:重置当前 UDF 中累加器的值
    • merge 方法:在 Flink 合并不同 UDF 的结果时使用
    • clone() 方法:复制累加器对象,实现了 Clonable 接口

      在 Accumulator 接口中,定义了添加的值的类型 V 和最终结果的类型 R。适用于实现更灵活的操作,例如直方图,添加的值为数字,最终结果的直方图。

      在 SimpleAccumulator 接口中,要求添加的值的类型和最终结果的类型相同。适用于相对简单的操作,例如计数器。

      自定义累加器

      如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。