Flink算子通用状态应用测试样例
1. 获取Flink执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
2. 创建数据源,生成随机数据
DataStream
3. 按照ID和星期几进行分组
KeyedStream, String> keyedStream = source.keyBy(new KeySelector , String>() { @Override public String getKey(Map value) throws Exception { return value.get("ID") + LocalDate.now().getDayOfWeek(); } });
4. 处理函数,实现状态初始化和元素处理逻辑
SingleOutputStreamOperator> process = keyedStream.process(new KeyedProcessFunction , Map >() { private AggregatingState , Map > aggState; @Override public void open(Configuration parameters) throws Exception { // 配置状态的TTL StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 仅在创建和写入时清除,另一个读和写时清除 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不退回过期值 .build(); // 初始化状态 AggregatingStateDescriptor , Map , Map > aggRes = new AggregatingStateDescriptor<>("aggRes", new AggregateFunction , Map , Map >() { @Override public Map createAccumulator() { return new HashMap<>(); } @Override public Map add(Map in, Map acc) { String amt = acc.get("AMT"); if (amt == null) { acc.put("ID", in.get("ID")); acc.put("AMT", in.get("AMT")); } else { acc.put("AMT", Integer.valueOf(in.get("AMT")) + Integer.valueOf(amt) + ""); } return acc; } @Override public Map getResult(Map acc) { return acc; } @Override public Map merge(Map a, Map b) { return null; } }, TypeInformation.of(new TypeHint >() { })); aggRes.enableTimeToLive(ttlConfig); aggState = getRuntimeContext().getAggregatingState(aggRes); } @Override public void processElement(Map value, KeyedProcessFunction , Map >.Context ctx, Collector > out) throws Exception { aggState.add(value); out.collect(aggState.get()); } });
5. 打印聚合结果
process.map((MapFunction, Object>) value -> { System.out.println("聚合结果:" + value); return null; });
6. 执行作业
env.execute("Flink Common State Test");
7. 执行结果
------ 生产数据:{AMT=1, ID=2} 聚合结果:{AMT=1, ID=2} ------ 生产数据:{AMT=1, ID=3} 聚合结果:{AMT=1, ID=3} ------ 生产数据:{AMT=1, ID=3} 聚合结果:{AMT=2, ID=3} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=1, ID=1} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=2, ID=1} ------ 生产数据:{AMT=1, ID=1} 聚合结果:{AMT=3, ID=1} ...
这段代码实现了一个 Flink 作业,生成随机数据并对数据进行状态聚合处理。其中包括数据源生成、按键分区、状态初始化、元素聚合处理和结果输出。可以作为多场景下通用的实时数据处理模型。