Flink Review Notes (3): Flink Unified Stream and Batch API Development - Key Transformation Operators

Flink Study Notes

Preface: Today is my third day learning Flink. I covered 11 important operators of the high-level API, read a lot of material to understand how they work, and spent several hours writing code to pin the principles down.

Tips: Even though my progress is a bit slow, I hope to keep at it, keep forming hypotheses about how the APIs work, and keep verifying them by writing code. The road into big data will only get better!

II. Flink Unified Stream and Batch API Development

2. Transformation

2.1 Map

Map transforms each element of a DataStream into another element, similar to word -> (word, 1) in the earlier word-count example.

Example: use the map operation to read the string records in apache.log and convert each line into an ApacheEvent object.

# sample log data
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-02-13 19:44:52
 * @description TODO: use map to convert the string records in apache.log into ApacheEvent objects
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1. Get the StreamExecutionEnvironment
         * 2. Build the source with readTextFile
         * 3. Define an ApacheEvent class
         * 4. Convert each line with map
         * 5. Print and execute
         */
        //TODO get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO build the source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");
        //TODO the ApacheEvent class is defined below
        //TODO convert each line with map
        /**
         * String: input type
         * ApacheEvent: output type
         */
        SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
            @Override
            public ApacheEvent map(String line) throws Exception {
                String[] elements = line.split(" ");
                String ip = elements[0];
                int userId = Integer.parseInt(elements[1]);
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                long timestamp = simpleDateFormat.parse(elements[2]).getTime();
                String method = elements[3];
                String path = elements[4];
                return new ApacheEvent(ip, userId, timestamp, method, path);
            }
        });
        //TODO print and execute
        apacheEventBean.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ApacheEvent {
        String ip;      // client IP
        int userId;     // user id
        long timestamp; // access timestamp
        String method;  // HTTP method
        String path;    // request path
    }
}
# printed output
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)

Summary:

  • 1- Declare the result of env.readTextFile as DataStream rather than DataStreamSource here, otherwise the assignment reports an error; watch the type the IDE's .var shortcut generates.
  • 2- In the overridden map method, split the line into an array and read the fields by index.
  • 3- Integer.parseInt converts a string into an int.
  • 4- Create a SimpleDateFormat to parse the date field.
  • 5- simpleDateFormat.parse(dateString).getTime() returns the epoch timestamp of a date string in the given format (a small sketch follows below).
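
Points 4 and 5 can be verified in isolation with plain JDK code; a minimal sketch (my own snippet, reusing the date pattern from the example above):

import java.text.SimpleDateFormat;

public class ParseTimestampSketch {
    public static void main(String[] args) throws Exception {
        // same pattern as in MapDemo: day/month/year:hour:minute:second
        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        // parse(...) returns a java.util.Date; getTime() turns it into epoch milliseconds
        long ts = sdf.parse("17/05/2015:10:06:53").getTime();
        System.out.println(ts); // exact value depends on the JVM's time zone
    }
}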
2.2 FlatMap

FlatMap transforms each element of a DataStream into 0..n elements, similar to splitting a line into words on spaces in the word-count example.

Example: read the data in flatmap.log

    Turn the input:
    张三,苹果手机,联想电脑,华为平板
    李四,华为手机,苹果电脑,小米平板
    into:
    张三有苹果手机
    张三有联想电脑
    张三有华为平板
    李四有…
    
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-14 21:04:09
 * @description TODO: read flatmap.log and turn each input line into three output records
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1. Build the stream execution environment
         * 2. Build the local data source
         * 3. Use flatMap to turn one record into three
         * 4. Split the fields on commas
         * 5. Build the three output records
         * 6. Print the result
         */
        // TODO 1: build the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2: read the local data source
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");
        // TODO 3: use flatMap to turn one record into three
        SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] elements = line.split(",");
                collector.collect(elements[0] + "有" + elements[1]);
                collector.collect(elements[0] + "有" + elements[2]);
                collector.collect(elements[0] + "有" + elements[3]);
            }
        });
        result.print();
        env.execute();
    }
}
    

Output:

    8> 李四有华为手机
    8> 李四有苹果电脑
    8> 李四有小米平板
    5> 张三有苹果手机
    5> 张三有联想电脑
    5> 张三有华为平板
    

Summary: collector.collect can be called multiple times within one flatMap invocation, emitting one output record per call.
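
As a side note (my own variant, not part of the course code), the same flatMap can be written with a lambda; because the Collector's generic type is erased, a returns(Types.STRING) hint is then required:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapLambdaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("张三,苹果手机,联想电脑,华为平板");
        lines.flatMap((String line, Collector<String> out) -> {
                    String[] e = line.split(",");
                    // one collect call per output record
                    out.collect(e[0] + "有" + e[1]);
                    out.collect(e[0] + "有" + e[2]);
                    out.collect(e[0] + "有" + e[3]);
                })
                .returns(Types.STRING) // needed: the lambda's output type is erased at runtime
                .print();
        env.execute();
    }
}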

2.3 Filter

Filter keeps only the elements that satisfy a condition.

Example: read the access-log records in apache.log and keep only the entries whose client IP is 83.149.9.216.

    83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
    83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
    83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
    
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:20:30
 * @description TODO: read the access logs in apache.log and keep only the lines whose client IP is 83.149.9.216
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1. Get the StreamExecutionEnvironment
         * 2. Build the source with readTextFile
         * 3. Filter the stream with filter
         * 4. Print the result
         */
        //TODO get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO build the source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
        //TODO keep only the lines that contain 83.149.9.216
        SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                return line.contains("83.149.9.216");
            }
        });
        result.print();
        env.execute();
    }
}
    

Output:

    2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
    2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
    2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
    2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
    2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
    2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
    

Summary: the String.contains check is enough to express this filter condition.
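
For such a simple predicate, the anonymous class can also be replaced by a lambda; a minimal sketch (my own shorthand, with inline test data instead of the log file):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterLambdaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements(
                "83.149.9.216 10002 17/05/2015:10:06:53 GET /print/paper.css",
                "10.0.0.1 10003 17/05/2015:10:06:53 POST /print/paper.css");
        // keep only the lines containing the target IP; no type hint is needed,
        // because filter does not change the element type
        lines.filter(line -> line.contains("83.149.9.216")).print();
        env.execute();
    }
}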

2.4 KeyBy

The streaming API has no groupBy; keyBy is used instead.

Example: read a local data source and count words.

package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:29:52
 * @description TODO: read a local tuple data source and count words
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1: initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2: read the local data source
        DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );
        // In streaming, every record is processed as it arrives, so each group is accumulated incrementally;
        // the last value printed for a group is the final sum.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);
        // Without grouping, the sum would be 1+2+3+3+2+3 = 14; grouped, it is 篮球 6 and 足球 8.
        sum.print();
        env.execute();
    }
}
    

Output:

    4> (足球,3)
    4> (足球,5)
    4> (足球,8)
    5> (篮球,1)
    5> (篮球,3)
    5> (篮球,6)
    

Summary:

    • 1- keyBy groups a stream by key.
    • 2- keyBy(...) accepts either a key selector such as t -> t.f0 or a field index such as 0 (see the sketch below).
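
A minimal sketch of the two forms (my own example; as far as I know, the index and field-name overloads of keyBy are deprecated in recent Flink versions in favour of key selectors):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("篮球", 1), Tuple2.of("足球", 3));

        // form 1: key selector (the preferred style)
        source.keyBy(t -> t.f0).sum(1).print("selector");

        // form 2: tuple field index (still works, but deprecated in newer versions)
        source.keyBy(0).sum(1).print("index");

        env.execute();
    }
}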
2.5 Reduce

Reduce performs an aggregation over a dataset or a keyed group, folding it element by element into a single result.

Example: read apache.log, count the page views (PV) per IP address, and aggregate to the final result with reduce.

      83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
      83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
      83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
      
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:43:10
 * @description TODO: read apache.log, count page views per IP address, and aggregate with reduce
 */
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1. Get the StreamExecutionEnvironment
         * 2. Build the source with readTextFile
         * 3. Aggregate with reduce
         * 4. Print the result
         */
        //TODO get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO build the source with readTextFile
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
        //TODO aggregate with reduce
        SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] dataArray = line.split(" ");
                return Tuple2.of(dataArray[0], 1);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
                return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
            }
        });
        result.print();
        env.execute();
    }
}
      

Output:

      3> (74.218.234.48,3)
      3> (74.218.234.48,4)
      3> (74.218.234.48,5)
      3> (74.218.234.48,6)
      

Summary:

      • 1- For this use case, reduce behaves like the built-in sum aggregation (see the sketch below).
      • 2- Note how the reduce method builds its return value: return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1).
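
A minimal sketch comparing the two (my own example, not from the course code): for this counting use case, an explicit reduce and the built-in sum(1) produce the same running totals per key:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceVsSumSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream<Tuple2<String, Integer>, String> keyed = env
                .fromElements(Tuple2.of("83.149.9.216", 1),
                              Tuple2.of("83.149.9.216", 1),
                              Tuple2.of("10.0.0.1", 1))
                .keyBy(t -> t.f0);

        // running count per IP, written as an explicit reduce
        keyed.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)).print("reduce");
        // the same running count, written with the built-in aggregation
        keyed.sum(1).print("sum");

        env.execute();
    }
}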
2.6 minBy and maxBy

minBy and maxBy return the record holding the maximum or minimum value of a specified field.

2.6.1 Scenario 1

Example: Tuple2 records

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the minimum and maximum within each group
 */
public class MinMaxByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String word = fields[0];
                int count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);
        keyd.minBy(1).print("最小数据>>>");
        keyd.maxBy(1).print("最大数据>>>");
        env.execute();
    }
}
        

Output:

        最大数据>>>:1> (spark,2)
        最小数据>>>:1> (spark,2)
        最小数据>>>:1> (spark,2)
        最大数据>>>:1> (spark,5)
        最大数据>>>:8> (hadoop,7)
        最大数据>>>:8> (hadoop,7)
        最小数据>>>:8> (hadoop,3)
        最小数据>>>:8> (hadoop,3)
        
2.6.2 Scenario 2

Example: Tuple3 records

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the minimum within each group
 */
public class MinMaxByDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // create the DataStream from a socket source; sample input lines:
        //辽宁,沈阳,1000
        //北京,朝阳,8000
        //辽宁,朝阳,1000
        //辽宁,朝阳,1000
        //辽宁,沈阳,2000
        //北京,朝阳,1000
        //辽宁,大连,3000
        //辽宁,铁岭,500
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });
        KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);
        // second argument ("first") is false: on a tie, emit the latest record that has the minimum value
        // instead of keeping the first one that was seen
        SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);
        res.print();
        env.execute();
    }
}
        

Output:

        5> (辽宁,沈阳,1000.0)
        4> (北京,朝阳,8000.0)
        5> (辽宁,朝阳,1000.0)
        5> (辽宁,朝阳,1000.0)
        5> (辽宁,朝阳,1000.0)
        4> (北京,朝阳,1000.0)
        5> (辽宁,朝阳,1000.0)
        5> (辽宁,铁岭,500.0)
        
2.7 The difference between min/max and minBy/maxBy

Example: apply min and minBy to the same keyed stream and compare the emitted records.
package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 22:52:36
 * @description TODO: compare min with minBy
 */
public class MinVSMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
                Tuple3.of(1, 3, 2),
                Tuple3.of(1, 1, 2),
                Tuple3.of(1, 2, 3),
                Tuple3.of(1, 111, 1),
                Tuple3.of(1, 1, 1),
                Tuple3.of(1, 2, 0),
                Tuple3.of(1, 33, 2)
        );
        source.keyBy(t -> t.f0).min(2).print("min>>>");
        source.keyBy(t -> t.f0).minBy(2).printToErr("minBy>>>");
        env.execute();
    }
}
        

Output:

        minBy>>>:6> (1,3,2)
        minBy>>>:6> (1,3,2)
        minBy>>>:6> (1,3,2)
        minBy>>>:6> (1,111,1)
        minBy>>>:6> (1,111,1)
        minBy>>>:6> (1,2,0)
        minBy>>>:6> (1,2,0)
        min>>>:6> (1,3,2)
        min>>>:6> (1,3,2)
        min>>>:6> (1,3,2)
        min>>>:6> (1,3,1)
        min>>>:6> (1,3,1)
        min>>>:6> (1,3,0)
        min>>>:6> (1,3,0)
        

Summary:

        • 1- minBy and maxBy return the whole record that holds the extreme value (including its other fields).
        • 2- min and max only track the extreme value itself; the other fields keep the values of the first record seen for that key.
2.8 Union

Union merges multiple DataStreams into one; the element types of all the unioned streams must be identical.

package cn.itcast.day02.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:06:20
 * @description TODO:
 *  * Use union to merge the following data sets:
 *  * data set 1: "hadoop", "hive", "flume"
 *  * data set 2: "hadoop", "hive", "spark"
 *  *
 *  * Note:
 *  * 1: the merged data is not deduplicated automatically
 *  * 2: the element types must be identical
 *  */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1) initialize the Flink stream execution environment
         * 2) load/create the data sources
         * 3) process the data
         * 4) print the result
         * 5) submit and execute the job
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
        DataStreamSource<String> ds2 = env.fromElements("hadoop", "hive", "spark");
        DataStream<String> result = ds1.union(ds2);
        result.printToErr();
        env.execute();
    }
}
          

Output:

          2> hive
          6> flume
          3> spark
          1> hadoop
          4> hadoop
          5> hive
          

Summary:

           • 1- Union merges streams whose element types must match (see the sketch below).
           • 2- Union does not deduplicate the merged data.
           • 3- The order of elements in the merged stream is not deterministic.
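
A minimal sketch (my own addition) showing that union is variadic and that the type rule is enforced at compile time:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
        DataStreamSource<String> ds2 = env.fromElements("hadoop", "hive", "spark");
        DataStreamSource<String> ds3 = env.fromElements("kafka");

        // union accepts several streams of the same element type in one call
        DataStream<String> merged = ds1.union(ds2, ds3);
        merged.print();

        // unioning a stream of a different element type (e.g. DataStream<Integer>) would not compile

        env.execute();
    }
}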
2.9 Connect

DataStream, DataStream → ConnectedStreams: the two connected streams remain independent inside the connection, whereas union really merges them into a single stream.

package cn.itcast.day02.transformation;

/**
 * @author lql
 * @time 2024-02-14 23:10:14
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.TimeUnit;

/**
 * Read two data streams (of different element types) and merge them with connect.
 * Similar to union, but connect can only join two streams, the two streams may have
 * different element types, and each stream can be processed with its own logic.
 */
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Steps:
         * 1) initialize the Flink stream execution environment
         * 2) build two data streams with different element types
         * 3) apply the business logic to the connected stream
         * 4) print the result
         * 5) start the job
         */
        //TODO 1) initialize the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //TODO 2) build two data streams with different element types
        DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
        DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());
        //TODO 3) apply the business logic to the connected stream
        SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long aLong) throws Exception {
                return "str_" + aLong;
            }
        });
        ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);
        // apply a separate piece of logic to each of the connected streams
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });
        //TODO 4) print the result
        result.print();
        //TODO 5) start the job
        env.execute();
    }

    public static class MyNoParallelSource implements SourceFunction<Long> {
        // flag controlling whether data keeps being generated
        private boolean isRunning = true;
        private Long count = 0L;

        /**
         * The main method of the source: generates the data.
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            // keep generating records
            while (isRunning) {
                count += 1;
                // emit the record
                ctx.collect(count);
                // generate one record per second
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * Stop generating data.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
             

Output:

            3> 1
            5> str_1
            4> 2
            6> str_2
            5> 3
            7> str_3
            

Summary:

            • The two streams passed to connect may have different element types.
2.10 split, select, and Side Outputs

Split divides one DataStream into two or more DataStreams.

Select retrieves the sub-stream that corresponds to a given split.

Tips:

               • Roughly speaking, split tags each record, and select then picks records by tag to form separate streams. The effect is similar to splitting with keyBy, but more flexible, because you choose the tags yourself.
               • Side Outputs: split has been deprecated. Instead, process the stream with a process function and, depending on the result, emit each record to a different OutputTag.
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author lql
 * @time 2024-02-14 23:25:38
 * @description TODO
 */
public class StreamSplitDemo {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // AUTOMATIC lets Flink choose between batch and streaming execution depending on whether the sources are bounded
        //TODO 1.source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        //TODO 2.transformation
        // requirement: split the stream into odd and even numbers and select each part
        OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("偶数", TypeInformation.of(Integer.class));
        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                // records emitted through out stay together in the main stream; ctx.output sends them to an OutputTag
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });
        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);
        //TODO 3.sink
        System.out.println(oddTag);  // OutputTag(Integer, 奇数)
        System.out.println(evenTag); // OutputTag(Integer, 偶数)
        oddResult.print("奇数:");
        evenResult.print("偶数:");
        //TODO 4.execute
        env.execute();
    }
}
                

Output:

                OutputTag(Integer, 奇数)
                OutputTag(Integer, 偶数)
                奇数::3> 1
                偶数::8> 6
                偶数::6> 4
                偶数::4> 2
                奇数::5> 3
                奇数::1> 7
                奇数::7> 5
                偶数::2> 8
                偶数::4> 10
                奇数::3> 9
                

Summary:

                 • 1- An OutputTag defines the name and element type of a side output.
                 • 2- process can be used to split a stream.
                 • 3- The side-output streams are obtained with getSideOutput (see the sketch after this list).
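
One extra note from my own reading (not from the course code): instead of passing TypeInformation explicitly, an OutputTag is often created as an anonymous subclass so that its generic type survives erasure. A minimal sketch of that idiom, keeping the even numbers in the main stream and sending only the odd ones to the side output:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputTagSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // anonymous subclass ({}) keeps the Integer type information at runtime
        OutputTag<Integer> oddTag = new OutputTag<Integer>("奇数") {};

        SingleOutputStreamOperator<Integer> main = env.fromElements(1, 2, 3, 4, 5)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (value % 2 == 0) {
                            out.collect(value);        // even numbers stay in the main stream
                        } else {
                            ctx.output(oddTag, value); // odd numbers go to the side output
                        }
                    }
                });

        main.print("偶数:");
        main.getSideOutput(oddTag).print("奇数:");
        env.execute();
    }
}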
2.11 Iterate

Iterate creates a feedback loop in the stream by redirecting the output of one operator back to an earlier operator.

Data flow of an iteration: DataStream → IterativeStream → DataStream

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:34:23
 * @description TODO: iterative streaming computation with Iterate
 */
public class IterateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // type numbers (e.g. 10) into the socket
        DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
        DataStream<Long> numbers = strs.map(Long::parseLong);
        // call iterate: DataStream -> IterativeStream
        // the numbers typed in are fed into the iteration
        IterativeStream<Long> iteration = numbers.iterate();
        // IterativeStream -> DataStream
        // the iteration body: the processing logic applied to each input of the iteration
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 2;
            }
        });
        // as long as value > 0 the record is fed back into the loop, i.e. the previous output
        // becomes the next input and goes through the iteration body again
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });
        // register the feedback stream
        iteration.closeWith(feedback);
        // records that no longer satisfy the iteration condition are emitted as the final output
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });
        // print the final result
        output.printToErr("output value:");
        env.execute();
    }
}
                  

Output:

                  iterate input =>7
                  iterate input =>5
                  iterate input =>3
                  iterate input =>1
                  output value::2> -1
                  iterate input =>6
                  iterate input =>4
                  output value::3> 0
                  iterate input =>2
                  

Summary:

                   • 1- This pattern is most common for iteratively updating a model or its parameters.
                   • 2- Operator iteration takes some effort to understand; working through the example helps.
