Flink SQL优化器查询重写技术引发UDF翻倍调用的问题分析及解决方案

Flink SQL无疑是实时数仓领域最耀眼的明星之一，它在流批一体的统一设计上可谓居功至伟。鉴于Flink SQL在实时数仓领域的卓越表现，我们很有必要深入理解Flink SQL在ETL场景下的行为。本文聚焦于Flink SQL使用UDF的场景，分析优化器对SQL进行重写导致UDF被翻倍调用的原理，并给出相应的应对策略。

一、场景复现

这里通过一个案例来说明问题，本次依然采用《网上书店项目实时数仓学习模拟数据源》作为案例数据源。关于该数据源的更多说明，请参考笔者的相关博客。

  1. 这里以实时维表关联为例，首先来看UDF的定义：

    /**
     * Redis client wrapper used by {@code eval}; assigned in {@link #open(FunctionContext)}.
     *
     * <p>This is a per-instance field rather than {@code static}. Each parallel UDF instance
     * calls {@code open}, and writing a static field from an instance method makes every
     * instance in the same JVM overwrite the same slot. It is {@code transient} because Flink
     * serializes UDF instances before {@code open} runs, so the client must not be part of the
     * serialized state. NOTE(review): {@code JedisUtil.getInstance()} presumably returns a shared
     * singleton — confirm.
     */
    private transient JedisUtil jedis;

    /**
     * Initializes the function: delegates to the parent {@code open} and then acquires the Redis
     * client via {@code JedisUtil.getInstance()}.
     *
     * @param context Flink function context passed by the runtime
     * @throws Exception if the parent initialization or client acquisition fails
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        jedis = JedisUtil.getInstance();
    }
    /**
     * Flink dimension-table lookup UDF: finds the version of a dimension record that is
     * effective at {@code timestamp} and returns its fields as a JSON string.
     *
     * <p>All version members are read from the Redis sorted set {@code "dim_data:" + nkey} with
     * {@code zrange(key, 0, -1)}. Each member is split on {@code ':'}, and field index 2 is
     * compared against {@code timestamp}. NOTE(review): that field is presumably the version's
     * effective time — confirm against the writer side. Members are scanned in the returned
     * order, and the scan stops at the first member whose field is greater than
     * {@code timestamp}. The last member seen before that is selected. Its fields are then read
     * from the hash {@code "versions:" + member}.
     *
     * <p>The {@code System.out.println} before returning is intentional instrumentation. It
     * prints one line per evaluation so the number of UDF invocations can be counted.
     *
     * @param nkey      key suffix appended to {@code "dim_data:"} to form the sorted-set key
     * @param timestamp point in time used to select the matching version
     * @return the selected version as a JSON object string, or {@code null} (SQL NULL) when
     *     {@code nkey} or {@code timestamp} is null or no version is effective at
     *     {@code timestamp}
     */
    public String eval(String nkey, Long timestamp) {
        if (nkey == null || timestamp == null) {
            // A null timestamp would throw an NPE when unboxed in the comparison below.
            return null;
        }
        // 1. Build the Redis sorted-set key.
        String key = "dim_data:" + nkey;
        // 2. Fetch the version members stored under that key.
        Set<String> values = jedis.SORTSETS.zrange(key, 0, -1);
        // 3. Keep the last member whose timestamp field does not exceed `timestamp`.
        String versionKey = null;
        if (values != null) {
            for (String value : values) {
                String[] split = value.split(":");
                if (Long.parseLong(split[2]) > timestamp) {
                    break;
                }
                versionKey = value;
            }
        }
        if (versionKey == null) {
            // No effective version: avoid querying Redis with a null hash key.
            return null;
        }
        // 4. Build the hash key holding that version's fields.
        String hashKey = "versions:" + versionKey;
        // 5. Load the version's fields.
        Map<String, String> versionValue = jedis.HASHS.hgetAll(hashKey);
        // 6. Serialize to JSON.
        String result = JSONUtils.getJSONObjectFromMap(versionValue).toJSONString();
        // Instrumentation: one line per call, used to count how often the UDF is invoked.
        System.out.println("Log => " + System.currentTimeMillis() + " : " + result);
        return result;
    }
    

    该UDF用于为订单明细数据从Redis中查询对应的图书维度数据。其中，`System.out.println("Log => " + System.currentTimeMillis() + " : " + result);` 用来跟踪UDF的调用次数：理论上每调用一次UDF，就会打印一条日志记录。

  2. Flink SQL主程序代码如下：

    public static void redisUDFDimQueryWithSubQueryDemo(){  StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
        executionEnvironment.setParallelism(1);
        tableEnvironment.getConfig().set("table.exec.source.idle-timeout","3s");
        // 注册UDF
        tableEnvironment.createTemporarySystemFunction("dim_product_with_versions", DimProductWithVersionsForRedis.class);
        // 定义订单明细数据源
        tableEnvironment.executeSql("create table tbl_order_detail(\n" +
                "    order_book_id           int            comment '订单明细ID',\n" +
                "    order_id                int            comment '订单ID',\n" +
                "    book_id                 int            comment '图书ID',\n" +
                "    book_number             int            comment '图书下单数量',\n" +
                "    original_price          double         comment '原始交易额',\n" +
                "    actual_price            double         comment '实付交易额',\n" +
                "    discount_price          double         comment '折扣金额',\n" +
                "    create_time             string         comment '下单时间',\n" +
                "    update_time             bigint         comment '更新时间戳'\n" +
                ")with(\n" +
                "     'connector' = 'kafka',\n" +
                "     'topic' = 'tbl_order_detail',\n" +
                "     'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "     'properties.group.id' = 'testGroup',\n" +
                "     'scan.startup.mode' = 'latest-offset',\n" +
                "     'format' = 'json',\n" +
                "     'json.fail-on-missing-field' = 'false',\n" +
                "     'json.ignore-parse-errors' = 'true'\n" +
                " );");
        // 打印SQL执行计划 
    	String explainSql = tableEnvironment.explainSql("select\n" +
    	        "\n" +
    	        "    order_book_id,\n" +
    	        "    order_id,\n" +
    	        "    book_id,\n" +
    	        "    json_value(dim_product,'$.price') as price,\n" +
    	        "    json_value(dim_product,'$.book_name') as book_name\n" +
    	        "\n" +
    	        "from (\n" +
    	        "    select\n" +
    	        "\n" +
    	        "        order_book_id as order_book_id,\n" +
    	        "        order_id      as order_id,\n" +
    	        "        book_id       as book_id,\n" +
    	        "        book_number    as book_number,\n" +
    	        "        original_price as original_price,\n" +
    	        "        actual_price   as actual_price,\n" +
    	        "        discount_price as discount_price,\n" +
    	        "        create_time    as create_time,\n" +
    	        "        update_time    as update_time,\n" +
    	        "        dim_product_with_versions(concat('dim_book:',cast(book_id as string)),cast(update_time as bigint)) as dim_product\n" +
    	        "\n" +
    	        "    from tbl_order_detail\n" +
    	        ") tmp\n" +
    	        "where json_value(dim_product,'$.price') > 5\n" +
    	        ";");
    	System.out.println("SQL Explain Plan: ");
    	System.out.println(explainSql);
        
        // 定义ETL逻辑,支持订单明细数据源每一条数据查询Redis维表获取对应的维度信息
        tableEnvironment.executeSql("select\n" +
                "\n" +
                "    order_book_id,\n" +
                "    order_id,\n" +
                "    book_id,\n" +
                "    json_value(dim_product,'$.price')     as price,\n" +
                "    json_value(dim_product,'$.book_name') as book_name\n" +
                "\n" &