02-Flink 流批一体 API开发(仅供学习,缓慢更新中)

文章目录

    • 概述
    • 第一部分:基础概念【3个小节】
      • 01-基础概念【DataStream】
      • 02-基础概念【并行度设置】
      • 03-基础概念【资源槽Slot】
      • 第二部分:Data Source & Data Sink【4个小节】
        • 04-Data Source【基本数据源】
          • 集合Collection数据源
          • 文件File数据源
          • 05-Data Source【自定义数据源】
          • 06-Data Source【MySQL Source】
          • 07-Data Sink【MySQL Sink】
          • 第三部分:DataStream Transformations【9个小节】
            • 08-Transformation【算子概述】
            • 09-Transformation【map 算子】
            • 10-Transformation【flatMap 算子】
            • 11-Transformation 【filter 算子 】
            • 12-Transformation【keyBy 算子 】
            • 13-Transformation【reduce 算子】
            • 14-Transformation【max和min 算子 】
            • 15-Transformation【union和connect 算子 】
            • 16-Transformation 【Side Outputs】

              概述

              Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流

              处理和批处理两种类型应用的功能。

              第一部分:基础概念【3个小节】

              01-基础概念【DataStream】

              ​ 在Flink计算引擎中,将数据当做:数据流DataStream,分为有界数据流和无界数据流。

              ​ [任何类型的数据都可以形成一种事件流,如信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。]

              • 1)、有边界流(bounded stream):==有定义流的开始,也有定义流的结束。==有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。
              • 2)、无边界流(unbounded stream):有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

              DataStream(数据流)官方定义:

              DataStream(数据流)源码中定义:

              DataStream有如下几个子类:

              • 1)、DataStreamSource:

                • 表示从数据源直接获取数据流DataStream,比如从Socket或Kafka直接消费数据
                • 2)、KeyedStream:

                  • 当DataStream数据流进行分组时(调用keyBy),产生流称为KeyedStream,按照指定Key分组;
                  • 通常情况下数据流被分组以后,需要进行窗口window操作或聚合操作。
                  • 3)、SingleOutputStreamOperator:

                    • 当DataStream数据流没有进行keyBy分组,而是使用转换函数,产生的流称为SingleOutputStreamOperator。
                    • 比如使用filter、map、flatMap等函数,产生的流就是SingleOutputStreamOperator
                    • 4)、IterativeStream:迭代流,表示对流中数据进行迭代计算,比如机器学习,图计算等。

                      DataStream类是泛型(类型参数),数据类型支持如下所示:

                      在Flink计算引擎中,提供4个层次API,如下所示:

                      Flink中流计算DataStream层次API在使用时,还是包括三个方面:Source/Transformation/Sink

                      基于Flink开发流式计算程序五个步骤:

                      # 1)、Obtain an execution environment,
                      	执行环境-env:StreamExecutionEnvironment
                      	
                      # 2)、Load/create the initial data,
                          数据源-source:DataStream
                          
                      # 3)、Specify transformations on this data,
                          数据转换-transformation:DataStream API(算子,Operator)
                          
                      # 4)、Specify where to put the results of your computations,
                          数据接收器-sink
                      	
                      # 5)、Trigger the program execution
                      	触发执行-execute
                      

                      在IDEA中创建Flink Stream流计算编程模板:FlinkClass

                      模块中内容:FlinkClass

                      #if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "") package ${PACKAGE_NAME};#end
                      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                      /**
                       * 
                       * @author xuanyu
                       */
                      public class ${NAME} {public static void main(String[] args) throws Exception {// 1. 执行环境-env
                      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                      		
                      		// 2. 数据源-source
                      		
                      		// 3. 数据转换-transformation
                      		
                      		// 4. 数据终端-sink
                      		
                      		// 5. 触发执行-execute
                      		env.execute("${NAME}") ;
                      	}
                      } 

                      依据上述定义FlinkStream模块Template,创建Flink Stream编程类:StreamDemo

                      02-基础概念【并行度设置】

                      一个Flink程序由多个Operator组成(source、transformation和 sink)。

                      ​ 一个Operator由多个并行的SubTask(以线程方式)来执行, 一个Operator的并行SubTask(数目就被称为该Operator(任务)的并行度(Parallelism)。

                      在Flink 中,并行度设置可以从4个层次级别指定,具体如下所示:

                      • 1)、Operator Level(算子级别)(可以使用)

                        一个operator、source和sink的并行度可以通过调用 setParallelism()方法来指定。

                        • 2)、Execution Environment Level(Env级别,可以使用)

                          执行环境并行度可以通过调用setParallelism()方法指定。

                          • 3)、Client Level(客户端级别,推荐使用)

                            并行度可以在客户端将job提交到Flink时设定,对于CLI客户端,可以通过-p参数指定并行度

                            • 4)、System Level(系统默认级别,尽量不使用)

                              ​ 在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度。

                              package cn.itcast.flink.parallelism;
                              import org.apache.flink.api.common.functions.FlatMapFunction;
                              import org.apache.flink.api.common.functions.MapFunction;
                              import org.apache.flink.api.java.tuple.Tuple2;
                              import org.apache.flink.api.java.utils.ParameterTool;
                              import org.apache.flink.streaming.api.datastream.DataStreamSource;
                              import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                              import org.apache.flink.util.Collector;
                              /**
                               * 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。
                              		1.执行环境-env
                              		2.数据源-source
                              		3.数据转换-transformation
                              		4.数据接收器-sink
                              		5.触发执行-execute
                               * @author xuyuan
                               */
                              public class WordCountParallelism {
                              	/**
                              	 * 当运行Flink 程序时,传递参数,获取对应host和port值
                              	 * 		todo:  WordCount --host node1 --port 9999
                              	 */
                              	public static void main(String[] args) throws Exception {
                              		// 构建参数解析工具类ParameterTool
                              		ParameterTool parameterTool = ParameterTool.fromArgs(args);
                              		if(parameterTool.getNumberOfParameters() != 2){
                              			System.out.println("Usage: WordCount --host  --port  ............");
                              			System.exit(-1);
                              		}
                              		String host = parameterTool.get("host");
                              		int port = parameterTool.getInt("port", 9999) ;
                              		// 1.执行环境-env
                              		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
                              		// todo: 执行环境级别并行度
                              		env.setParallelism(2) ;
                              		// 2.数据源-source
                              		DataStreamSource inputDataStream = env.socketTextStream(host, port);
                              		// 3.数据转换-transformation
                              		/*
                              			流中每条数据: flink spark flink
                              							|
                              				词频统计,步骤与批处理完全一致
                              		 */
                              		// 3-1. 分割单词
                              		SingleOutputStreamOperator wordDataStream = inputDataStream.flatMap(
                              			new FlatMapFunction() {
                              				@Override
                              				public void flatMap(String value, Collector out) throws Exception {
                              					String[] words = value.split("\\s+");
                              					for (String word : words) {
                              						out.collect(word);
                              					}
                              				}
                              			}
                              		);
                              		// 3-2. 转换为二元组
                              		SingleOutputStreamOperator> tupleDataStream = wordDataStream.map(
                              			new MapFunction>() {
                              				@Override
                              				public Tuple2 map(String value) throws Exception {
                              					return Tuple2.of(value, 1);
                              				}
                              			}
                              		);
                              		// 3-3. 按照单词分组,并且组求和
                              		SingleOutputStreamOperator> resultDataStream = tupleDataStream
                              				.keyBy(0)
                              				.sum(1);
                              		// 4.数据接收器-sink, todo: 算子级别设置并行度,优先级最高
                              		resultDataStream.print().setParallelism(1);
                              		// 5.触发执行-execute
                              		env.execute("StreamWordCount");
                              	}
                              }
                              

                              总结:并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别

                              • 1)、如果source不可以被并行执行,即使指定了并行度为多个,也不会生效
                              • 2)、实际生产中,推荐在算子级别显示指定各自的并行度,方便进行显示和精确的资源控制。
                              • 3)、slot是静态的概念,是指taskmanager具有的并发执行能力; parallelism是动态的概念,是指程序运行时实际使用的并发能力。

                                03-基础概念【资源槽Slot】

                                Flink中运行Task任务(SubTask)在Slot资源槽中:

                                [Slot为子任务SubTask运行资源抽象,每个TaskManager运行时设置Slot个数。]

                                官方建议:
                                	Slot资源槽个数  =  CPU Core核数
                                	
                                也就是说,
                                    分配给TaskManager多少CPU Core核数,可以等价为Slot个数
                                

                                每个TaskManager运行时设置内存大小:[TaskManager中内存平均划分给Slot]。

                                举例说明:
                                	假设TaskManager中分配内存为:4GB,Slot个数为4个,此时每个Slot分配内存就是 4GB / 4 = 1GB 内存 

                                每个Slot中运行SubTask子任务,以线程Thread方式运行。

                                • 1个Job中不同类型SubTask任务,可以运行在同一个Slot中,称为:[Slot Sharded 资源槽共享]
                                • 1个Job中相同类型SubTask任务必须运行在不同Slot中

                                第二部分:Data Source & Data Sink【4个小节】

                                DataStream API 编程
                                	https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview
                                	
                                # 1、Data Sources 数据源
                                	https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources
                                # 2、DataStream Transformations
                                	https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/
                                	
                                # 3、Data Sinks 数据接收器
                                	https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sinks
                                

                                04-Data Source【基本数据源】

                                ​ 针对Flink 流计算来说,数据源可以是有界数据源(静态数据),也可以是无界数据源(流式数据),原因在于Flink框架中,将数据统一封装称为DataStream数据流

                                https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources

                                1、基于File文件数据源
                                	readTextFile(path)
                                2、Sockect 数据源
                                	socketTextStream 
                                	
                                3、基于Collection数据源
                                	fromCollection(Collection)
                                	fromElements(T ...)
                                	fromSequence(from, to),相当于Python中range
                                4、自定义Custom数据源
                                	env.addSource()
                                	官方提供接口:
                                		SourceFunction			非并行
                                		RichSourceFunction 
                                		
                                		ParallelSourceFunction  并行
                                		RichParallelSourceFunction 
                                集合Collection数据源

                                基于集合Collection数据源Source,一般用于学习测试。

                                package cn.itqzd.flink.source;
                                import org.apache.flink.streaming.api.datastream.DataStreamSource;
                                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                                import java.util.Arrays;
                                /**
                                 * Flink 流计算数据源:基于集合的Source,分别为可变参数、集合和自动生成数据
                                 *      TODO: 基于集合数据源Source构建DataStream,属于有界数据流,当数据处理完成以后,应用结束
                                 */
                                public class StreamSourceCollectionDemo {public static void main(String[] args) throws Exception {// 1. 执行环境-env
                                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                                		env.setParallelism(1) ;
                                		// 2. 数据源-source
                                		// 方式一:可变参数
                                		DataStreamSource dataStream01 = env.fromElements("spark", "flink", "mapreduce");
                                		dataStream01.print();
                                		// 方式二:集合对象
                                		DataStreamSource dataStream02 = env.fromCollection(Arrays.asList("spark", "flink", "mapreduce"));
                                		dataStream02.printToErr();
                                		// 方式三:自动生成序列数字
                                		DataStreamSource dataStream03 = env.fromSequence(1, 10);
                                		dataStream03.print();
                                		// 5. 触发执行-execute
                                		env.execute("StreamSourceCollectionDemo") ;
                                	}
                                }
                                
                                文件File数据源

                                基于文件数据源, 一般用于学习测试,演示代码如下所示:

                                从文本文件加载数据时,可以是压缩文件,支持压缩格式如下图。

                                案例演示代码:StreamSourceFileDemo

                                package cn.itqzd.flink.source;
                                import org.apache.flink.streaming.api.datastream.DataStreamSource;
                                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                                /**
                                 * Flink 流计算数据源:基于文件的Source
                                 */
                                public class StreamSourceFileDemo {public static void main(String[] args) throws Exception {// 1. 执行环境-env
                                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                                		env.setParallelism(1) ;
                                		// 2. 数据源-source
                                		// 方式一:读取文本文件
                                		DataStreamSource dataStream01 = env.readTextFile("datas/words.txt");
                                		dataStream01.printToErr();
                                		// 方式二:读取压缩文件
                                		DataStreamSource dataStream02 = env.readTextFile("datas/words.txt.gz");
                                		dataStream02.print();
                                		// 5. 触发执行-execute
                                		env.execute("StreamSourceFileDemo") ;
                                	}
                                }
                                

                                05-Data Source【自定义数据源】

                                在Flink 流计算中,提供数据源Source接口,用户实现自定义数据源,可以从任何地方获取数据。

                                1、SourceFunction:
                                	非并行数据源(并行度parallelism=1)
                                2、RichSourceFunction:
                                	多功能非并行数据源(并行度parallelism=1)
                                3、ParallelSourceFunction:
                                	并行数据源(并行度parallelism>=1)
                                4、RichParallelSourceFunction:
                                	多功能并行数据源(parallelism>=1),Kafka数据源使用该接口
                                

                                实际项目中,如果自定义数据源,实现接口:RichSourceFunction或RichParallelSourceFunction。

                                查看SourceFunction接口中方法:

                                # 第一个方法:run
                                	实时从数据源端加载数据,并且发送给下一个Operator算子,进行处理
                                	实时产生数据
                                # 第二个方法:cancel
                                	字面意思:取消
                                	当将Job作业取消时,不在从数据源端读取数据
                                	
                                # 总结:当基于数据源接口自定义数据源时,只要实现上述2个 方法即可。
                                

                                需求:每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)

                                创建类:OrderSource,实现接口【RichParallelSourceFunction】,实现其中run和cancel方法。

                                编程实现自定义数据源:StreamSourceOrderDemo,实时产生交易订单数据。

                                package cn.itcast.flink.source;
                                import lombok.AllArgsConstructor;
                                import lombok.Data;
                                import lombok.NoArgsConstructor;
                                import org.apache.flink.streaming.api.datastream.DataStreamSource;
                                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                                import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
                                import java.util.Random;
                                import java.util.UUID;
                                import java.util.concurrent.TimeUnit;
                                /**
                                 * 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
                                	 * - 随机生成订单ID:UUID
                                	 * - 随机生成用户ID:0-2
                                	 * - 随机生成订单金额:0-100
                                	 * - 时间戳为当前系统时间:current_timestamp
                                 */
                                public class StreamSourceOrderDemo {@Data
                                	@NoArgsConstructor
                                	@AllArgsConstructor
                                	public static class Order {private String id;
                                		private Integer userId;
                                		private Double money;
                                		private Long orderTime;
                                	}
                                	/**
                                	 * 自定义数据源,继承抽象类:RichParallelSourceFunction,并行的和富有的
                                	 */
                                	private static class OrderSource extends RichParallelSourceFunction {// 定义变量,用于标识是否产生数据
                                		private boolean isRunning = true ;
                                		// 表示产生数据,从数据源Source源源不断加载数据
                                		@Override
                                		public void run(SourceContext ctx) throws Exception {Random random = new Random();
                                			while (isRunning){// 产生交易订单数据
                                				Order order = new Order(
                                					UUID.randomUUID().toString(), //
                                					random.nextInt(2), //
                                					(double)random.nextInt(100), //
                                					System.currentTimeMillis()
                                				);
                                				// 发送交易订单数据
                                				ctx.collect(order);
                                				// 每隔1秒产生1条数据,休眠1秒钟
                                				TimeUnit.SECONDS.sleep(1);
                                			}
                                		}
                                		// 取消从数据源加载数据
                                		@Override
                                		public void cancel() {isRunning = false ;
                                		}
                                	}
                                	public static void main(String[] args) throws Exception {// 1. 执行环境-env
                                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
                                		env.setParallelism(1);
                                		// 2. 数据源-source
                                		DataStreamSource orderDataStream = env.addSource(new OrderSource());
                                		// 3. 数据转换-transformation
                                		// 4. 数据接收器-sink
                                		orderDataStream.printToErr();
                                		// 5. 触发执行-execute
                                		env.execute("StreamSourceOrderDemo") ;
                                	}
                                }
                                

                                运行流式计算程序,查看模拟产生订单数据:

                                06-Data Source【MySQL Source】

                                07-Data Sink【MySQL Sink】

                                第三部分:DataStream Transformations【9个小节】

                                08-Transformation【算子概述】

                                09-Transformation【map 算子】

                                10-Transformation【flatMap 算子】

                                11-Transformation 【filter 算子 】

                                12-Transformation【keyBy 算子 】

                                13-Transformation【reduce 算子】

                                14-Transformation【max和min 算子 】

                                15-Transformation【union和connect 算子 】

                                16-Transformation 【Side Outputs】