FlinkSQL学习笔记(一)快速入门

写在前面

本篇作为FlinkSQL的起始篇,主要介绍了FlinkSQL在使用的概述,通过本篇,可以快速上手。需要注意的一点是:FlinkSQL中的表是动态表,这是其特性之一。

1、FlinkSQL概述

FlinkSQL是架构于 flink core 之上用 sql 语义方便快捷地进行结构化数据处理的上层库;(非常类似 sparksql 和 sparkcore 的关系)

1.1、核心工作原理

整体上讲,FlinkSQL的核心工作原理如下:

  1. 将数据流(数据集),绑定元数据(schema)后,注册成catalog中的表(table、view)
  2. 然后由用户通过table API或者table SQL来表达计算逻辑;
  3. 由table-planer利用apache Calcite进行SQL语法解析,绑定元数据得到逻辑执行计划
  4. 再经过optimizer进行优化后,得到物理执行计划
  5. 物理执行计划经过代码生成器生成代码,得到transformation tree
  6. transformation tree转成JobGraph后提交到Flink集群执行

下面给出一个样例对上述过程进行说明:

逻辑执行计划:通过Calcite,将SQL转化为逻辑执行计划。从逻辑执行计划中,可以看出,SQL的语句的执行顺序为from-->join-->where-->select。

查询优化:FlinkSQL中存在两个优化器,RBO(基于规则的优化器)和CBO(基于成本的优化器)。

  • RBO(基于规则的优化器):遍历一系列规则(RelOptRule),只要满足条件就对原来的计划节点(表达式)进行转换或调整位置,生成最终的执行计划。常见的规则包括:
    • 分区裁剪(Partition Prune)、列裁剪
    • 谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit 下推、sort 下推
    • 常量折叠(Constant Folding):比如将1+2折叠为3
    • 子查询内联转 join 等。
      • CBO(基于代价的优化器):会保留原有表达式,基于统计信息和代价模型,尝试探索生成等价关系表达式,最终取代价最小的执行计划。CBO 的实现有两种模型:Volcano 模型和Cascades 模型。这两种模型思想很是相似,不同点在于 Cascades 模型一边遍历 SQL 逻辑树,一边优化,从而进一步裁剪掉一些执行计划。

        根据代价 cost 选择批处理 join 有方式(sortmergejoin,hashjoin,boradcasthashjoin)。

        比如前文中的例子,再 filter 下推之后,在 t2.id<1000 的情况下,由 1 百万数据量变为了 1 千条,计算 cost 之后,使用 broadcasthashjoin 最合适。

        算子树(transformation tree):将物理执行计划中的节点转化为对应的算子,即Flink中的一种Function,通过相应的条件动态生成代码。

        1.2、动态表特性

        与 spark、hive 等组件中的“表”的最大不同之处:FlinkSQL中的表是动态表!

        Flink的核心决定了上述特性:

        • flink 对数据的核心抽象是“无界(或有界)的数据流”
        • 对数据处理过程的核心抽象是“流式持续处理”

          因而,flinksql 对“源表(动态表)”的计算及输出结果(结果表),也是流式、动态、持续的;

          • 数据源的数据是持续输入
          • 查询过程是持续计算
          • 查询结果是持续输出

            以下图为例:

            • “源表 clicks”是流式动态的;
            • “聚合查询的输出结果表”,也是流式动态的

              这其中的动态,不仅体现在“数据追加”,对于输出结果表来说,“动态”还包含对“前序输出结果”的“撤

              回(删除)”、“更新”等模式。它的核心设计是在底层的数据流中为每条数据添加**“ChangeMode(修正模式)标记”**,而添加了这种ChangeMode 标记的底层数据流,取名为 changelogStream

              2、FlinkSQL编程概述

              由于FlinkSQL建立在Flink core的基础之上,这里进行先对一个简单的FlinkSQL编程过程进行说明。

              FlinkSQL编程包括TableAPI和SQLAPI,运用中更多地使用SQLAPI,这里对于TableAPI不做详细介绍,后续用到的时候再进行详细介绍。此外,在编程方式上,两种SQL可以进行混合使用。

              2.1、FlinkSQL程序结构

              导入依赖:

               org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} org.apache.flink flink-json ${flink.version}

              FlinkSQL编程步骤:

              1. 创建 flinksql 编程入口
              EnvironmentSettings envSettings = EnvironmentSettings
              									.fromConfiguration(new Configuration());
              TableEnvironment tableEnv = TableEnvironment.create(envSettings);
              
              1. 将数据源定义(映射)成表(视图)
              /*
              * 把kafka中的一个topic,映射成一张FlinkSQL表
              * kafka:{"id":1,"name":"zs","age":20,"gender":"male"}
              * */
              tableEnv.executeSql(
                               " CREATE TABLE KafkaTable (										"
                              +	" 	id		int, "
                              +	" 	name	string,                                             "
                              +	" 	age		int, "
                              +	" 	gender	string                                              "
                              +	" ) WITH (       "
                              +	"   'connector' = 'kafka',                                      "
                              +	"   'topic' = 'table_test',                                     "
                              +	"   'properties.bootstrap.servers' = '192.168.247.129:9092',    "
                              +	"   'properties.group.id' = 'testGroup',                        "
                              +	"   'scan.startup.mode' = 'earliest-offset',                    "
                              +	"   'format' = 'json'                                           "
                              +	" )																"
              );
              
              1. 执行 sql 语义的查询(sql 语法或者 tableapi)
               tableEnv.executeSql("select gender,avg(age) as avg_age from KafkaTable group by gender").print();
              
              1. 将查询结果输出到目标表

                注:这里的输出通过print进行输出,已经合并到步骤3中

                输入数据:

              {"id":1,"name":"zs","age":20,"gender":"male"}
              {"id":2,"name":"ls","age":30,"gender":"female"}
              {"id":3,"name":"ww","age":40,"gender":"female"}
              {"id":4,"name":"zl","age":50,"gender":"male"}
              

              输出结果:

              2.2、FlinkSQL,TableAPI方式

              1. 建表
               Table table = tableEnv.from(TableDescriptor
                       .forConnector("kafka")
                       .schema(Schema.newBuilder()
                               .column("id", DataTypes.INT())
                               .column("name", DataTypes.STRING())
                               .column("age", DataTypes.INT())
                               .column("gender", DataTypes.STRING())
                               .build())
                       .format("json")
                       .option("topic", "testTopic")
                       .option("properties.bootstrap.servers", "192.168.247.129:9092")
                       .option("properties.group.id", "testGroup")
                       .option("scan.startup.mode", "earliest-offset")
                       .option("json.fail-on-missing-field", "false")
                       .option("json.ignore-parse-errors", "true")
                       .build());
              
              1. TableAPI
              Table tableApi = table.groupBy($("gender"))
                      .select($("gender"), $("age").avg().as("avg_age", "avg_age_2"));
              
              1. 执行
              tableApi.execute().print();