一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- 一、DataStream 和 Table集成-数据管道
- 1、maven依赖
- 2、Adding Table API Pipelines to DataStream API 示例
本文介绍了将table api管道加入datastream。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
更多详细内容参考文章:
21、Flink 的table API与DataStream API 集成(完整版)
一、DataStream 和 Table集成-数据管道
1、maven依赖
UTF-8 UTF-8 1.8 1.8 1.8 2.12 1.17.0 org.apache.flink flink-clients ${flink.version} provided org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-table-common ${flink.version} provided org.apache.flink flink-streaming-java ${flink.version} provided org.apache.flink flink-table-api-java-bridge ${flink.version} provided org.apache.flink flink-sql-gateway ${flink.version} provided org.apache.flink flink-csv ${flink.version} provided org.apache.flink flink-json ${flink.version} provided org.apache.flink flink-table-planner_2.12 ${flink.version} provided org.apache.flink flink-table-api-java-uber ${flink.version} provided org.apache.flink flink-table-runtime ${flink.version} provided org.apache.flink flink-connector-jdbc 3.1.0-1.17 mysql mysql-connector-java 5.1.38 org.apache.flink flink-connector-hive_2.12 1.17.0 org.apache.hive hive-exec 3.1.2 org.apache.flink flink-connector-kafka ${flink.version} org.apache.flink flink-sql-connector-kafka ${flink.version} provided org.apache.commons commons-compress 1.24.0 org.projectlombok lombok 1.18.2 2、Adding Table API Pipelines to DataStream API 示例
单个Flink作业可以由多个相邻运行的断开连接的管道组成。
Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment,并在调用DataStream API中的某个执行方法时提交。
源不一定是table source,也可以是以前转换为Table API的另一个DataStream管道。因此,可以将 table sinks用于DataStream API程序。
通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集,planner 可以一起优化所有添加的语句,并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。
下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.bridge.java.StreamStatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** * @author alanchan * */ public class TestTablePipelinesToDataStreamDemo {/** * @param args * @throws Exception */ public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); StreamStatementSet statementSet = tenv.createStatementSet(); // 建立数据源 TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen") .option("number-of-rows", "3") .schema( Schema.newBuilder() .column("myCol", DataTypes.INT()) .column("myOtherCol", DataTypes.BOOLEAN()) .build()) .build(); // 建立sink TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build(); // add a pure Table API pipeline Table tableFromSource = tenv.from(sourceDescriptor); statementSet.add(tableFromSource.insertInto(sinkDescriptor)); // use table sinks for the DataStream API pipeline DataStream
dataStream = env.fromElements(1, 2, 3); Table tableFromStream = tenv.fromDataStream(dataStream); statementSet.add(tableFromStream.insertInto(sinkDescriptor)); // attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method) statementSet.attachAsDataStream(); // define other DataStream API parts env.fromElements(4, 5, 6).addSink(new DiscardingSink<>()); // use DataStream API to submit the pipelines env.execute(); // 1> +I[287849559, true] // +I[1] // +I[2] // +I[3] // 3> +I[-1058230612, false] // 2> +I[-995481497, false] } } 以上,本文介绍了将table api管道加入datastream。