【大数据】Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

    😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

    Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function

    • 1.Lookup Join(维表 Join)
    • 2.Array Expansion(数组列转行)
    • 3.Table Function(自定义列转行)

      1.Lookup Join(维表 Join)

      Lookup Join 定义(支持 Batch / Streaming):Lookup Join 其实就是维表 Join,比如拿离线数仓来说,常常会有用户画像,设备画像等数据,而对应到实时数仓场景中,这种实时获取外部缓存的 Join 就叫做维表 Join。

      应用场景:小伙伴萌会问,我们既然已经有了上面介绍的 Regular Join,Interval Join 等,为啥还需要一种 Lookup Join?因为上面说的这几种 Join 都是 流与流之间的 Join,而 Lookup Join 是流与 Redis,MySQL,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,而实时的画像数据一般都是存储在 Redis,MySQL,HBase 中,这就是 Lookup Join 的由来。

      实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

      • 曝光用户日志流(show_log)数据(数据存储在 Kafka 中)
        log_id  timestamp            user_id
        1       2021-11-01 00:01:03  a
        2       2021-11-01 00:03:00  b
        3       2021-11-01 00:05:00  c
        4       2021-11-01 00:06:00  b
        5       2021-11-01 00:07:00  c
        
        • 用户画像维表(user_profile)数据(数据存储在 Redis 中)
          user_id(主键)   age     sex
          a               12-18   男
          b               18-24   女
          c               18-24   男
          

          注意:Redis 中的数据结构存储是按照 Key-Value 去存储的。其中 Key 为 user_id,Value 为 age,sex 的 JSON。

          具体 SQL:

          CREATE TABLE show_log (
              log_id BIGINT,
              `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
              user_id STRING,
              proctime AS PROCTIME()
          )
          WITH (
            'connector' = 'datagen',
            'rows-per-second' = '10',
            'fields.user_id.length' = '1',
            'fields.log_id.min' = '1',
            'fields.log_id.max' = '10'
          );
          CREATE TABLE user_profile (
              user_id STRING,
              age STRING,
              sex STRING
              ) WITH (
            'connector' = 'redis',
            'hostname' = '127.0.0.1',
            'port' = '6379',
            'format' = 'json',
            'lookup.cache.max-rows' = '500',
            'lookup.cache.ttl' = '3600',
            'lookup.max-retries' = '1'
          );
          CREATE TABLE sink_table (
              log_id BIGINT,
              `timestamp` TIMESTAMP(3),
              user_id STRING,
              proctime TIMESTAMP(3),
              age STRING,
              sex STRING
          ) WITH (
            'connector' = 'print'
          );
          -- lookup join 的 query 逻辑
          INSERT INTO sink_table
          SELECT 
              s.log_id as log_id, 
              s.`timestamp` as `timestamp`, 
              s.user_id as user_id, 
              s.proctime as proctime, 
              u.sex as sex, 
              u.age as age
          FROM show_log AS s
          LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
          ON s.user_id = u.user_id
          

          输出数据如下:

          log_id  timestamp            user_id  age     sex
          1       2021-11-01 00:01:03  a        12-18   男
          2       2021-11-01 00:03:00  b        18-24   女
          3       2021-11-01 00:05:00  c        18-24   男
          4       2021-11-01 00:06:00  b        18-24   女
          5       2021-11-01 00:07:00  c        18-24   男
          

          注意:实时的 Lookup 维表关联能使用 处理时间 去做关联。

          • 同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 1 1 的数据在 08 : 00 08:00 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08 : 01 08:01 08:01 failover 之后从 07 : 59 07:59 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。
          • 会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据。

            再说说维表常见的性能问题及优化思路。

            所有的维表性能问题都可以总结为:高 QPS 下访问维表存储引擎产生的任务背压,数据产出延迟问题

            举个例子:

            • 在没有使用维表的情况下:一条数据从输入 Flink 任务到输出 Flink 任务的时延假如为 0.1   m s 0.1\ ms 0.1 ms,那么并行度为 1 1 1 的任务的吞吐可以达到 1   q u e r y   /   0.1   m s = 10000   q p s 1\ query\ /\ 0.1\ ms = 10000\ qps 1 query / 0.1 ms=10000 qps。
            • 在使用维表之后:每条数据访问维表的外部存储的时长为 2   m s 2\ ms 2 ms,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1   m s 2.1\ ms 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1   q u e r y   /   2.1   m s = 476   q p s 1\ query\ /\ 2.1\ ms = 476\ qps 1 query / 2.1 ms=476 qps。两者的吞吐量相差 21 21 21 倍。

              这就是为什么维表 Join 的算子会产生背压,任务产出会延迟。

              那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

              • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过的 local cache 即可。这样就可以把访问外部存储 2.1   m s 2.1\ ms 2.1 ms 处理一个 Query 变为访问内存的 0.1   m s 0.1\ ms 0.1 ms 处理一个 Query 的时长。
              • 2️⃣ 异步访问外存:DataStream API 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1   m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 2.1   m s 2.1\ ms 2.1 ms 处理 10 10 10 个 Query。吞吐可变优化到 10   q u e r y   /   2.1   m s = 4761   q p s 10\ query\ /\ 2.1\ ms = 4761\ qps 10 query / 2.1 ms=4761 qps。
              • 3️⃣ 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 Redis 维表的 1 1 1 Query 占用 2.1   m s 2.1\ ms 2.1 ms 时长中,其中可能有 2   m s 2\ ms 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1   m s 0.1\ ms 0.1 ms 是 Redis Server 处理请求的时长。那么我们就可以使用 Redis 提供的 pipeline 能力,在客户端(也就是 Flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 Redis Sever。这样就可以把 2.1   m s 2.1\ ms 2.1 ms 处理 1 1 1 个 Query 变为 7   m s = 2   m s + 50 ∗ 0.1   m s 7\ ms=2\ ms + 50 * 0.1\ ms 7 ms=2 ms+50∗0.1 ms 处理 50 50 50 个 Query。吞吐可变为 50   q u e r y   /   7   m s = 7143   q p s 50\ query\ /\ 7\ ms = 7143\ qps 50 query / 7 ms=7143 qps。

                博主认为上述优化效果中,最好用的是 1️⃣ + 3️⃣,2️⃣ 相比 3️⃣ 还是一条一条发请求,性能会差一些。

                既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

                • 1️⃣ 按照 Redis 维表的 key 分桶 + local cache:SQL 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf(user defined aggregation function)中做访问 Redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 Redis 压力。在博主实现的 Redis Connector 中,内置了 local cache 的实现。
                • 2️⃣ 异步访问外存:目前博主实现的 Redis Connector 不支持异步访问,但是官方实现的 HBase Connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
                • 3️⃣ 批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 Redis 的批量访问外存优化的功能。

                  2.Array Expansion(数组列转行)

                  应用场景(支持 Batch / Streaming):将表中 ARRAY 类型字段(列)拍平,转为多行。

                  实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。

                  CREATE TABLE show_log_table (
                      log_id BIGINT,
                      show_params ARRAY) WITH (
                    'connector' = 'datagen',
                    'rows-per-second' = '1',
                    'fields.log_id.min' = '1',
                    'fields.log_id.max' = '10'
                  );
                  CREATE TABLE sink_table (
                      log_id BIGINT,
                      show_param STRING
                  ) WITH (
                    'connector' = 'print'
                  );
                  INSERT INTO sink_table
                  SELECT
                      log_id,
                      t.show_param as show_param
                  FROM show_log_table
                  -- array 炸开语法
                  CROSS JOIN UNNEST(show_params) AS t (show_param)
                  

                  show_log_table 原始数据:

                  +I[7, [a, b, c]]
                  +I[5, [d, e, f]]
                  

                  输出结果如下所示:

                  -- +I[7, [a, b, c]] 一行转为 3 行
                  +I[7, a]
                  +I[7, b]
                  +I[7, b]
                  -- +I[5, [d, e, f]] 一行转为 3 行
                  +I[5, d]
                  +I[5, e]
                  +I[5, f]
                  

                  3.Table Function(自定义列转行)

                  应用场景(支持 Batch / Streaming):这个其实和 Array Expansion 功能类似,但是 Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑。

                  Table Function 使用分类:

                  • Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 1 1 行转为 0 0 0 行,这行数据直接被丢弃。
                  • Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值。
                    public class TableFunctionInnerJoin_Test { public static void main(String[] args) throws Exception { FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
                            String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"
                                    + "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"
                                    + "\n"
                                    + "CREATE TABLE source_table (\n"
                                    + "    user_id BIGINT NOT NULL,\n"
                                    + "    name STRING,\n"
                                    + "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"
                                    + "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
                                    + ") WITH (\n"
                                    + "  'connector' = 'datagen',\n"
                                    + "  'rows-per-second' = '10',\n"
                                    + "  'fields.name.length' = '1',\n"
                                    + "  'fields.user_id.min' = '1',\n"
                                    + "  'fields.user_id.max' = '10'\n"
                                    + ");\n"
                                    + "\n"
                                    + "CREATE TABLE sink_table (\n"
                                    + "    user_id BIGINT,\n"
                                    + "    name STRING,\n"
                                    + "    age INT,\n"
                                    + "    row_time TIMESTAMP(3)\n"
                                    + ") WITH (\n"
                                    + "  'connector' = 'print'\n"
                                    + ");\n"
                                    + "\n"
                                    + "INSERT INTO sink_table\n"
                                    + "SELECT user_id,\n"
                                    + "       name,\n"
                                    + "       age,\n"
                                    + "       row_time\n"
                                    + "FROM source_table,\n"
                                    // Table Function Join 语法对应 LATERAL TABLE
                                    + "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";
                            Arrays.stream(sql.split(";"))
                                    .forEach(flinkEnv.streamTEnv()::executeSql);
                        }
                        public static class UserProfileTableFunction extends TableFunction { public void eval(long userId) { // 自定义输出逻辑
                                if (userId <= 5) { // 一行转 1 行
                                    collect(1);
                                } else { // 一行转 3 行
                                    collect(1);
                                    collect(2);
                                    collect(3);
                                }
                            }
                        }
                    }
                    

                    执行结果如下:

                    -- userId <= 5,则只有 1 行结果
                    +I[3, 7, 1, 2021-05-01T18:23:42.560]
                    -- userId > 5,则有行 3 结果
                    +I[8, e, 1, 2021-05-01T18:23:42.560]
                    +I[8, e, 2, 2021-05-01T18:23:42.560]
                    +I[8, e, 3, 2021-05-01T18:23:42.560]
                    -- userId <= 5,则只有 1 行结果
                    +I[4, 9, 1, 2021-05-01T18:23:42.561]
                    -- userId > 5,则有行 3 结果
                    +I[8, c, 1, 2021-05-01T18:23:42.561]
                    +I[8, c, 2, 2021-05-01T18:23:42.561]
                    +I[8, c, 3, 2021-05-01T18:23:42.561]