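Flink Review Notes (2): Flink Unified Stream/Batch API Development: Source Operations

Flink Study Notes

Preface: This is day two! I started on Flink's unified stream/batch development and focused on how to ingest data from the various kinds of sources. I have found that learning to program works best when I memorize things by category: I guess what an API does, then write code to check the guess. That is how I come to understand a topic and get more fluent with the API.

Tips: I find Flink quite fun to learn. My progress is a bit slow, but I now understand data sources clearly, and I trust things will keep getting better from here!

II. Flink Unified Stream/Batch API Development

1. Input Datasets: Data Source

1.1 Predefined Sources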
1.1.1 Sources based on local collections
  • (1) env.fromElements()
    // Two kinds of input: plain elements, or a single List of tuples
    DataStreamSource<String> ds1 = env.fromElements("hadoop", "spark", "spark", "flink");
    List<Tuple2<String, Long>> tuple2List = new ArrayList<>();
    tuple2List.add(Tuple2.of("hadoop", 1L));
    tuple2List.add(Tuple2.of("spark", 2L));
    tuple2List.add(Tuple2.of("flink", 3L));
    // The whole List is passed as ONE element, so ds2 emits a single record (see Output 2)
    DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);
    // Output 1
    6> spark
    4> hadoop
    5> spark
    7> flink
    // Output 2
    6> [(hadoop,1), (spark,2), (flink,3)]
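Output 2 holds a single record containing the whole list, because the List was handed over as one element. Plain Java streams have the same one-versus-many distinction, which allows a quick check without a Flink runtime. The sketch below is only an analogy: `ElementsVsCollectionSketch` is a made-up name, and `Stream.of` and `List.stream` stand in for `fromElements` and `fromCollection`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class ElementsVsCollectionSketch {
    /** Builds the same three (word, count) pairs as the notes, as simple strings. */
    static List<String> sampleList() {
        List<String> pairs = new ArrayList<>();
        pairs.add("(hadoop,1)");
        pairs.add("(spark,2)");
        pairs.add("(flink,3)");
        return pairs;
    }

    /** Analogy for fromElements(list): the list itself is the only element. */
    static long countWhenPassedAsOneElement(List<String> list) {
        return Stream.of(list).count();
    }

    /** Analogy for fromCollection(list): every entry becomes its own element. */
    static long countWhenPassedAsCollection(List<String> list) {
        return list.stream().count();
    }

    public static void main(String[] args) {
        List<String> list = sampleList();
        System.out.println(countWhenPassedAsOneElement(list)); // the list counts as one element
        System.out.println(countWhenPassedAsCollection(list)); // one element per entry
    }
}
```

In the Flink job, passing the list to `env.fromCollection(tuple2List)` would be the way to get one record per tuple.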
     
    • (2) env.fromCollection()
      // Pass in a list
      DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));
      // Output 3
      8> hadoop
      6> spark
      7> flink
      // fromParallelCollection: a parallel sequence source (closed interval, 0 through 10)
      DataStreamSource<Long> parallelCollection = env.fromParallelCollection(
                      new NumberSequenceIterator(0L, 10L),
                      TypeInformation.of(Long.TYPE)
              ).setParallelism(3);
      // Unordered output of parallelCollection
      8> 8
      2> 10
      8> 7
      6> 3
      6> 5
      3> 0
      7> 6
      1> 9
      5> 2
      5> 4
      4> 1
      
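      • (3) env.generateSequence()
        // Pass in a range (closed interval: both 1 and 10 are emitted, as the output shows)
        // Note: generateSequence is deprecated in newer Flink releases in favor of fromSequence
        DataStreamSource<Long> ds4 = env.generateSequence(1, 10);
        // Output 4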
        8> 8
        3> 3
        2> 2
        5> 5
        1> 1
        1> 9
        7> 7
        6> 6
        4> 4
        2> 10
        
        • (4) env.fromSequence()
          // Pass in a range (closed interval: both 1 and 10 are emitted, as the output shows)
          DataStreamSource<Long> ds5 = env.fromSequence(1, 10);
          // Output 5
          1> 8
          7> 6
          6> 10
          2> 5
          3> 1
          3> 2
          8> 7
          4> 9
          5> 3
          5> 4
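The outputs above list every value from 1 through 10 (and 0 through 10 for the parallel collection), so both bounds are included. To make that concrete without starting a Flink job, here is a minimal plain-Java sketch. `SequenceSketch` and its methods are hypothetical names, and the round-robin split is an assumption chosen for illustration; it is not necessarily how Flink assigns values to subtasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

final class SequenceSketch {
    /** Values of the closed interval [from, to]; both bounds are included. */
    static List<Long> closedRange(long from, long to) {
        return LongStream.rangeClosed(from, to).boxed().collect(Collectors.toList());
    }

    /** Hypothetical round-robin assignment of [from, to] to the given number of subtasks. */
    static List<List<Long>> splitClosedRange(long from, long to, int parallelism) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        for (long v = from; v <= to; v++) {
            parts.get((int) ((v - from) % parallelism)).add(v);
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(closedRange(1, 10));         // ten values, 1 and 10 included
        System.out.println(splitClosedRange(0, 10, 3)); // eleven values spread over three parts
    }
}
```

However the values are distributed, the total number of emitted records equals `to - from + 1`, which matches the 10 and 11 output lines shown above.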
          
          1.1.2 File-based Sources
          • (1) Read a text file in batch mode: env.readTextFile(path)
            package cn.itcast.day02.source;
            import org.apache.flink.configuration.Configuration;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            /**
             * @author lql
             * @time 2024-02-12 23:47:53
             * @description TODO: read a file in batch mode
             */
            public class BatchFromFile {
                public static void main(String[] args) throws Exception {
                    // Configure the port of the web UI
                    Configuration configuration = new Configuration();
                    configuration.setInteger("rest.port", 8081);
                    // Create a local environment with the web UI enabled
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
                    // Read the source
                    String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
                    DataStreamSource<String> lines = env.readTextFile(path);
                    // Parallelism of the source
                    int parallelism = lines.getParallelism();
                    System.out.println("Parallelism of the DataStream created by readTextFile: " + parallelism);
                    lines.print();
                    env.execute();
                }
            }
            
            • (2) Read a text file in streaming mode: env.readFile()
              • Detail: in PROCESS_CONTINUOUSLY mode, a file is re-read and printed again in full each time it is modified
                package cn.itcast.day02.source;
                import org.apache.flink.api.java.io.TextInputFormat;
                import org.apache.flink.configuration.Configuration;
                import org.apache.flink.streaming.api.datastream.DataStreamSource;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
                /**
                 * @author lql
                 * @time 2024-02-13 15:34:11
                 * @description TODO: read the source as a stream (unbounded)
                 */
                public class StreamFromFile {
                    public static void main(String[] args) throws Exception {
                        Configuration configuration = new Configuration();
                        configuration.setInteger("rest.port", 8081);
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
                        String path = "./data/input/wordcount.txt";
                        // new TextInputFormat(null): the constructor argument is the file path, not an encoding;
                        // null is fine here because readFile() receives the path as its own argument.
                        // Lines are decoded as UTF-8 by default.
                        // FileProcessingMode.PROCESS_ONCE: scan the path once, then the source finishes
                        // 2000: interval in milliseconds between scans of the path
                        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path,
                                FileProcessingMode.PROCESS_ONCE, 2000
                        );
                        // FileProcessingMode.PROCESS_CONTINUOUSLY: keep monitoring the path; the source never finishes on its own
                        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path,
                                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000
                        );
                        // Check the parallelism
                        System.out.println("Parallelism of lines1: " + lines1.getParallelism());
                        System.out.println("Parallelism of lines2: " + lines2.getParallelism());
                        //lines1.print();
                        lines2.print();
                        env.execute();
                    }
                }
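The continuous mode above re-reads a file in full whenever it changes. As a rough mental model only, the following plain-Java sketch polls a file's modification time and returns all lines again after a change. `FileMonitorSketch`, `scanOnce` and `demo` are hypothetical names; this is a simplification for illustration and not Flink's actual monitoring code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FileMonitorSketch {
    private final Path path;
    private FileTime lastSeen = null; // modification time observed at the previous scan

    FileMonitorSketch(Path path) {
        this.path = path;
    }

    /** One scan: returns every line if the file changed since the last scan, else an empty list. */
    List<String> scanOnce() throws IOException {
        FileTime current = Files.getLastModifiedTime(path);
        if (current.equals(lastSeen)) {
            return new ArrayList<>();
        }
        lastSeen = current;
        return Files.readAllLines(path, StandardCharsets.UTF_8);
    }

    /** Scans a temp file three times (new, unchanged, modified) and returns the line counts. */
    static List<Integer> demo() {
        try {
            Path tmp = Files.createTempFile("wordcount", ".txt");
            Files.write(tmp, Arrays.asList("hadoop spark"), StandardCharsets.UTF_8);
            FileMonitorSketch monitor = new FileMonitorSketch(tmp);
            List<Integer> sizes = new ArrayList<>();
            sizes.add(monitor.scanOnce().size()); // first scan reads the file
            sizes.add(monitor.scanOnce().size()); // nothing changed, nothing returned
            Files.write(tmp, Arrays.asList("hadoop spark", "flink"), StandardCharsets.UTF_8);
            // bump the timestamp explicitly so the demo does not depend on clock granularity
            FileTime bumped = FileTime.fromMillis(Files.getLastModifiedTime(tmp).toMillis() + 5000);
            Files.setLastModifiedTime(tmp, bumped);
            sizes.add(monitor.scanOnce().size()); // whole file is returned again, old line included
            Files.delete(tmp);
            return sizes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The third scan returns the old line together with the new one, which is why a modified file shows up as repeated records downstream.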
                
                1.1.3 Socket-based Source
                • Observation: the socket source has parallelism 1 (it is a non-parallel source)
                • Detail: start the socket server on the virtual machine with nc -lk 8888
                  package cn.itcast.day02.source;
                  import org.apache.flink.api.common.functions.FlatMapFunction;
                  import org.apache.flink.streaming.api.datastream.DataStreamSource;
                  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                  import org.apache.flink.util.Collector;
                  /**
                   * @author lql
                   * @time 2024-02-13 16:00:47
                   * @description TODO: socket-based source
                   */
                  public class StreamSocketSource {
                      public static void main(String[] args) throws Exception {
                          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          int parallelism0 = env.getParallelism();
                          System.out.println("Default parallelism of the execution environment: " + parallelism0);
                          DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
                          int parallelism1 = lines.getParallelism();
                          System.out.println("Parallelism of the socket source: " + parallelism1);
                          SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                              @Override
                              public void flatMap(String line, Collector<String> out) throws Exception {
                                  // Split each line on spaces and emit one record per word
                                  String[] tokens = line.split(" ");
                                  for (String word : tokens) {
                                      out.collect(word);
                                  }
                              }
                          });
                          int parallelism2 = words.getParallelism();
                          System.out.println("Parallelism of the DataStream after flatMap: " + parallelism2);
                          words.print();
                          env.execute();
                      }
                  }
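The flatMap above splits each line on a single space. One caveat, shown here in plain Java without Flink: `split(" ")` yields an empty string between two consecutive spaces, and that empty string would be emitted as a word. A minimal sketch of a more tolerant splitter follows; `WordSplitSketch` and `splitWords` are hypothetical names and not part of the original job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class WordSplitSketch {
    /** Splits one line into words on runs of whitespace, dropping empty tokens. */
    static List<String> splitWords(String line) {
        return Arrays.stream(line.trim().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // tolerant split: runs of whitespace count as one separator
        System.out.println(splitWords("hadoop  spark flink"));
        // naive split: the double space produces an empty token in the middle
        System.out.println(Arrays.asList("hadoop  spark".split(" ")));
    }
}
```

Inside the Flink job the same expression could replace `line.split(" ")` in the flatMap body if input typed into nc may contain repeated spaces.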
                  
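                  1.2 Custom Sources
                  1.2.1 A source that generates random data
                  • (1) Implement the SourceFunction interface
                    • Example: a custom source that generates one random order per second (order ID, user ID, order amount, timestamp)
                    • Requirements: random order ID (UUID), user ID (0-2), order amount (0-100), timestamp set to the current system time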
                      package cn.itcast.day02.source.custom;
                      import lombok.AllArgsConstructor;
                      import lombok.Data;
                      import lombok.NoArgsConstructor;
                      import org.apache.flink.configuration.Configuration;
                      import org.apache.flink.streaming.api.datastream.DataStreamSource;
                      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                      import org.apache.flink.streaming.api.functions.source.SourceFunction;
                      import java.util.Random;
                      import java.util.UUID;
                      /**
                       * @author lql
                       * @time 2024-02-13 16:21:31
                       * @description TODO
                       */
                      public class CustomerSourceWithoutParallelDemo {
                          /**
                           * Custom Java bean class
                           *     @Data: generates getters, setters, toString, equals and hashCode for the class.
                           *     @AllArgsConstructor: generates a constructor that takes every field.
                           *     @NoArgsConstructor: generates a no-argument constructor.
                           */
                          @Data
                          @NoArgsConstructor
                          @AllArgsConstructor
                          public static class Order {
                              // Order ID
                              private String id;
                              // User ID
                              private String userId;
                              // Order amount
                              private int money;
                              // Timestamp
                              private Long timestamp;
                          }
                          /**
                           * Entry point
                           * @param args
                           * @throws Exception
                           */
                          public static void main(String[] args) throws Exception {
                              // todo 1) Get the Flink streaming environment
                              Configuration configuration = new Configuration();
                              configuration.setInteger("rest.port", 8081);
                              StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
                              System.out.println("Parallelism of the initialized environment: " + env.getParallelism());
                              // todo 2) Plug in the custom source
                              DataStreamSource<Order> streamSource = env.addSource(new MySource());
                              System.out.println("streamSource parallelism: " + streamSource.getParallelism());

                              // todo 3) Print the output
                              streamSource.printToErr();
                              env.execute();
                          }

                          /**
                           * Custom source that generates one order per second
                           */
                          private static class MySource implements SourceFunction<Order> {
                              // Loop flag; volatile because cancel() is called from another thread
                              private volatile boolean isRunning = true;
                              /**
                               * Core method: generates the data
                               */
                              @Override
                              public void run(SourceContext<Order> sourceContext) throws Exception {
                                  Random random = new Random();
                                  while (isRunning) {
                                      // Order ID
                                      String orderID = UUID.randomUUID().toString();
                                      // User ID: 0, 1 or 2
                                      String userID = String.valueOf(random.nextInt(3));
                                      // Order amount: 0 to 100 inclusive, as stated in the requirements
                                      int money = random.nextInt(101);
                                      // Timestamp
                                      long time = System.currentTimeMillis();
                                      // Emit the record
                                      sourceContext.collect(new Order(orderID, userID, money, time));
                                      // One order per second
                                      Thread.sleep(1000);
                                  }
                              }
                              @Override
                              public void cancel() {
                                  isRunning = false;
                              }
                          }
                      }
                      

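                      Result: the default parallelism of the execution environment is 8 (locally it defaults to the number of CPU cores), and the parallelism of the custom streamSource is 1.

                      Summary:

                      • 1- env.addSource(new MySource()) plugs in the custom source; MySource is a private static nested class

                        • it implements the SourceFunction interface and overrides its core methods run() and cancel()

                      • 2- Got to know Java bean classes and what @Data, @NoArgsConstructor and @AllArgsConstructor do

                      • 3- The UUID utility class generates random IDs; to use random numbers, create a Random instance first; random.nextInt(n) returns a value in the half-open interval [0, n)

                      • 4- String.valueOf() converts a value to a String; the while loop needs a boolean flag so that cancel() can stop it

                      • 5- collect() emits a record object downstream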
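Two points from the summary, the boolean loop flag and the half-open range of nextInt, can be tried out in plain Java without Flink. The sketch below is an illustration under assumptions: `CancellableLoopSketch` is a made-up stand-in for a source, its `run` and `cancel` only mimic the shape of the SourceFunction methods, and the pause is shortened to 10 ms so the demo finishes quickly.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

final class CancellableLoopSketch {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    final List<int[]> emitted = new CopyOnWriteArrayList<>();
    boolean stoppedAfterCancel = false;

    /** Stand-in for run(): loops until cancel() clears the flag. */
    void run() throws InterruptedException {
        Random random = new Random();
        while (running) {
            int userId = random.nextInt(3);  // 0, 1 or 2
            int money = random.nextInt(101); // 0 to 100 inclusive
            emitted.add(new int[] {userId, money});
            Thread.sleep(10);                // shortened pause for the demo
        }
    }

    /** Stand-in for cancel(): asks the loop to stop. */
    void cancel() {
        running = false;
    }

    /** Runs the loop on a worker thread, cancels it, and records whether it stopped. */
    static CancellableLoopSketch demo() {
        CancellableLoopSketch sketch = new CancellableLoopSketch();
        Thread worker = new Thread(() -> {
            try {
                sketch.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // wait (up to about 2 s) until at least one record was produced
            for (int i = 0; i < 400 && sketch.emitted.isEmpty(); i++) {
                Thread.sleep(5);
            }
            sketch.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sketch.stoppedAfterCancel = !worker.isAlive();
        return sketch;
    }

    public static void main(String[] args) {
        CancellableLoopSketch sketch = demo();
        System.out.println("records: " + sketch.emitted.size()
                + ", stopped: " + sketch.stoppedAfterCancel);
    }
}
```

If `cancel()` left the flag untouched, the worker thread would keep looping, which is the situation a source with an empty `cancel()` ends up in when the job is cancelled.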

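                        • (2) Implement ParallelSourceFunction to create a parallel source

                          DataStreamSource<Order> mySource = env.addSource(new MySource()).setParallelism(6);
                          // This fails: the MySource above implements plain SourceFunction, which is a non-parallel source,
                          // so any parallelism other than 1 is rejected with an IllegalArgumentException.
                          // To run several parallel instances, implement ParallelSourceFunction (or extend RichParallelSourceFunction) instead.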
                          
                          • (3) Extend RichParallelSourceFunction: a parallel source with Rich features
                            package cn.itcast.day02.source.custom;
                            import lombok.AllArgsConstructor;
                            import lombok.Data;
                            import lombok.NoArgsConstructor;
                            import org.apache.flink.configuration.Configuration;
                            import org.apache.flink.streaming.api.datastream.DataStreamSource;
                            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                            import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
                            import java.util.Random;
                            import java.util.UUID;
                            /**
                             * @author lql
                             * @time 2024-02-13 16:58:49
                             * @description TODO: custom source with parallelism greater than 1
                             */
                            public class RichParallelismDemo {
                                /**
                                 * Custom Java bean class
                                 *
                                 * @Data: generates getters, setters, toString, equals and hashCode for the class.
                                 * @AllArgsConstructor: generates a constructor that takes every field.
                                 * @NoArgsConstructor: generates a no-argument constructor.
                                 */
                                @Data
                                @NoArgsConstructor
                                @AllArgsConstructor
                                public static class Order {
                                    // Order ID
                                    private String id;
                                    // User ID
                                    private String userId;
                                    // Order amount
                                    private int money;
                                    // Timestamp
                                    private Long timestamp;
                                }
                                /**
                                 * Entry point
                                 * @param args
                                 * @throws Exception
                                 */
                                public static void main(String[] args) throws Exception {
                                    // todo 1) Get the Flink streaming environment
                                    Configuration configuration = new Configuration();
                                    configuration.setInteger("rest.port", 8081);
                                    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
                                    System.out.println("Parallelism of the initialized environment: " + env.getParallelism());
                                    // todo 2) Plug in the custom source
                                    DataStreamSource<Order> streamSource = env.addSource(new MySource());
                                    System.out.println("streamSource parallelism: " + streamSource.getParallelism());
                                    // todo 3) Print the output
                                    streamSource.printToErr();
                                    env.execute();
                                }
                                /**
                                 * Custom source that generates one order per second (per parallel instance)
                                 */
                                private static class MySource extends RichParallelSourceFunction<Order> {
                                    // Loop flag; volatile because cancel() is called from another thread
                                    private volatile boolean isRunning = true;
                                    @Override
                                    public void open(Configuration parameters) throws Exception {
                                        super.open(parameters);
                                    }
                                    @Override
                                    public void close() throws Exception {
                                        super.close();
                                    }
                                    @Override
                                    public void cancel() {
                                        // Without this the loop in run() would never stop
                                        isRunning = false;
                                    }
                                    @Override
                                    public void run(SourceContext<Order> sourceContext) throws Exception {
                                        Random random = new Random();
                                        while (isRunning) {
                                            // Order ID
                                            String orderID = UUID.randomUUID().toString();
                                            // User ID: 0, 1 or 2
                                            String userID = String.valueOf(random.nextInt(3));
                                            // Order amount: 0 to 100 inclusive, as stated in the requirements
                                            int money = random.nextInt(101);
                                            // Timestamp
                                            long time = System.currentTimeMillis();
                                            // Emit the record
                                            sourceContext.collect(new Order(orderID, userID, money, time));
                                            // One order per second
                                            Thread.sleep(1000);
                                        }
                                    }
                                }
                            }
                            

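                            Result: a custom RichParallelSourceFunction supports parallelism greater than 1.

                            Summary: when extending the RichParallelSourceFunction class, run() and cancel() must be implemented; open() and close() are optional lifecycle hooks, overridden when the source needs to set up and release resources.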
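To picture the lifecycle, each parallel instance is opened once, then runs, then is closed, so with parallelism n the open hook runs n times in total. The following plain-Java sketch models that order. `LifecycleSketch` and `FakeRichSource` are hypothetical stand-ins and not Flink types, and the instances run one after another here instead of concurrently.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    /** Hypothetical stand-in for one parallel instance of a rich source. */
    static final class FakeRichSource {
        private final int subtaskIndex;
        private final List<String> log;

        FakeRichSource(int subtaskIndex, List<String> log) {
            this.subtaskIndex = subtaskIndex;
            this.log = log;
        }

        void open() {
            log.add("open#" + subtaskIndex);  // acquire resources here
        }

        void run() {
            log.add("emit#" + subtaskIndex);  // produce a record
        }

        void close() {
            log.add("close#" + subtaskIndex); // release resources here
        }
    }

    /** Drives every instance through open, run and close, closing even if run() throws. */
    static List<String> runAll(int parallelism) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            FakeRichSource source = new FakeRichSource(i, log);
            source.open();
            try {
                source.run();
            } finally {
                source.close();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAll(2));
    }
}
```

The try/finally mirrors why cleanup belongs in the close hook: it is the place that still runs when the main loop ends early.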

                            1.2.2 MySQL-based Source
                            DROP TABLE IF EXISTS `user`;
                            CREATE TABLE `user`  (
                              `id` int(11) NOT NULL,
                              `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
                              `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
                              `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
                              PRIMARY KEY (`id`) USING BTREE
                            ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
                            -- ----------------------------
                            -- Records of user
                            -- ----------------------------
                            INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
                            INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
                            INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
                            SET FOREIGN_KEY_CHECKS = 1;
                            
                            package cn.itcast.day02.source.custom;
                            import lombok.AllArgsConstructor;
                            import lombok.Data;
                            import lombok.NoArgsConstructor;
                            import org.apache.flink.configuration.Configuration;
                            import org.apache.flink.streaming.api.datastream.DataStreamSource;
                            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                            import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
                            import java.sql.Connection;
                            import java.sql.DriverManager;
                            import java.sql.PreparedStatement;
                            import java.sql.ResultSet;
                            import java.util.concurrent.TimeUnit;
                            /**
                             * @author lql
                             * @time 2024-02-13 17:14:06
                             * @description TODO: custom MySQL source
                             */
                            public class MysqlSource {
                                public static void main(String[] args) throws Exception {
                                    // TODO 1: Get the Flink streaming environment
                                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                                    // TODO 2: Plug in the custom source
                                    DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
                                    System.out.println("Parallelism of MysqlSourceFunction: " + streamSource.getParallelism());
                                    // TODO 3: Print the output
                                    streamSource.print();
                                    // TODO 4: Start the job
                                    env.execute();
                                }
                                @Data
                                @AllArgsConstructor
                                @NoArgsConstructor
                                public static class UserInfo {
                                    private int id;
                                    private String username;
                                    private String password;
                                    private String name;
                                }
                                /**
                                 * Custom source: reads rows from MySQL
                                 */
                                private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
                                    // MySQL connection
                                    private Connection connection = null;
                                    // MySQL prepared statement
                                    private PreparedStatement statement = null;
                                    // Loop flag; volatile because cancel() is called from another thread
                                    private volatile boolean isRunning = true;
                                    /**
                                     * Called once per source instance; with several parallel instances it runs several times.
                                     * Typically used to initialize resources.
                                     * @param parameters
                                     * @throws Exception
                                     */
                                    @Override
                                    public void open(Configuration parameters) throws Exception {
                                        super.open(parameters);
                                        // Register the driver (Connector/J 5.x class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver)
                                        Class.forName("com.mysql.jdbc.Driver");
                                        // Create the MySQL connection
                                        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
                                        // Create the prepared statement
                                        statement = connection.prepareStatement("select * from test.user");
                                    }
                                    @Override
                                    public void close() throws Exception {
                                        super.close();
                                        // Release the JDBC resources acquired in open()
                                        if (statement != null) {
                                            statement.close();
                                        }
                                        if (connection != null) {
                                            connection.close();
                                        }
                                    }
                                    @Override
                                    public void run(SourceContext<UserInfo> sourceContext) throws Exception {
                                        // Re-run the query once per second until the job is cancelled
                                        while (isRunning) {
                                            ResultSet resultSet = statement.executeQuery();
                                            while (resultSet.next()) {
                                                int id = resultSet.getInt("id");
                                                String username = resultSet.getString("username");
                                                String password = resultSet.getString("password");
                                                String name = resultSet.getString("name");
                                                sourceContext.collect(new UserInfo(id, username, password, name));
                                            }
                                            resultSet.close();
                                            TimeUnit.SECONDS.sleep(1);
                                        }
                                    }
                                    @Override
                                    public void cancel() {
                                        isRunning = false;
                                    }
                                }
                            }
                            

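                            Result: the custom MySQL source can run with parallelism greater than 1. Note that every parallel instance executes the same query, so each row is then emitted once per instance.

                            Summary: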
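When a source like this runs with several parallel instances and each one issues the same `select *`, every row comes out once per instance. A common way around that is to let each instance keep only its own share of the rows. The plain-Java sketch below simulates both behaviours on the three sample ids; `SubtaskPartitionSketch` and the modulo rule are assumptions for illustration and not part of the original code. In a real rich source the subtask index and the parallelism would come from the runtime context; how exactly depends on the Flink version, so that part is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SubtaskPartitionSketch {
    /** Every simulated subtask emits every row, as when each instance runs the same query. */
    static List<Integer> emitAll(List<Integer> ids, int parallelism) {
        List<Integer> out = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            out.addAll(ids);
        }
        return out;
    }

    /** Each simulated subtask keeps only the ids whose remainder modulo the parallelism equals its index. */
    static List<Integer> emitPartitioned(List<Integer> ids, int parallelism) {
        List<Integer> out = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int id : ids) {
                if (Math.floorMod(id, parallelism) == subtask) {
                    out.add(id);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(10, 11, 12); // ids from the sample user table
        System.out.println(emitAll(ids, 3).size());    // every row repeated per subtask
        System.out.println(emitPartitioned(ids, 3));   // every row exactly once
    }
}
```

The same filter could also be pushed into the SQL query so that each instance fetches only its own rows, at the cost of a query that depends on the parallelism.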