This article gives a detailed introduction to unit testing in Flink, covering stateless functions, stateful functions and complete jobs, with common usage examples given in particular for stateless unit tests.
Apart from the Maven dependencies listed below, this article requires no other dependencies.
一、Overview of Flink testing
Like other frameworks, Apache Flink provides tooling for testing application code at multiple levels of the testing pyramid.
Maven dependencies for the examples in this article:
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>4.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>
二、Testing user-defined functions
It can be assumed that Flink produces correct results outside of the user-defined functions. Therefore, it is recommended to cover the classes that contain the main business logic with as many unit tests as possible.
1、Unit testing stateless, time-independent UDFs
1)、Example: MapFunction
Take the following stateless MapFunction as an example:
public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
It is very easy to unit test such a function with your favorite testing framework by passing in suitable arguments and verifying the output.
import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.Test;

/**
 * @author alanchan
 */
public class TestDemo {

    public static class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction incrementer = new IncrementMapFunction();
        assertEquals((Long) 3L, incrementer.map(2L));
    }
}
2)、Example: FlatMapFunction
User-defined functions that use org.apache.flink.util.Collector (such as FlatMapFunction or ProcessFunction) can easily be tested by providing a mock object instead of a real collector. A FlatMapFunction analogous to the IncrementMapFunction above can be unit tested as follows.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * @author alanchan
 */
@RunWith(MockitoJUnitRunner.class)
public class TestDemo2 {

    public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long sum = 0L;
            for (String num : value.split(",")) {
                sum += Long.valueOf(num);
            }
            out.collect(sum);
        }
    }

    @Test
    public void testSum() throws Exception {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        Collector<Long> collector = mock(Collector.class);

        incrementer.flatMap("1,2,3,4,5", collector);

        Mockito.verify(collector, times(1)).collect(15L);
    }
}
2、Unit testing stateful or timely UDFs and custom operators
Functional testing of user-defined functions that use managed state or timers is more difficult, because it involves testing the interaction between the user code and the Flink runtime. For this purpose, Flink comes with a collection of so-called test harnesses that can be used to test user-defined functions as well as custom operators:
· OneInputStreamOperatorTestHarness (for operators on a DataStream)
· KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
· TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
· KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
Using the test harnesses requires an additional set of dependencies: the DataStream API and Table API test dependencies described below.
1)、DataStream API test dependencies
If you want to develop test cases for a job built with the DataStream API, you need to add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>
Among various test utilities, this module provides MiniCluster, a configurable lightweight Flink cluster that can run inside a JUnit test and execute jobs directly.
2)、Table API test dependencies
If you want to test Table API and SQL programs locally in your IDE, you need to add the following dependency in addition to the aforementioned flink-test-utils:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>
This automatically brings in the query planner and the runtime, which are needed to plan and execute the queries, respectively.
The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.
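As a brief, hedged sketch (not part of the original article's examples), a local Table API/SQL test with these dependencies might look as follows; the query and the expected values are made up for illustration, and the sketch assumes the regular Table API dependencies of your project are already in place:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;

public class TestTableApiDemo {

    @Test
    public void testSqlQuery() throws Exception {
        // local TableEnvironment; the planner and runtime come from flink-table-test-utils
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // run a simple query over inline VALUES and collect the result
        List<Long> results = new ArrayList<>();
        try (CloseableIterator<Row> it =
                tEnv.executeSql("SELECT v + 1 FROM (VALUES (1), (2), (3)) AS t(v)").collect()) {
            it.forEachRemaining(row -> results.add(((Number) row.getField(0)).longValue()));
        }

        // sort because the result order of a parallel job is not guaranteed
        results.sort(null);
        assertEquals(Arrays.asList(2L, 3L, 4L), results);
    }
}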
3)、FlatMapFunction unit test
Now you can use the test harnesses to push records and watermarks into your user-defined function or custom operator, control processing time, and finally assert on the output of the operator (including side output).
An example is shown below:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test for a flatMap that, for every even number, emits the original value and its square
 */
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

    static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }
    }

    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<>(new AlanFlatMapFunction());
        testHarness = new OneInputStreamOperatorTestHarness<>(operator);
        testHarness.open();
    }

    @Test
    public void testFlatMap2() throws Exception {
        long initialTime = 0L;

        // push records and a watermark into the operator
        testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));

        // 1 is odd and produces no output; 2 is even and produces 2 and 4,
        // both carrying the timestamp of the input record, followed by the watermark
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }
}
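The example above only checks the main output. As a hedged sketch of the side-output verification mentioned earlier (not part of the original article), the OutputTag, function and values below are illustrative only:

import java.util.Collections;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;

public class TestSideOutputDemo {

    // odd numbers go to the side output, even numbers to the main output
    static final OutputTag<Integer> ODD_TAG = new OutputTag<Integer>("odd") {};

    static class SplitProcessFunction extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
            } else {
                ctx.output(ODD_TAG, value);
            }
        }
    }

    @Test
    public void testSideOutput() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(new SplitProcessFunction());

        harness.processElement(1, 10);
        harness.processElement(2, 20);

        // the main output only contains the even number ...
        Assert.assertEquals(Collections.singletonList(2), harness.extractOutputValues());
        // ... while the odd number is available via the side output of the harness
        Assert.assertEquals(Integer.valueOf(1), harness.getSideOutput(ODD_TAG).poll().getValue());
    }
}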
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector, together with the TypeInformation of the key type.
An example is shown below:
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Key the stream by city and upper-case the city abbreviation
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
        // The state is only accessible by functions applied on a KeyedStream
        ValueState<User> previousInput;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            previousInput = getRuntimeContext()
                    .getState(new ValueStateDescriptor<User>("previousInput", User.class));
        }

        @Override
        public void flatMap(User input, Collector<User> out) throws Exception {
            previousInput.update(input);
            input.setCity(input.getCity().toUpperCase());
            out.collect(input);
        }
    }

    AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
    OneInputStreamOperatorTestHarness<User, User> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        alanFlatMapFunction = new AlanFlatMapFunction();
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new StreamFlatMap<>(alanFlatMapFunction),
                new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                },
                Types.STRING);
        testHarness.open();
    }

    @Test
    public void testFlatMap() throws Exception {
        testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

        ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext()
                .getState(new ValueStateDescriptor<>("previousInput", User.class));
        User stateValue = previousInput.value();

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

        testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);

        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                        new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
    }
}
4)、ProcessFunction unit test
In addition to the test harnesses above, which can be used directly to test a ProcessFunction, Flink also provides a test-harness factory class named ProcessFunctionTestHarnesses that simplifies the instantiation of test harnesses.
·OneInputStreamOperatorTestHarness example
import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestProcessOperatorDemo1 {

    // public abstract class KeyedProcessFunction<K, I, O>
    static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {

        @Override
        public void processElement(String value,
                KeyedProcessFunction<String, String, String>.Context ctx,
                Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // emit an event when the timer fires
            out.collect(String.format("定时器在 %d 被触发", timestamp));
        }
    }

    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private AlanProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new AlanProcessFunction();
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction),
                x -> "1",
                Types.STRING);
        // Function time is initialized to 0
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("alanchanchn", 10);

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>("vx->alanchanchn", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // test first record
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // advance the function time to 100
        testHarness.setProcessingTime(100);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10),
                        new StreamRecord<>("定时器在 100 被触发")),
                testHarness.extractOutputStreamRecords());
    }
}
·ProcessFunctionTestHarnesses example:
This example uses ProcessFunctionTestHarnesses to verify ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction and BroadcastProcessFunction, which covers essentially all of the process-function variants.
import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestProcessOperatorDemo3 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    // test processElement of ProcessFunction
    @Test
    public void testProcessFunction() throws Exception {
        // public abstract class ProcessFunction<I, O>
        ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(function);

        harness.processElement("alanchanchn", 10);

        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
    }

    // test processElement of KeyedProcessFunction
    @Test
    public void testKeyedProcessFunction() throws Exception {
        // public abstract class KeyedProcessFunction<K, I, O>
        KeyedProcessFunction<String, String, String> function =
                new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value,
                            KeyedProcessFunction<String, String, String>.Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect("vx->" + value);
                    }
                };

        OneInputStreamOperatorTestHarness<String, String> harness =
                ProcessFunctionTestHarnesses.forKeyedProcessFunction(
                        function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);

        harness.processElement("alanchan", 10);

        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchan"));
    }

    // test processElement1 and processElement2 of CoProcessFunction
    @Test
    public void testCoProcessFunction() throws Exception {
        // public abstract class CoProcessFunction<IN1, IN2, OUT>
        CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
            @Override
            public void processElement1(String value,
                    CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(new User(Integer.parseInt(userStr[0]), userStr[1],
                        Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value,
                    CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        TwoInputStreamOperatorTestHarness<String, User, User> harness =
                ProcessFunctionTestHarnesses.forCoProcessFunction(function);

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // the output order follows the processing order above
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
    }

    // test processElement1 and processElement2 of KeyedCoProcessFunction
    @Test
    public void testKeyedCoProcessFunction() throws Exception {
        // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
        KeyedCoProcessFunction<String, String, User, User> function =
                new KeyedCoProcessFunction<String, String, User, User>() {
                    @Override
                    public void processElement1(String value,
                            KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        String[] userStr = value.split(",");
                        out.collect(new User(Integer.parseInt(userStr[0]), userStr[1],
                                Integer.parseInt(userStr[2]), userStr[3]));
                    }

                    @Override
                    public void processElement2(User value,
                            KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        out.collect(value);
                    }
                };

        // public static <K, IN1, IN2, OUT>
        // KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(
        //         KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
        //         KeySelector<IN1, K> keySelector1,
        //         KeySelector<IN2, K> keySelector2,
        //         TypeInformation<K> keyType)
        KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness =
                ProcessFunctionTestHarnesses.forKeyedCoProcessFunction(function,
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) throws Exception {
                                return value.split(",")[3];
                            }
                        },
                        new KeySelector<User, String>() {
                            @Override
                            public String getKey(User value) throws Exception {
                                return value.getCity();
                            }
                        },
                        TypeInformation.of(String.class));

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // the output order follows the processing order above
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
    }

    // test processElement and processBroadcastElement of BroadcastProcessFunction
    @Test
    public void testBroadcastOperator() throws Exception {
        // broadcast state descriptor; the broadcast data looks like:
        // sh,上海
        // bj,北京
        // public class MapStateDescriptor<UK, UV>
        MapStateDescriptor<String, String> broadcastDesc =
                new MapStateDescriptor<>("Alan_RulesBroadcastState", String.class, String.class);

        // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
        // @param <IN1> The input type of the non-broadcast side.
        // @param <IN2> The input type of the broadcast side.
        // @param <OUT> The output type of the operator.
        BroadcastProcessFunction<User, String, User> function =
                new BroadcastProcessFunction<User, String, User>() {

                    // handles the elements of the broadcast stream
                    @Override
                    public void processBroadcastElement(String value,
                            BroadcastProcessFunction<User, String, User>.Context ctx,
                            Collector<User> out) throws Exception {
                        System.out.println("收到广播数据:" + value);
                        // update the broadcast state
                        ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
                    }

                    // handles the non-broadcast stream and joins it against the broadcast state
                    @Override
                    public void processElement(User value,
                            BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                            Collector<User> out) throws Exception {
                        // read the broadcast state
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                        value.setCity(state.get(value.getCity()));
                        out.collect(value);
                    }
                };

        BroadcastOperatorTestHarness<User, String, User> harness =
                ProcessFunctionTestHarnesses.forBroadcastProcessFunction(function, broadcastDesc);

        harness.processBroadcastElement("sh,上海", 10);
        harness.processBroadcastElement("bj,北京", 20);

        harness.processElement(new User(2, "alan", 19, "bj"), 10);
        harness.processElement(new User(1, "alanchan", 18, "sh"), 30);

        // the output order follows the processing order above
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")));
    }
}
三、Testing Flink jobs
1、The JUnit rule MiniClusterWithClientResource
Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local embedded mini cluster.
To use MiniClusterWithClientResource you need to add one additional dependency (test scope):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
</dependency>
Let's take the same simple MapFunction from the previous sections as an example.
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package com.win;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class TestExampleIntegrationDemo {

    static class AlanIncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new AlanIncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}
A few remarks on integration testing with MiniClusterWithClientResource:
· In order not to copy your whole pipeline code from production to test, make your sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests (see the sketch after this list).
· The static variable in CollectSink is used here because Flink serializes all operators before distributing them across the cluster. Communicating with operators instantiated by the local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.
· If your job uses event-time timers, you can implement a custom parallel source function for emitting watermarks.
· It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipeline is executed in parallel.
· Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of a Flink cluster usually dominates the actual execution time of the tests.
· If your pipeline contains custom state handling, you can test its correctness by enabling checkpoints and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.
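A minimal sketch of the pluggable source/sink idea from the first remark above (the class and method names are illustrative, not from the original article): the production code builds only the transformation and accepts the source and sink as parameters, so a test can pass in env.fromElements(...) and the CollectSink shown earlier.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class IncrementPipeline {

    // production code: only the transformation, with source and sink injected
    public static void build(DataStream<Long> source, SinkFunction<Long> sink) {
        source.map(record -> record + 1)
                .returns(Types.LONG)
                .addSink(sink);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // in production the real source (e.g. a Kafka source) and the real sink are plugged in here
        build(env.fromElements(1L, 2L, 3L), new PrintSinkStub());
        env.execute();
    }

    // stand-in for the real production sink
    static class PrintSinkStub implements SinkFunction<Long> {
        @Override
        public void invoke(Long value, Context context) {
            System.out.println(value);
        }
    }
}

In the integration test above, the pipeline would then be assembled as IncrementPipeline.build(env.fromElements(1L, 21L, 22L), new CollectSink()) instead of duplicating the map/sink wiring.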
This concludes the article's detailed introduction to unit testing in Flink, covering stateless functions, stateful functions and complete jobs, with common usage examples given in particular for stateless unit tests.