Flink Learning Notes
Preface: Today is day three of learning Flink. I studied the 11 important operators in advanced API development, looked up a lot of material to understand how they work, and spent several hours writing code to cement the principles.
Tips: Even though my progress is a bit slow, I hope to keep at it: keep forming guesses about how the APIs work and keep validating them by writing code. The road of switching careers into big data will only get better!
II. Flink Unified Stream-Batch API Development
2. Transformation
2.1 Map
Transforms each element of a DataStream into another element, similar to word -> (word, 1) in the earlier wordcount example.
Example: use the map operation to read the string data in the apache.log file and convert it into ApacheLogEvent objects.
```
# Log data
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
```
```java
package cn.itcast.day02.transformation;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-02-13 19:44:52
 * @description TODO: use the map operation to read the string data in apache.log and convert it into ApacheLogEvent objects
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Get the ExecutionEnvironment
         * Build the data source with readTextFile
         * Create an ApacheLogEvent class
         * Apply the map transformation
         * Print and test
         */
        //TODO get the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO build the data source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");

        //TODO create an ApacheEvent class (defined below)
        //TODO apply the map transformation
        /**
         * String: input type
         * ApacheEvent: return type
         */
        SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
            @Override
            public ApacheEvent map(String line) throws Exception {
                String[] elements = line.split(" ");
                String ip = elements[0];
                int userId = Integer.parseInt(elements[1]);
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                long timestamp = simpleDateFormat.parse(elements[2]).getTime();
                String method = elements[3];
                String path = elements[4];
                return new ApacheEvent(ip, userId, timestamp, method, path);
            }
        });

        //TODO print and test
        apacheEventBean.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ApacheEvent {
        String ip;        // client ip
        int userId;       // user id
        long timestamp;   // access timestamp
        String method;    // HTTP method
        String path;      // request path
    }
}
```
```
# Printed output
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)
```
Summary:
- 1- Declare the result of env.readTextFile as DataStream, not DataStreamSource, otherwise it errors; be careful when using the IDE's `.var` shortcut here!
- 2- In the overridden map method, split the line and pick values out of the resulting array by index
- 3- Integer.parseInt converts a string into an int
- 4- Create a new SimpleDateFormat() to handle date parsing
- 5- simpleDateFormat.parse(str).getTime() gets the timestamp of a date string in the given format (a standalone sketch follows this list)
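A minimal standalone sketch of summary points 3-5; the sample values are hypothetical but follow the log format above:

```java
import java.text.SimpleDateFormat;

public class ParseSketch {
    public static void main(String[] args) throws Exception {
        // point 3: String -> int
        int userId = Integer.parseInt("10002");
        // points 4-5: parse a date string in the given pattern into epoch milliseconds
        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        long timestamp = sdf.parse("17/05/2015:10:06:53").getTime();
        System.out.println(userId + " -> " + timestamp);  // exact value depends on the JVM timezone
    }
}
```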
2.2 FlatMap
Transforms each element of a DataStream into 0..n elements, similar to splitting a line into words on spaces in the wordcount example.
Example: read the data in the flatmap.log file.
Turn the data:
```
张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板
```
into:
```
张三有苹果手机
张三有联想电脑
张三有华为平板
李四有…
```
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-14 21:04:09
 * @description TODO: read the data in flatmap.log; each input record above becomes three records
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * Build the stream execution environment
         * Build the local data source
         * Use flatMap to turn one record into three
         *   Split the fields on commas
         *   Build the three output records
         * Print the output
         */
        // TODO 1: build the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: get the local data source
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");

        // TODO 3: use flatMap to turn one record into three
        SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] elements = line.split(",");
                collector.collect(elements[0] + "有" + elements[1]);
                collector.collect(elements[0] + "有" + elements[2]);
                collector.collect(elements[0] + "有" + elements[3]);
            }
        });

        result.print();
        env.execute();
    }
}
```
Result:
```
8> 李四有华为手机
8> 李四有苹果电脑
8> 李四有小米平板
5> 张三有苹果手机
5> 张三有联想电脑
5> 张三有华为平板
```
Summary: collector.collect can be called multiple times, emitting one record per call; a lambda version is sketched below.
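The same flatMap can also be written as a lambda, but then Flink cannot recover the erased output type, so an explicit returns() hint is required; a minimal sketch (uses org.apache.flink.api.common.typeinfo.Types):

```java
SingleOutputStreamOperator<String> result = lines
        .flatMap((String line, Collector<String> out) -> {
            String[] e = line.split(",");
            out.collect(e[0] + "有" + e[1]);  // one collect call per output record
            out.collect(e[0] + "有" + e[2]);
            out.collect(e[0] + "有" + e[3]);
        })
        .returns(Types.STRING);  // type hint: the lambda's generic signature is erased
```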
2.3 Filter
Keeps the elements that satisfy a condition.
Example: read the access-log data in the apache.log file and filter out the log lines whose access IP is 83.149.9.216.
```
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
```
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:20:30
 * @description TODO: read the access-log data in apache.log and keep only the lines whose access IP is 83.149.9.216.
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Get the ExecutionEnvironment
         * Build the data source with readTextFile
         * Apply the filter transformation
         * Print and test
         */
        //TODO get the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO build the data source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO apply the filter transformation
        SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                return line.contains("83.149.9.216");
            }
        });

        result.print();
        env.execute();
    }
}
```
Result:
```
2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
```
Summary: the contains method inside the filter predicate achieves the filtering; a lambda version is sketched below.
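For reference, a minimal lambda sketch of the same filter; no type hint is needed here because filter does not change the element type:

```java
SingleOutputStreamOperator<String> result =
        lines.filter(line -> line.contains("83.149.9.216"));  // keep only matching lines
```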
2.4 KeyBy
Stream processing has no groupBy; keyBy is used instead.
Example: read a local data source and count words.
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:29:52
 * @description TODO: read a local tuple data source and count words
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1: initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: read the local data source
        DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );

        // In stream processing, records are processed one by one as they arrive: within each group
        // the sum is accumulated record by record, so the last value printed per group is the final sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);
        // Without grouping, the sum would be 1+2+3+3+2+3 = 14; grouped: 篮球 6, 足球 8
        sum.print();
        env.execute();
    }
}
```
Result:
```
4> (足球,3)
4> (足球,5)
4> (足球,8)
5> (篮球,1)
5> (篮球,3)
5> (篮球,6)
```
Summary:
- 1- keyBy is grouping for streams
- 2- keyBy() can take a lambda such as t -> t.f0, or simply a position index such as 0 (see the sketch after this list)
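A minimal sketch of the two forms from point 2, reusing the `source` stream from the demo above; note that the position-index form keyBy(int) is deprecated in newer Flink versions in favor of key selectors:

```java
// key selector lambda (preferred): group by the tuple's first field
source.keyBy(t -> t.f0).sum(1);

// position index (older, now-deprecated API): field 0 of the tuple is the key
source.keyBy(0).sum(1);
```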
2.5 Reduce
Performs an aggregation over a dataset or a group, finally collapsing it into a single aggregated element.
Example: read the apache.log file, count the number of PVs (page views) per IP address, and use reduce to aggregate down to a final result.
```
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
```
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:43:10
 * @description TODO: read apache.log, count PV per IP address, and use reduce to aggregate to a final result
 */
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Get the ExecutionEnvironment
         * Build the data source with readTextFile
         * Aggregate with reduce
         * Print and test
         */
        //TODO get the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO build the data source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO aggregate with reduce
        SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] dataArray = line.split(" ");
                return Tuple2.of(dataArray[0], 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
                return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
            }
        });

        result.print();
        env.execute();
    }
}
```
Result:
```
3> (74.218.234.48,3)
3> (74.218.234.48,4)
3> (74.218.234.48,5)
3> (74.218.234.48,6)
```
Summary:
- 1- reduce is similar to the sum operation, but with custom aggregation logic
- 2- note how the overridden method builds its return value: return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1) (a lambda sketch follows this list)
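The same keyed reduce written as a lambda, with a key selector instead of the position index; a minimal sketch reusing `ipAndOne` from the demo above (no returns() hint is needed, since reduce's output type equals its input type):

```java
SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne
        .keyBy(t -> t.f0)                                      // key selector instead of keyBy(0)
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));  // pairwise rolling aggregation
```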
2.6 minBy and maxBy
Get the maximum/minimum value of a given field.
2.6.1 Scenario 1:
Example: the Tuple2 case
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the min/max within each group
 */
public class MinMaxByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String word = fields[0];
                int count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);

        keyd.minBy(1).print("最小数据>>>");
        keyd.maxBy(1).print("最大数据>>>");

        env.execute();
    }
}
```
Result:
```
最大数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最小数据>>>:1> (spark,2)
最大数据>>>:1> (spark,5)
最大数据>>>:8> (hadoop,7)
最大数据>>>:8> (hadoop,7)
最小数据>>>:8> (hadoop,3)
最小数据>>>:8> (hadoop,3)
```
2.6.2 Scenario 2
Example: the Tuple3 case
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the min/max within each group
 */
public class MinMaxByDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //call a Source to create the DataStream; sample input:
        //辽宁,沈阳,1000
        //北京,朝阳,8000
        //辽宁,朝阳,1000
        //辽宁,朝阳,1000
        //辽宁,沈阳,2000
        //北京,朝阳,1000
        //辽宁,大连,3000
        //辽宁,铁岭,500
        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);

        // second argument `first = false`: on a tie for the minimum, return the latest
        // element instead of the first one seen
        SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);

        res.print();
        env.execute();
    }
}
```
Result:
```
5> (辽宁,沈阳,1000.0)
4> (北京,朝阳,8000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
4> (北京,朝阳,1000.0)
5> (辽宁,朝阳,1000.0)
5> (辽宁,铁岭,500.0)
```
2.7 The difference between min/max and minBy/maxBy
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 22:52:36
 * @description TODO
 */
public class MinVSMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
                Tuple3.of(1, 3, 2),
                Tuple3.of(1, 1, 2),
                Tuple3.of(1, 2, 3),
                Tuple3.of(1, 111, 1),
                Tuple3.of(1, 1, 1),
                Tuple3.of(1, 2, 0),
                Tuple3.of(1, 33, 2)
        );

        source.keyBy(t -> t.f0).min(2).print("min>>>");
        source.keyBy(t -> t.f0).minBy(2).printToErr("minBy>>>");

        env.execute();
    }
}
```
Result:
```
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,2,0)
minBy>>>:6> (1,2,0)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,1)
min>>>:6> (1,3,1)
min>>>:6> (1,3,0)
min>>>:6> (1,3,0)
```
Summary:
- 1- minBy and maxBy return the entire record that carries the extreme value (including the other fields alongside the minimum)
- 2- min and max return only the minimum value itself, plus the other fields of the first record seen; in the output above, after (1,111,1) arrives, min(2) emits (1,3,1) (the new minimum 1 spliced into the first record's fields), while minBy(2) emits the whole record (1,111,1)
2.8 Union
Merges multiple DataSets into a single DataSet; the DataSets being unioned must all have the same type.
```java
package cn.itcast.day02.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:06:20
 * @description TODO:
 *  Use union to take the union of the following data:
 *  data set 1: "hadoop", "hive", "flume"
 *  data set 2: "hadoop", "hive", "spark"
 *
 *  Note:
 *  1: the merged data is not deduplicated automatically
 *  2: the data types must be identical
 */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1) initialize the Flink stream execution environment
         * 2) load/create the data sources
         * 3) process the data
         * 4) print the output
         * 5) submit the job
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
        DataStreamSource<String> ds2 = env.fromElements("hadoop", "hive", "spark");

        DataStream<String> result = ds1.union(ds2);

        result.printToErr();
        env.execute();
    }
}
```
Result:
```
2> hive
6> flume
3> spark
1> hadoop
4> hadoop
5> hive
```
Summary:
- 1- Union merges data sets; their types must be identical
- 2- Union does not deduplicate
- 3- The unioned output comes out unordered (a multi-stream sketch follows this list)
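union is variadic, so more than two streams of the same type can be merged in a single call; a minimal sketch (`ds3` is a hypothetical extra stream):

```java
DataStreamSource<String> ds3 = env.fromElements("kafka", "hbase");
// any number of same-typed streams can be passed to one union call
DataStream<String> merged = ds1.union(ds2, ds3);
```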
2.9 Connect
DataStream, DataStream → ConnectedStreams. The two streams remain independent of each other; by contrast, after union they truly become a single stream.
```java
package cn.itcast.day02.transformation;

/**
 * @author lql
 * @time 2024-02-14 23:10:14
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.TimeUnit;

/**
 * Read two data streams (of two different types) and merge them with connect.
 * Similar to union, but connect can only join two streams, the two streams may have
 * different types, and different processing logic can be applied to each stream.
 */
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1) initialize the Flink stream execution environment
         * 2) build two data streams of different types
         * 3) apply business logic to the connected stream
         * 4) print the output
         * 5) start the job
         */
        //TODO 1) initialize the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2) build two data streams of different types
        DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
        DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());

        //TODO 3) apply business logic to the connected stream
        SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long aLong) throws Exception {
                return "str_" + aLong;
            }
        });

        ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);

        //apply separate logic to each side of the connected stream
        //(the original notes were truncated here; this CoMapFunction is reconstructed to match
        // the printed output: map1 handles the Long stream, map2 the String stream)
        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Long, String, String>() {
            @Override
            public String map1(Long value) throws Exception {
                return String.valueOf(value);
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });

        //TODO 4) print the output
        result.print();

        //TODO 5) start the job
        env.execute();
    }

    /**
     * Non-parallel source emitting an incrementing Long every second.
     * (Also missing from the truncated notes; a minimal reconstruction consistent
     * with the imports and the printed output.)
     */
    public static class MyNoParallelSource implements SourceFunction<Long> {
        private volatile boolean running = true;
        private long count = 1L;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                ctx.collect(count++);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
```
Result:
```
3> 1
5> str_1
4> 2
6> str_2
5> 3
7> str_3
```
Summary:
- The two streams passed to Connect may have different types
2.10 split, select and Side Outputs
Split divides one DataStream into two or more DataStreams.
Select fetches the data of one branch after the split.
Tips:
- Put simply: Split tags each record, and Select then picks records by tag to carve out separate streams. The effect resembles splitting with KeyBy, but is freer, since you can assign tags however you like and split on them.
- Side Outputs: split is deprecated; instead, use the process method to handle the records in the stream and, according to the processing result, collect them into different OutputTags.
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author lql
 * @time 2024-02-14 23:25:38
 * @description TODO
 */
public class StreamSplitDemo {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with AUTOMATIC, Flink chooses batch or streaming execution based on whether the sources are bounded
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        //TODO 2.transformation
        //requirement: split the stream into odd and even numbers and select each side
        OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("偶数", TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                //records emitted through out stay together in the main stream; ctx routes records to different OutputTags
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        //TODO 3.sink
        System.out.println(oddTag);  //OutputTag(Integer, 奇数)
        System.out.println(evenTag); //OutputTag(Integer, 偶数)
        oddResult.print("奇数:");
        evenResult.print("偶数:");

        //TODO 4.execute
        env.execute();
    }
}
```
Result:
```
OutputTag(Integer, 奇数)
OutputTag(Integer, 偶数)
奇数::3> 1
偶数::8> 6
偶数::6> 4
偶数::4> 2
奇数::5> 3
奇数::1> 7
奇数::7> 5
偶数::2> 8
偶数::4> 10
奇数::3> 9
```
Summary:
- 1- An OutputTag object defines a side output's name and element type
- 2- process can split a stream
- 3- use the getSideOutput method to pull out a side-output stream
2.11 Iterate
Creates a "feedback" loop in the stream by redirecting an operator's output back to some earlier operator.
Data flow of an iteration: DataStream → IterativeStream → DataStream
```java
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:34:23
 * @description TODO: Iterate, iterative stream computation
 */
public class IterateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //e.g. type 10 into the socket
        DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
        DataStream<Long> numbers = strs.map(Long::parseLong);

        //call the iterate method: DataStream -> IterativeStream
        //iterate over the numbers (keep typing integers into the socket)
        IterativeStream<Long> iteration = numbers.iterate();

        //IterativeStream -> DataStream
        //the iteration body: the processing logic applied to the input data (the "model update")
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 2;
            }
        });

        //as long as value > 0 holds, the record loops back: the previous output becomes
        //the next input and the iteration body is applied again
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });

        //register the feedback condition
        iteration.closeWith(feedback);

        //records that no longer satisfy the loop condition are emitted at the end
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        //final results
        output.printToErr("output value:");
        env.execute();
    }
}
```
Result:
```
iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2
```
Summary:
- 1- Commonly used for model updates, i.e. iteratively updating parameters
- 2- Operator iteration takes some effort to understand; study it through applications