【flink番外篇】19、Datastream数据类型到Table schema映射示例

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • 一、DataStream 和 Table集成-Datastream数据类型到Table schema映射示例
      • 1、maven依赖
      • 2、将表转换成 DataStream的数据类型到 Table Schema 的映射
        • 1)、映射方式及示例
          • 1、基于位置映射介绍及示例
          • 2、基于字段名称介绍及示例
          • 2)、数据类型到Table Schema 的映射及示例
            • 1、原子类型映射介绍及示例
            • 2、Tuple类型和 Case Class类型映射介绍及示例
            • 3、POJO 类型映射介绍及示例
            • 4、Row类型映射介绍及示例

              本文介绍了DataStream 的数据类型到Table Schema 的映射方式及类型示例。

              如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

              本文除了maven依赖外,没有其他依赖。

              更多详细内容参考文章:

              21、Flink 的table API与DataStream API 集成(完整版)

              一、DataStream 和 Table集成-Datastream数据类型到Table schema映射示例

              1、maven依赖

              UTF-8UTF-81.81.81.82.121.17.0org.apache.flinkflink-clients${flink.version}providedorg.apache.flinkflink-java${flink.version}providedorg.apache.flinkflink-table-common${flink.version}providedorg.apache.flinkflink-streaming-java${flink.version}providedorg.apache.flinkflink-table-api-java-bridge${flink.version}providedorg.apache.flinkflink-sql-gateway${flink.version}providedorg.apache.flinkflink-csv${flink.version}providedorg.apache.flinkflink-json${flink.version}providedorg.apache.flinkflink-table-planner_2.12${flink.version}providedorg.apache.flinkflink-table-api-java-uber${flink.version}providedorg.apache.flinkflink-table-runtime${flink.version}providedorg.apache.flinkflink-connector-jdbc3.1.0-1.17mysqlmysql-connector-java5.1.38org.apache.flinkflink-connector-hive_2.121.17.0org.apache.hivehive-exec3.1.2org.apache.flinkflink-connector-kafka${flink.version}org.apache.flinkflink-sql-connector-kafka${flink.version}providedorg.apache.commonscommons-compress1.24.0org.projectlomboklombok1.18.2

              2、将表转换成 DataStream的数据类型到 Table Schema 的映射

              Table 可以被转换成 DataStream。 通过这种方式,定制的 DataStream 程序就可以在 Table API 或者 SQL 的查询结果上运行了。

              将 Table 转换为 DataStream 时,你需要指定生成的 DataStream 的数据类型,即,Table 的每行数据要转换成的数据类型。 通常最方便的选择是转换成 Row 。 以下列表概述了不同选项的功能:

              • Row: 字段按位置映射,字段数量任意,支持 null 值,无类型安全(type-safe)检查。
              • POJO: 字段按名称映射(POJO 必须按Table 中字段名称命名),字段数量任意,支持 null 值,无类型安全检查。
              • Case Class: 字段按位置映射,不支持 null 值,有类型安全检查。
              • Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持 null 值,无类型安全检查。
              • Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。

                Flink 的 DataStream API 支持多样的数据类型。 例如 Tuple(Scala 内置,Flink Java tuple 和 Python tuples)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型类型转换为内部 row 表示形式,并提供将 DataStream 转换成 Table 的样例。

                1)、映射方式及示例

                数据类型到 table schema 的映射有两种方式:基于字段位置或基于字段名称。

                1、基于位置映射介绍及示例

                基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射。可以将字段投影出来,但不能使用as(Java 和 Scala) 或者 alias(Python)重命名。

                定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则 API 会假定应该基于字段名称进行映射。如果未指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。

                import static org.apache.flink.table.api.Expressions.$;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                ......
                	public static void testDataStreamToTableByPosition() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream> dataStream2 = env.fromElements(Tuple2.of("alan", 18), Tuple2.of("alanchan", 19), Tuple2.of("alanchanchn", 20), Tuple2.of("alan", 20));
                		Table table = tenv.fromDataStream(dataStream2, $("name"));
                		table.execute().print();
                //		+----+--------------------------------+
                //		| op |                           name |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		| +I |                           alan |
                //		+----+--------------------------------+
                //		4 rows in set
                		
                		Table table2 = tenv.fromDataStream(dataStream2, $("name"), $("age"));
                		table2.execute().print();
                //		+----+--------------------------------+-------------+
                //		| op |                           name |         age |
                //		+----+--------------------------------+-------------+
                //		| +I |                           alan |          18 |
                //		| +I |                       alanchan |          19 |
                //		| +I |                    alanchanchn |          20 |
                //		| +I |                           alan |          20 |
                //		+----+--------------------------------+-------------+
                //		4 rows in set
                		
                		env.execute();
                	}
                
                2、基于字段名称介绍及示例

                基于名称的映射适用于任何数据类型包括 POJO 类型。这是定义 table schema 映射最灵活的方式。映射中的所有字段均按名称引用,并且可以通过 as 重命名。字段可以被重新排序和映射。

                若果没有指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。

                import static org.apache.flink.table.api.Expressions.$;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                ..............
                	public static void testDataStreamToTableByName() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream> dataStream = env.fromElements(Tuple2.of("alan", 18), Tuple2.of("alanchan", 19), Tuple2.of("alanchanchn", 20), Tuple2.of("alan", 20));
                		
                		// convert DataStream into Table with field "f1" only
                		Table table = tenv.fromDataStream(dataStream, $("f1"));
                		table.execute().print();
                //		+----+-------------+
                //		| op |          f1 |
                //		+----+-------------+
                //		| +I |          18 |
                //		| +I |          19 |
                //		| +I |          20 |
                //		| +I |          20 |
                //		+----+-------------+
                //		4 rows in set
                		
                		// convert DataStream into Table with swapped fields
                		Table table2 = tenv.fromDataStream(dataStream, $("f1"), $("f0"));
                		table2.execute().print();
                //		+----+-------------+--------------------------------+
                //		| op |          f1 |                             f0 |
                //		+----+-------------+--------------------------------+
                //		| +I |          18 |                           alan |
                //		| +I |          19 |                       alanchan |
                //		| +I |          20 |                    alanchanchn |
                //		| +I |          20 |                           alan |
                //		+----+-------------+--------------------------------+
                //		4 rows in set
                		
                		// convert DataStream into Table with swapped fields and field names "name" and "age"
                		Table table3 = tenv.fromDataStream(dataStream, $("f1").as("name"), $("f0").as("age"));
                		table3.execute().print();
                //		+----+-------------+--------------------------------+
                //		| op |        name |                            age |
                //		+----+-------------+--------------------------------+
                //		| +I |          18 |                           alan |
                //		| +I |          19 |                       alanchan |
                //		| +I |          20 |                    alanchanchn |
                //		| +I |          20 |                           alan |
                //		+----+-------------+--------------------------------+
                //		4 rows in set
                		
                		env.execute();
                	}
                

                2)、数据类型到Table Schema 的映射及示例

                1、原子类型映射介绍及示例

                Flink 将基础数据类型(Integer、Double、String)或者通用数据类型(不可再拆分的数据类型)视为原子类型。 原子类型的 DataStream 会被转换成只有一条属性的 Table。 属性的数据类型可以由原子类型推断出,还可以重新命名属性。

                import static org.apache.flink.table.api.Expressions.$;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                ..................
                	public static void test1() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream dataStream = env.fromElements("alan", "alanchan", "alanchanchn");
                		
                		// Convert DataStream into Table with field name "myName"
                		Table table = tenv.fromDataStream(dataStream, $("myName"));
                		table.execute().print();
                //		+----+--------------------------------+
                //		| op |                         myName |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		+----+--------------------------------+
                //		3 rows in set
                		
                		env.execute();
                	}
                
                2、Tuple类型和 Case Class类型映射介绍及示例

                Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类型。 两种 tuple 的 DataStream 都能被转换成表。 可以通过提供所有字段名称来重命名字段(基于位置映射)。 如果没有指明任何字段名称,则会使用默认的字段名称。 如果引用了原始字段名称(对于 Flink tuple 为f0、f1 … …),则 API 会假定映射是基于名称的而不是基于位置的。 基于名称的映射可以通过 as 对字段和投影进行重新排序。

                import static org.apache.flink.table.api.Expressions.$;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                /**
                 * @author alanchan
                 *
                 */
                public class TestLegacyConversionDataStreamAndTableDemo2 {public static void testDataStreamToTableByPosition() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream> dataStream2 = env.fromElements(Tuple2.of("alan", 18), Tuple2.of("alanchan", 19), Tuple2.of("alanchanchn", 20), Tuple2.of("alan", 20));
                		Table table = tenv.fromDataStream(dataStream2, $("name"));
                //		table.execute().print();
                //		+----+--------------------------------+
                //		| op |                           name |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		| +I |                           alan |
                //		+----+--------------------------------+
                //		4 rows in set
                		Table table2 = tenv.fromDataStream(dataStream2, $("name"), $("age"));
                		table2.execute().print();
                //		+----+--------------------------------+-------------+
                //		| op |                           name |         age |
                //		+----+--------------------------------+-------------+
                //		| +I |                           alan |          18 |
                //		| +I |                       alanchan |          19 |
                //		| +I |                    alanchanchn |          20 |
                //		| +I |                           alan |          20 |
                //		+----+--------------------------------+-------------+
                //		4 rows in set
                		env.execute();
                	}
                	public static void testDataStreamToTableByName() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream> dataStream = env.fromElements(Tuple2.of("alan", 18), Tuple2.of("alanchan", 19), Tuple2.of("alanchanchn", 20), Tuple2.of("alan", 20));
                		// convert DataStream into Table with field "f1" only
                		Table table = tenv.fromDataStream(dataStream, $("f1"));
                		table.execute().print();
                //		+----+-------------+
                //		| op |          f1 |
                //		+----+-------------+
                //		| +I |          18 |
                //		| +I |          19 |
                //		| +I |          20 |
                //		| +I |          20 |
                //		+----+-------------+
                //		4 rows in set
                		// convert DataStream into Table with swapped fields
                		Table table2 = tenv.fromDataStream(dataStream, $("f1"), $("f0"));
                		table2.execute().print();
                //		+----+-------------+--------------------------------+
                //		| op |          f1 |                             f0 |
                //		+----+-------------+--------------------------------+
                //		| +I |          18 |                           alan |
                //		| +I |          19 |                       alanchan |
                //		| +I |          20 |                    alanchanchn |
                //		| +I |          20 |                           alan |
                //		+----+-------------+--------------------------------+
                //		4 rows in set
                		// convert DataStream into Table with swapped fields and field names "name" and
                		// "age"
                		Table table3 = tenv.fromDataStream(dataStream, $("f1").as("name"), $("f0").as("age"));
                		table3.execute().print();
                //		+----+-------------+--------------------------------+
                //		| op |        name |                            age |
                //		+----+-------------+--------------------------------+
                //		| +I |          18 |                           alan |
                //		| +I |          19 |                       alanchan |
                //		| +I |          20 |                    alanchanchn |
                //		| +I |          20 |                           alan |
                //		+----+-------------+--------------------------------+
                //		4 rows in set
                		env.execute();
                	}
                	public static void main(String[] args) throws Exception {testDataStreamToTableByPosition();
                		testDataStreamToTableByName();
                	}
                }
                
                3、POJO 类型映射介绍及示例

                Flink 支持 POJO 类型作为复合类型。

                在不指定字段名称的情况下将 POJO 类型的 DataStream 转换成 Table 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 as 关键字)来重命名,重新排序和投影。

                import static org.apache.flink.table.api.Expressions.$;
                import java.time.Instant;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                import lombok.AllArgsConstructor;
                import lombok.Data;
                import lombok.NoArgsConstructor;
                ...............
                	@NoArgsConstructor
                	@AllArgsConstructor
                	@Data
                	public static class User {public String name;
                		public Integer age;
                		public Instant event_time;
                	}
                	
                	public static void test2() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		
                		// 2、创建数据源
                		DataStream dataStream =
                			    env.fromElements(
                			        new User("alan", 4, Instant.ofEpochMilli(1000)),
                			        new User("alanchan", 6, Instant.ofEpochMilli(1001)),
                			        new User("alanchanchn", 10, Instant.ofEpochMilli(1002)));
                		
                		// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
                		Table table = tenv.fromDataStream(dataStream, $("age").as("myAge"), $("name").as("myName"),$("event_time").as("eventTime"));
                //		table.execute().print();
                //		+----+-------------+--------------------------------+-------------------------+
                //		| op |       myAge |                         myName |               eventTime |
                //		+----+-------------+--------------------------------+-------------------------+
                //		| +I |           4 |                           alan | 1970-01-01 08:00:01.000 |
                //		| +I |           6 |                       alanchan | 1970-01-01 08:00:01.001 |
                //		| +I |          10 |                    alanchanchn | 1970-01-01 08:00:01.002 |
                //		+----+-------------+--------------------------------+-------------------------+
                //		3 rows in set
                		
                		// convert DataStream into Table with projected field "name" (name-based)
                		Table table2 = tenv.fromDataStream(dataStream, $("name"));
                		table2.execute().print();
                //		+----+--------------------------------+
                //		| op |                           name |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		+----+--------------------------------+
                //		3 rows in set
                		
                		// convert DataStream into Table with projected and renamed field "myName" (name-based)
                		Table table3 = tenv.fromDataStream(dataStream, $("name").as("myName"));
                		table3.execute().print();
                //		+----+--------------------------------+
                //		| op |                         myName |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		+----+--------------------------------+
                //		3 rows in set
                		
                		env.execute();
                	}
                
                4、Row类型映射介绍及示例

                Row 类型支持任意数量的字段以及具有 null 值的字段。字段名称可以通过 RowTypeInfo 指定,也可以在将 Row 的 DataStream 转换为 Table 时指定。 Row 类型的字段映射支持基于名称和基于位置两种方式。 字段可以通过提供所有字段的名称的方式重命名(基于位置映射)或者分别选择进行投影/排序/重命名(基于名称映射)。

                官方示例好像有些错误,如果定义的是Row类型,在转换的时候,$(“name”).as(“myName”)是会报错的,因为row的字段名称只有f0、f1,所以不会有name。

                import static org.apache.flink.table.api.Expressions.$;
                import java.time.Instant;
                import org.apache.flink.api.java.tuple.Tuple2;
                import org.apache.flink.streaming.api.datastream.DataStream;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.table.api.Table;
                import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                import org.apache.flink.types.Row;
                import lombok.AllArgsConstructor;
                import lombok.Data;
                import lombok.NoArgsConstructor;
                .....
                	public static void test3() throws Exception {// 1、创建运行环境
                		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                		DataStream dataStream = env.fromElements(Row.of("alan", 18), Row.of("alanchan", 19), Row.of("alanchanchn", 20), Row.of("alan", 20));
                		// Convert DataStream into Table with renamed field names "myName", "myAge"
                		// (position-based)
                		Table table = tenv.fromDataStream(dataStream, $("myName"), $("myAge"));
                //		table.execute().print();
                //		+----+--------------------------------+-------------+
                //		| op |                         myName |       myAge |
                //		+----+--------------------------------+-------------+
                //		| +I |                           alan |          18 |
                //		| +I |                       alanchan |          19 |
                //		| +I |                    alanchanchn |          20 |
                //		| +I |                           alan |          20 |
                //		+----+--------------------------------+-------------+
                //		4 rows in set
                		
                		// Convert DataStream into Table with renamed fields "myName", "myAge"
                		// (name-based)
                		Table table2 = tenv.fromDataStream(dataStream, $("f0").as("myName"), $("f1").as("myAge"));
                		table2.execute().print();
                //		+----+--------------------------------+-------------+
                //		| op |                         myName |       myAge |
                //		+----+--------------------------------+-------------+
                //		| +I |                           alan |          18 |
                //		| +I |                       alanchan |          19 |
                //		| +I |                    alanchanchn |          20 |
                //		| +I |                           alan |          20 |
                //		+----+--------------------------------+-------------+
                //		4 rows in set
                		
                		// Convert DataStream into Table with projected field "name" (name-based)
                		Table table3 = tenv.fromDataStream(dataStream, $("name"));
                //		table3.execute().print();
                //		+----+--------------------------------+
                //		| op |                           name |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		| +I |                           alan |
                //		+----+--------------------------------+
                //		4 rows in set
                		
                		// Convert DataStream into Table with projected and renamed field "myName"
                		// (name-based)
                		Table table4 = tenv.fromDataStream(dataStream, $("f0").as("myName"));
                		table4.execute().print();
                //		+----+--------------------------------+
                //		| op |                         myName |
                //		+----+--------------------------------+
                //		| +I |                           alan |
                //		| +I |                       alanchan |
                //		| +I |                    alanchanchn |
                //		| +I |                           alan |
                //		+----+--------------------------------+
                //		4 rows in set
                		
                		env.execute();
                	}
                

                以上,本文介绍了DataStream 的数据类型到Table Schema 的映射方式及类型示例。