【flink番外篇】18、通过数据管道将table source加入datastream示例

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • 一、DataStream 和 Table集成-数据管道
      • 1、maven依赖
      • 2、Adding Table API Pipelines to DataStream API 示例

        本文介绍了将table api管道加入datastream。

        如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

        本文除了maven依赖外,没有其他依赖。

        更多详细内容参考文章:

        21、Flink 的table API与DataStream API 集成(完整版)

        一、DataStream 和 Table集成-数据管道

        1、maven依赖

        UTF-8UTF-81.81.81.82.121.17.0org.apache.flinkflink-clients${flink.version}providedorg.apache.flinkflink-java${flink.version}providedorg.apache.flinkflink-table-common${flink.version}providedorg.apache.flinkflink-streaming-java${flink.version}providedorg.apache.flinkflink-table-api-java-bridge${flink.version}providedorg.apache.flinkflink-sql-gateway${flink.version}providedorg.apache.flinkflink-csv${flink.version}providedorg.apache.flinkflink-json${flink.version}providedorg.apache.flinkflink-table-planner_2.12${flink.version}providedorg.apache.flinkflink-table-api-java-uber${flink.version}providedorg.apache.flinkflink-table-runtime${flink.version}providedorg.apache.flinkflink-connector-jdbc3.1.0-1.17mysqlmysql-connector-java5.1.38org.apache.flinkflink-connector-hive_2.121.17.0org.apache.hivehive-exec3.1.2org.apache.flinkflink-connector-kafka${flink.version}org.apache.flinkflink-sql-connector-kafka${flink.version}providedorg.apache.commonscommons-compress1.24.0org.projectlomboklombok1.18.2

        2、Adding Table API Pipelines to DataStream API 示例

        单个Flink作业可以由多个相邻运行的断开连接的管道组成。

        Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment,并在调用DataStream API中的某个执行方法时提交。

        源不一定是table source,也可以是以前转换为Table API的另一个DataStream管道。因此,可以将 table sinks用于DataStream API程序。

        通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集,planner 可以一起优化所有添加的语句,并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。

        下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。

        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
        import org.apache.flink.table.api.DataTypes;
        import org.apache.flink.table.api.Schema;
        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.TableDescriptor;
        import org.apache.flink.table.api.bridge.java.StreamStatementSet;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        /**
         * @author alanchan
         *
         */
        public class TestTablePipelinesToDataStreamDemo {/**
        	 * @param args
        	 * @throws Exception 
        	 */
        	public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        		StreamStatementSet statementSet = tenv.createStatementSet();
        		
        		// 建立数据源
        		TableDescriptor sourceDescriptor =
        			    TableDescriptor.forConnector("datagen")
        			        .option("number-of-rows", "3")
        			        .schema(
        			            Schema.newBuilder()
        			                .column("myCol", DataTypes.INT())
        			                .column("myOtherCol", DataTypes.BOOLEAN())
        			                .build())
        			        .build();
        		
        		// 建立sink
        		TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
        		
        		// add a pure Table API pipeline
        		Table tableFromSource = tenv.from(sourceDescriptor);
        		statementSet.add(tableFromSource.insertInto(sinkDescriptor));
        		
        		// use table sinks for the DataStream API pipeline
        		DataStream dataStream = env.fromElements(1, 2, 3);
        		Table tableFromStream = tenv.fromDataStream(dataStream);
        		statementSet.add(tableFromStream.insertInto(sinkDescriptor));
        		
        		// attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method)
        		statementSet.attachAsDataStream();
        		// define other DataStream API parts
        		env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());
        		// use DataStream API to submit the pipelines
        		env.execute();
        		
        //		1> +I[287849559, true]
        //		+I[1]
        //		+I[2]
        //		+I[3]
        //		3> +I[-1058230612, false]
        //		2> +I[-995481497, false]
        		
        	}
        }
        

        以上,本文介绍了将table api管道加入datastream。