Flink Study Notes
Preface: today is day two! I started on Flink's stream-batch unified API development, focusing on how to ingest data from the various kinds of sources. I've found that learning to program takes categorized memorization: guess what an API does, then verify the guess by writing code; that's how the concepts sink in and the APIs become familiar.
Tips: learning Flink is actually quite fun. Progress is a little slow, but I now have a clear picture of data sources, and I'm sure things will keep getting better!
II. Flink Stream-Batch Unified API Development
1. Input Data Set: Data Source
1.1 Predefined Sources
1.1.1 Sources Based on Local Collections
- (1) env.fromElements()
# Accepts two kinds of input, individual elements or tuples, and returns a DataStreamSource
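A minimal sketch of both input kinds (the variable names ds1/ds2 are illustrative; `env` is the StreamExecutionEnvironment used throughout these notes):

```java
// Individual elements
DataStreamSource<String> ds1 = env.fromElements("spark", "flink", "hadoop");
// Tuples (org.apache.flink.api.java.tuple.Tuple2)
DataStreamSource<Tuple2<String, Integer>> ds2 =
        env.fromElements(Tuple2.of("spark", 1), Tuple2.of("flink", 2));
```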
- (2) env.fromCollection()
# Pass in a list; returns a DataStreamSource

```java
DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));
// Output of ds3:
// 8> hadoop  6> spark  7> flink
```

# fromParallelCollection: a parallel source over a number sequence (0-10, closed interval)

```java
DataStreamSource<Long> parallelCollection = env.fromParallelCollection(
        new NumberSequenceIterator(0L, 10L),
        TypeInformation.of(Long.TYPE)
).setParallelism(3);
// Output of parallelCollection (out of order):
// 8> 8  2> 10  8> 7  6> 3  6> 5  3> 0  7> 6  1> 9  5> 2  5> 4  4> 1
```

- (3) env.generateSequence()
# Pass a range; note it is a closed interval [1, 10] (the output contains both 1 and 10). Returns a DataStreamSource. generateSequence is deprecated in recent Flink versions in favor of fromSequence.

```java
DataStreamSource<Long> ds4 = env.generateSequence(1, 10);
// Output of ds4 (out of order):
// 8> 8  3> 3  2> 2  5> 5  1> 1  1> 9  7> 7  6> 6  4> 4  2> 10
```

- (4) env.fromSequence()
# Pass a range; again a closed interval [1, 10]. Returns a DataStreamSource

```java
DataStreamSource<Long> ds5 = env.fromSequence(1, 10);
// Output of ds5 (out of order):
// 1> 8  7> 6  6> 10  2> 5  3> 1  3> 2  8> 7  4> 9  5> 3  5> 4
```

1.1.2 File-based Sources
- (1) Read a text file in batch mode: env.readTextFile(path)
```java
package cn.itcast.day02.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description TODO: read a file in batch mode
 */
public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // configure the web UI port
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        // create a local environment with the web UI enabled
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // read the source file
        String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
        DataStreamSource<String> lines = env.readTextFile(path);
        // parallelism of the source
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the DataStream created by ReadTextFileDemo: " + parallelism);
        lines.print();
        env.execute();
    }
}
```

- (2) Read a text file as a stream: env.readFile()
- Detail: with PROCESS_CONTINUOUSLY, the source re-reads the file only when the file's state changes, and each change triggers printing again; note that Flink re-processes the entire file on a change, not just the appended part.
```java
package cn.itcast.day02.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description TODO: read a source as a stream (unbounded)
 */
public class StreamFromFile {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        String path = "./data/input/wordcount.txt";
        // new TextInputFormat(null): text input format; null means the default UTF-8 encoding
        // FileProcessingMode.PROCESS_ONCE: process the file once and stop
        // 2000 is the scan interval in milliseconds
        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_ONCE, 2000);
        // FileProcessingMode.PROCESS_CONTINUOUSLY: keep monitoring the file; never stops
        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
        // inspect the parallelism
        System.out.println("Parallelism of lines1: " + lines1.getParallelism());
        System.out.println("Parallelism of lines2: " + lines2.getParallelism());
        //lines1.print();
        lines2.print();
        env.execute();
    }
}
```

1.1.3 Socket-based Sources
- Observation: the socket source's parallelism is 1 (a single-parallelism source)
- Detail: start the socket server on the VM with nc -lk 8888
```java
package cn.itcast.day02.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description TODO: socket-based source
 */
public class StreamSocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        int parallelism1 = lines.getParallelism();
        System.out.println("Parallelism of the SocketSource: " + parallelism1);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        int parallelism2 = words.getParallelism();
        System.out.println("Parallelism of the DataStream after flatMap: " + parallelism2);
        words.print();
        env.execute();
    }
}
```
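As a side note, the anonymous FlatMapFunction above can also be written as a lambda; because Java erases the generic type, Flink then needs an explicit type hint via returns(...). A small sketch, reusing the `lines` stream from the code above:

```java
// Lambda form of the flatMap above; Types.STRING compensates for type erasure
// (org.apache.flink.api.common.typeinfo.Types)
SingleOutputStreamOperator<String> words = lines
        .flatMap((String line, Collector<String> out) -> {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        })
        .returns(Types.STRING);
```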
1.2 Custom Sources
1.2.1 Source Based on Randomly Generated Data
- (1) Implement the SourceFunction interface yourself
- Example: a custom source that generates one order record per second (order ID, user ID, order amount, timestamp)
- Requirements: a random order ID (UUID), a user ID in 0-2, a random order amount (the code below draws from [0, 1000)), and the current system time as the timestamp
```java
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description TODO
 */
public class CustomerSourceWithoutParallelDemo {
    /**
     * Custom Java bean class.
     * @Data: generates getters, setters, toString, equals and hashCode.
     * @AllArgsConstructor: generates an all-args constructor.
     * @NoArgsConstructor: generates a no-args constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // order ID
        private String id;
        // user ID
        private String userId;
        // order amount
        private int money;
        // timestamp
        private Long timestamp;
    }

    /**
     * Main method.
     */
    public static void main(String[] args) throws Exception {
        // todo 1) set up the Flink streaming environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the initial environment: " + env.getParallelism());
        // todo 2) attach the custom source
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) print the output
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source: generates one order record per second.
     */
    private static class MySource implements SourceFunction<Order> {
        // flag controlling the generation loop
        private boolean isRunning = true;

        /**
         * Core method: generates the data.
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // order ID
                String orderID = UUID.randomUUID().toString();
                // user ID
                String userID = String.valueOf(random.nextInt(3));
                // order amount
                int money = random.nextInt(1000);
                // timestamp
                long time = System.currentTimeMillis();
                // emit the record
                sourceContext.collect(new Order(orderID, userID, money, time));
                // one record per second
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
```

Result: the default environment parallelism is 8; the custom streamSource has parallelism 1.
Summary:
- 1- env.addSource(new MySource()) attaches the custom source [a private static inner class here]: new an implementation of the SourceFunction interface and override its core methods.
- 2- Got to know Java bean classes and what @Data, @NoArgsConstructor and @AllArgsConstructor generate.
- 3- The UUID utility class generates random IDs; Random must be instantiated before use, and random.nextInt(n) is left-closed, right-open, i.e. [0, n). See the small sketch after this list.
- 4- String.valueOf() produces a String; the while loop needs a boolean flag.
- 5- collect() emits an object record downstream.
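A tiny standalone sketch of the utilities summarized above (the class name UtilSketch is illustrative, not part of the course code):

```java
import java.util.Random;
import java.util.UUID;

public class UtilSketch {
    public static void main(String[] args) {
        // UUID generates a random unique ID as a String
        String orderId = UUID.randomUUID().toString();
        // Random must be instantiated before use
        Random random = new Random();
        // nextInt(3) is uniform over the left-closed, right-open range [0, 3): 0, 1 or 2
        String userId = String.valueOf(random.nextInt(3));
        System.out.println(orderId + " -> user " + userId);
    }
}
```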
- (2) Implement ParallelSourceFunction to create a parallelizable Source

```java
DataStreamSource<Order> mySource = env.addSource(new MySource()).setParallelism(6);
```

The non-parallel MySource above (a plain SourceFunction) does not support a parallelism greater than 1: Flink throws an exception when setParallelism(6) is called on it. Implementing ParallelSourceFunction lifts that restriction.
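For contrast, a minimal sketch of the same generator as a ParallelSourceFunction (assuming the Order bean from the example above; the class name MyParallelSource is illustrative). ParallelSourceFunction is a marker interface extending SourceFunction, so the body stays the same; it simply tells Flink that multiple instances may run:

```java
// requires org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
private static class MyParallelSource implements ParallelSourceFunction<Order> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            // each parallel subtask emits one random order per second
            ctx.collect(new Order(UUID.randomUUID().toString(),
                    String.valueOf(random.nextInt(3)),
                    random.nextInt(1000),
                    System.currentTimeMillis()));
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

With this class, env.addSource(new MyParallelSource()).setParallelism(6) is accepted.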
- (3) Implement RichParallelSourceFunction: create a parallel Source with Rich functionality

```java
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description TODO: custom source with multiple parallelism
 */
public class RichParallelismDemo {
    /**
     * Custom Java bean class.
     * @Data: generates getters, setters, toString, equals and hashCode.
     * @AllArgsConstructor: generates an all-args constructor.
     * @NoArgsConstructor: generates a no-args constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // order ID
        private String id;
        // user ID
        private String userId;
        // order amount
        private int money;
        // timestamp
        private Long timestamp;
    }

    /**
     * Main method.
     */
    public static void main(String[] args) throws Exception {
        // todo 1) set up the Flink streaming environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the initial environment: " + env.getParallelism());
        // todo 2) attach the custom source
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) print the output
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source: each subtask generates one order record per second.
     */
    private static class MySource extends RichParallelSourceFunction<Order> {
        // flag controlling the generation loop
        private boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                String orderID = UUID.randomUUID().toString();
                String userID = String.valueOf(random.nextInt(3));
                int money = random.nextInt(1000);
                long time = System.currentTimeMillis();
                sourceContext.collect(new Order(orderID, userID, money, time));
                // one record per second per subtask
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}
```

Result: a custom RichParallelSourceFunction supports a parallelism greater than 1.
Summary: extending RichParallelSourceFunction also means overriding the open and close lifecycle methods (resource setup and teardown)!
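Beyond the lifecycle hooks, what "Rich" buys you is access to the runtime context. A small illustrative sketch (an open() body inside a RichParallelSourceFunction subclass such as MySource above):

```java
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Rich functions expose the runtime context, e.g. the 0-based index
    // of the parallel subtask this instance runs as
    int subtask = getRuntimeContext().getIndexOfThisSubtask();
    System.out.println("open() called on subtask " + subtask);
}
```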
1.2.2 MySQL-based Source
```sql
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;
```
```java
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description TODO: custom MySQL source
 */
public class MysqlSource {
    public static void main(String[] args) throws Exception {
        // todo 1) set up the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 2) attach the custom source
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("Parallelism of MysqlSourceFunction: " + streamSource.getParallelism());
        // todo 3) print the output
        streamSource.print();
        // todo 4) run the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo {
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * Custom source: reads data from MySQL.
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
        // MySQL connection object
        private Connection connection = null;
        // MySQL statement object
        private PreparedStatement statement = null;
        // flag controlling the polling loop
        private volatile boolean isRunning = true;

        /**
         * Runs once per instance; with multiple parallel subtasks it runs multiple times,
         * once per instance. Typically used for resource initialization.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // register the driver
            Class.forName("com.mysql.jdbc.Driver");
            // create the MySQL connection object
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
                    "root", "root");
            // create the statement object with the query
            statement = connection.prepareStatement("select * from test.user");
        }

        @Override
        public void close() throws Exception {
            // release the JDBC resources
            if (statement != null) statement.close();
            if (connection != null) connection.close();
            super.close();
        }

        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while (isRunning) {
                ResultSet resultSet = statement.executeQuery();
                while (resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String username = resultSet.getString("username");
                    String password = resultSet.getString("password");
                    String name = resultSet.getString("name");
                    sourceContext.collect(new UserInfo(id, username, password, name));
                }
                resultSet.close();
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
```

Result: the custom MySQL source supports a parallelism greater than 1.
Summary:
- 1- The Java bean class maps the MySQL column names.
- 2- Initialize the connection and statement fields to null.
- 3- Override the open lifecycle method:
  - register the MySQL driver: Class.forName("com.mysql.jdbc.Driver")
  - create the connection object: DriverManager.getConnection()
  - create the statement: connection.prepareStatement(), which takes the SQL query
- 4- Override the run core method:
  - a nested loop: the outer loop runs statement.executeQuery(), then closes the result set and sleeps for the poll interval; the inner loop walks the ResultSet, reading each column by name and type
  - after reading the columns, emit with collect(new UserInfo(...))
- 5- Poll interval: TimeUnit.SECONDS.sleep()
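To verify the connection settings outside Flink first, here is a small standalone JDBC sketch using the same sample URL and credentials as above (the class name JdbcSanityCheck is illustrative):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class JdbcSanityCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
        // try-with-resources closes the connection, statement and result set automatically
        try (Connection conn = DriverManager.getConnection(url, "root", "root");
             PreparedStatement stmt = conn.prepareStatement("select * from test.user");
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getInt("id") + ", " + rs.getString("username"));
            }
        }
    }
}
```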