文章目录
- 概述
- 第一部分:基础概念【3个小节】
- 01-基础概念【DataStream】
- 02-基础概念【并行度设置】
- 03-基础概念【资源槽Slot】
- 第二部分:Data Source & Data Sink【4个小节】
- 04-Data Source【基本数据源】
- 集合Collection数据源
- 文件File数据源
- 05-Data Source【自定义数据源】
- 06-Data Source【MySQL Source】
- 07-Data Sink【MySQL Sink】
- 第三部分:DataStream Transformations【9个小节】
- 08-Transformation【算子概述】
- 09-Transformation【map 算子】
- 10-Transformation【flatMap 算子】
- 11-Transformation 【filter 算子 】
- 12-Transformation【keyBy 算子 】
- 13-Transformation【reduce 算子】
- 14-Transformation【max和min 算子 】
- 15-Transformation【union和connect 算子 】
- 16-Transformation 【Side Outputs】
概述
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(Flink Runtime),提供支持流
处理和批处理两种类型应用的功能。
第一部分:基础概念【3个小节】
01-基础概念【DataStream】
在Flink计算引擎中,将数据当做:数据流DataStream,分为有界数据流和无界数据流。
[任何类型的数据都可以形成一种事件流,如信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。]
- 1)、有边界流(bounded stream):==有定义流的开始,也有定义流的结束。==有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。
- 2)、无边界流(unbounded stream):有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
DataStream(数据流)官方定义:
DataStream(数据流)源码中定义:
DataStream有如下几个子类:
-
1)、DataStreamSource:
- 表示从数据源直接获取数据流DataStream,比如从Socket或Kafka直接消费数据
-
2)、KeyedStream:
- 当DataStream数据流进行分组时(调用keyBy),产生流称为KeyedStream,按照指定Key分组;
- 通常情况下数据流被分组以后,需要进行窗口window操作或聚合操作。
-
3)、SingleOutputStreamOperator:
- 当DataStream数据流没有进行keyBy分组,而是使用转换函数,产生的流称为SingleOutputStreamOperator。
- 比如使用filter、map、flatMap等函数,产生的流就是SingleOutputStreamOperator
-
4)、IterativeStream:迭代流,表示对流中数据进行迭代计算,比如机器学习,图计算等。
DataStream类是泛型(类型参数),数据类型支持如下所示:
在Flink计算引擎中,提供4个层次API,如下所示:
Flink中流计算DataStream层次API在使用时,还是包括三个方面:Source/Transformation/Sink
基于Flink开发流式计算程序五个步骤:
# 1)、Obtain an execution environment, 执行环境-env:StreamExecutionEnvironment # 2)、Load/create the initial data, 数据源-source:DataStream # 3)、Specify transformations on this data, 数据转换-transformation:DataStream API(算子,Operator) # 4)、Specify where to put the results of your computations, 数据接收器-sink # 5)、Trigger the program execution 触发执行-execute
在IDEA中创建Flink Stream流计算编程模板:FlinkClass
模块中内容:FlinkClass
#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "") package ${PACKAGE_NAME};#end import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * * @author xuanyu */ public class ${NAME} {public static void main(String[] args) throws Exception {// 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 数据源-source // 3. 数据转换-transformation // 4. 数据终端-sink // 5. 触发执行-execute env.execute("${NAME}") ; } }
依据上述定义FlinkStream模块Template,创建Flink Stream编程类:StreamDemo
02-基础概念【并行度设置】
一个Flink程序由多个Operator组成(source、transformation和 sink)。
一个Operator由多个并行的SubTask(以线程方式)来执行, 一个Operator的并行SubTask(数目就被称为该Operator(任务)的并行度(Parallelism)。
在Flink 中,并行度设置可以从4个层次级别指定,具体如下所示:
- 1)、Operator Level(算子级别)(可以使用)
一个operator、source和sink的并行度可以通过调用 setParallelism()方法来指定。
- 2)、Execution Environment Level(Env级别,可以使用)
执行环境并行度可以通过调用setParallelism()方法指定。
- 3)、Client Level(客户端级别,推荐使用)
并行度可以在客户端将job提交到Flink时设定,对于CLI客户端,可以通过-p参数指定并行度
- 4)、System Level(系统默认级别,尽量不使用)
在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度。
package cn.itcast.flink.parallelism; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。 1.执行环境-env 2.数据源-source 3.数据转换-transformation 4.数据接收器-sink 5.触发执行-execute * @author xuyuan */ public class WordCountParallelism { /** * 当运行Flink 程序时,传递参数,获取对应host和port值 * todo: WordCount --host node1 --port 9999 */ public static void main(String[] args) throws Exception { // 构建参数解析工具类ParameterTool ParameterTool parameterTool = ParameterTool.fromArgs(args); if(parameterTool.getNumberOfParameters() != 2){ System.out.println("Usage: WordCount --host
--port ............"); System.exit(-1); } String host = parameterTool.get("host"); int port = parameterTool.getInt("port", 9999) ; // 1.执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; // todo: 执行环境级别并行度 env.setParallelism(2) ; // 2.数据源-source DataStreamSource inputDataStream = env.socketTextStream(host, port); // 3.数据转换-transformation /* 流中每条数据: flink spark flink | 词频统计,步骤与批处理完全一致 */ // 3-1. 分割单词 SingleOutputStreamOperator wordDataStream = inputDataStream.flatMap( new FlatMapFunction () { @Override public void flatMap(String value, Collector out) throws Exception { String[] words = value.split("\\s+"); for (String word : words) { out.collect(word); } } } ); // 3-2. 转换为二元组 SingleOutputStreamOperator > tupleDataStream = wordDataStream.map( new MapFunction >() { @Override public Tuple2 map(String value) throws Exception { return Tuple2.of(value, 1); } } ); // 3-3. 按照单词分组,并且组求和 SingleOutputStreamOperator > resultDataStream = tupleDataStream .keyBy(0) .sum(1); // 4.数据接收器-sink, todo: 算子级别设置并行度,优先级最高 resultDataStream.print().setParallelism(1); // 5.触发执行-execute env.execute("StreamWordCount"); } } 总结:并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别
- 1)、如果source不可以被并行执行,即使指定了并行度为多个,也不会生效
- 2)、实际生产中,推荐在算子级别显示指定各自的并行度,方便进行显示和精确的资源控制。
- 3)、slot是静态的概念,是指taskmanager具有的并发执行能力; parallelism是动态的概念,是指程序运行时实际使用的并发能力。
03-基础概念【资源槽Slot】
Flink中运行Task任务(SubTask)在Slot资源槽中:
[Slot为子任务SubTask运行资源抽象,每个TaskManager运行时设置Slot个数。]
官方建议: Slot资源槽个数 = CPU Core核数 也就是说, 分配给TaskManager多少CPU Core核数,可以等价为Slot个数
每个TaskManager运行时设置内存大小:[TaskManager中内存平均划分给Slot]。
举例说明: 假设TaskManager中分配内存为:4GB,Slot个数为4个,此时每个Slot分配内存就是 4GB / 4 = 1GB 内存
每个Slot中运行SubTask子任务,以线程Thread方式运行。
- 1个Job中不同类型SubTask任务,可以运行在同一个Slot中,称为:[Slot Sharded 资源槽共享]
- 1个Job中相同类型SubTask任务必须运行在不同Slot中
第二部分:Data Source & Data Sink【4个小节】
DataStream API 编程 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview # 1、Data Sources 数据源 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources # 2、DataStream Transformations https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/ # 3、Data Sinks 数据接收器 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sinks
04-Data Source【基本数据源】
针对Flink 流计算来说,数据源可以是有界数据源(静态数据),也可以是无界数据源(流式数据),原因在于Flink框架中,将数据统一封装称为DataStream数据流。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/#data-sources
1、基于File文件数据源 readTextFile(path) 2、Sockect 数据源 socketTextStream 3、基于Collection数据源 fromCollection(Collection) fromElements(T ...) fromSequence(from, to),相当于Python中range 4、自定义Custom数据源 env.addSource() 官方提供接口: SourceFunction 非并行 RichSourceFunction ParallelSourceFunction 并行 RichParallelSourceFunction
集合Collection数据源
基于集合Collection数据源Source,一般用于学习测试。
package cn.itqzd.flink.source; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; /** * Flink 流计算数据源:基于集合的Source,分别为可变参数、集合和自动生成数据 * TODO: 基于集合数据源Source构建DataStream,属于有界数据流,当数据处理完成以后,应用结束 */ public class StreamSourceCollectionDemo {public static void main(String[] args) throws Exception {// 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1) ; // 2. 数据源-source // 方式一:可变参数 DataStreamSource
dataStream01 = env.fromElements("spark", "flink", "mapreduce"); dataStream01.print(); // 方式二:集合对象 DataStreamSource dataStream02 = env.fromCollection(Arrays.asList("spark", "flink", "mapreduce")); dataStream02.printToErr(); // 方式三:自动生成序列数字 DataStreamSource dataStream03 = env.fromSequence(1, 10); dataStream03.print(); // 5. 触发执行-execute env.execute("StreamSourceCollectionDemo") ; } } 文件File数据源
基于文件数据源, 一般用于学习测试,演示代码如下所示:
从文本文件加载数据时,可以是压缩文件,支持压缩格式如下图。
案例演示代码:StreamSourceFileDemo
package cn.itqzd.flink.source; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Flink 流计算数据源:基于文件的Source */ public class StreamSourceFileDemo {public static void main(String[] args) throws Exception {// 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1) ; // 2. 数据源-source // 方式一:读取文本文件 DataStreamSource
dataStream01 = env.readTextFile("datas/words.txt"); dataStream01.printToErr(); // 方式二:读取压缩文件 DataStreamSource dataStream02 = env.readTextFile("datas/words.txt.gz"); dataStream02.print(); // 5. 触发执行-execute env.execute("StreamSourceFileDemo") ; } } 05-Data Source【自定义数据源】
在Flink 流计算中,提供数据源Source接口,用户实现自定义数据源,可以从任何地方获取数据。
1、SourceFunction: 非并行数据源(并行度parallelism=1) 2、RichSourceFunction: 多功能非并行数据源(并行度parallelism=1) 3、ParallelSourceFunction: 并行数据源(并行度parallelism>=1) 4、RichParallelSourceFunction: 多功能并行数据源(parallelism>=1),Kafka数据源使用该接口
实际项目中,如果自定义数据源,实现接口:RichSourceFunction或RichParallelSourceFunction。
查看SourceFunction接口中方法:
# 第一个方法:run 实时从数据源端加载数据,并且发送给下一个Operator算子,进行处理 实时产生数据 # 第二个方法:cancel 字面意思:取消 当将Job作业取消时,不在从数据源端读取数据 # 总结:当基于数据源接口自定义数据源时,只要实现上述2个 方法即可。
需求:每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
创建类:OrderSource,实现接口【RichParallelSourceFunction】,实现其中run和cancel方法。
编程实现自定义数据源:StreamSourceOrderDemo,实时产生交易订单数据。
package cn.itcast.flink.source; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳) * - 随机生成订单ID:UUID * - 随机生成用户ID:0-2 * - 随机生成订单金额:0-100 * - 时间戳为当前系统时间:current_timestamp */ public class StreamSourceOrderDemo {@Data @NoArgsConstructor @AllArgsConstructor public static class Order {private String id; private Integer userId; private Double money; private Long orderTime; } /** * 自定义数据源,继承抽象类:RichParallelSourceFunction,并行的和富有的 */ private static class OrderSource extends RichParallelSourceFunction
{// 定义变量,用于标识是否产生数据 private boolean isRunning = true ; // 表示产生数据,从数据源Source源源不断加载数据 @Override public void run(SourceContext ctx) throws Exception {Random random = new Random(); while (isRunning){// 产生交易订单数据 Order order = new Order( UUID.randomUUID().toString(), // random.nextInt(2), // (double)random.nextInt(100), // System.currentTimeMillis() ); // 发送交易订单数据 ctx.collect(order); // 每隔1秒产生1条数据,休眠1秒钟 TimeUnit.SECONDS.sleep(1); } } // 取消从数据源加载数据 @Override public void cancel() {isRunning = false ; } } public static void main(String[] args) throws Exception {// 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ; env.setParallelism(1); // 2. 数据源-source DataStreamSource orderDataStream = env.addSource(new OrderSource()); // 3. 数据转换-transformation // 4. 数据接收器-sink orderDataStream.printToErr(); // 5. 触发执行-execute env.execute("StreamSourceOrderDemo") ; } } 运行流式计算程序,查看模拟产生订单数据:
06-Data Source【MySQL Source】
07-Data Sink【MySQL Sink】
第三部分:DataStream Transformations【9个小节】
08-Transformation【算子概述】
09-Transformation【map 算子】
10-Transformation【flatMap 算子】
11-Transformation 【filter 算子 】
12-Transformation【keyBy 算子 】
13-Transformation【reduce 算子】
14-Transformation【max和min 算子 】
15-Transformation【union和connect 算子 】
16-Transformation 【Side Outputs】
- 4)、System Level(系统默认级别,尽量不使用)
- 3)、Client Level(客户端级别,推荐使用)
- 2)、Execution Environment Level(Env级别,可以使用)
- 1)、Operator Level(算子级别)(可以使用)