Flink SQL is arguably the brightest star in the real-time data-warehouse space, and its unified stream-batch design deserves much of the credit. Given how well Flink SQL performs in this field, it is worth building a deep understanding of how it behaves in ETL scenarios. This article focuses on a UDF use case in which SQL rewriting causes the UDF to be invoked twice as often as expected: we analyze why this happens and how to work around it.
1. Reproducing the Scenario
Let's illustrate the problem with a concrete case, once again based on the "Online Bookstore Real-Time Data Warehouse Simulated Data Source" project. For more details on the data source, see the author's companion blog post.
Taking a real-time dimension-table lookup as the example, let's first look at the UDF definition.
```java
private static JedisUtil jedis;

@Override
public void open(FunctionContext context) throws Exception {
    super.open(context);
    jedis = JedisUtil.getInstance();
}

/**
 * Flink dimension-table lookup UDF
 *
 * @param nkey
 * @param timestamp
 */
public String eval(String nkey, Long timestamp) {
    // 1. Build the key of the Redis sorted set
    String key = "dim_data:" + nkey;
    // 2. Fetch all versioned records for this key from the sorted set
    Set<String> values = jedis.SORTSETS.zrange(key, 0, -1);
    // 3. Pick the matching version record
    String versionKey = null;
    for (String value : values) {
        String[] split = value.split(":");
        if (Long.valueOf(split[2]) > timestamp) {
            break;
        } else {
            versionKey = value;
        }
    }
    // 4. Build the key of the corresponding hash
    String hashKey = null;
    if (versionKey != null) {
        hashKey = "versions:" + versionKey;
    }
    // 5. Fetch the version data
    Map<String, String> versionValue = jedis.HASHS.hgetAll(hashKey);
    // 6. Return the result
    String result = JSONUtils.getJSONObjectFromMap(versionValue).toJSONString();
    // *** This print statement tracks how many times the UDF is invoked ***
    System.out.println("Log => " + System.currentTimeMillis() + " : " + result);
    return result;
}
```

This UDF lets each record in the order-detail stream look up its matching book dimension data in Redis. The line `System.out.println("Log => " + System.currentTimeMillis() + " : " + result);` tracks how often the UDF is invoked: in theory, every call to the UDF prints exactly one log line.
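The version-selection loop in step 3 relies on `zrange` returning members in ascending order: it keeps advancing until it meets the first member whose timestamp exceeds the query time, so it returns the latest version that is not newer than the record being enriched. The sketch below extracts that loop into a standalone helper for illustration; `VersionSelect` and `pickVersion` are my own names, not part of the UDF, and the `name:id:timestamp` member format is assumed from the `split(":")[2]` access above:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class VersionSelect {

    // Hypothetical extraction of the UDF's step-3 loop: given sorted-set
    // members shaped like "name:id:timestamp" in ascending timestamp order,
    // return the latest member whose timestamp does not exceed the query time.
    static String pickVersion(Collection<String> values, long timestamp) {
        String versionKey = null;
        for (String value : values) {
            String[] split = value.split(":");
            if (Long.valueOf(split[2]) > timestamp) {
                break;  // members are ordered, so every later one is newer too
            } else {
                versionKey = value;
            }
        }
        return versionKey;
    }

    public static void main(String[] args) {
        List<String> versions = Arrays.asList(
                "dim_book:1:100", "dim_book:1:200", "dim_book:1:300");
        // Query time 250 falls between versions 200 and 300
        System.out.println(pickVersion(versions, 250)); // dim_book:1:200
        // Query time 50 predates every version, so nothing matches
        System.out.println(pickVersion(versions, 50));  // null
    }
}
```

Note that if the query time predates every stored version, the helper returns `null`, which is why the UDF guards the hash lookup with a `versionKey != null` check in step 4.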
The Flink SQL main program is as follows:
```java
public static void redisUDFDimQueryWithSubQueryDemo() {
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
    executionEnvironment.setParallelism(1);
    tableEnvironment.getConfig().set("table.exec.source.idle-timeout", "3s");

    // Register the UDF
    tableEnvironment.createTemporarySystemFunction("dim_product_with_versions", DimProductWithVersionsForRedis.class);

    // Define the order-detail source table
    tableEnvironment.executeSql("create table tbl_order_detail(\n" +
            "  order_book_id int comment 'order detail ID',\n" +
            "  order_id int comment 'order ID',\n" +
            "  book_id int comment 'book ID',\n" +
            "  book_number int comment 'ordered quantity',\n" +
            "  original_price double comment 'original amount',\n" +
            "  actual_price double comment 'paid amount',\n" +
            "  discount_price double comment 'discount amount',\n" +
            "  create_time string comment 'order time',\n" +
            "  update_time bigint comment 'update timestamp'\n" +
            ")with(\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'tbl_order_detail',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json',\n" +
            "  'json.fail-on-missing-field' = 'false',\n" +
            "  'json.ignore-parse-errors' = 'true'\n" +
            ");");

    // Print the SQL execution plan
    String explainSql = tableEnvironment.explainSql("select\n" +
            "\n" +
            "  order_book_id,\n" +
            "  order_id,\n" +
            "  book_id,\n" +
            "  json_value(dim_product,'$.price') as price,\n" +
            "  json_value(dim_product,'$.book_name') as book_name\n" +
            "\n" +
            "from (\n" +
            "  select\n" +
            "\n" +
            "    order_book_id as order_book_id,\n" +
            "    order_id as order_id,\n" +
            "    book_id as book_id,\n" +
            "    book_number as book_number,\n" +
            "    original_price as original_price,\n" +
            "    actual_price as actual_price,\n" +
            "    discount_price as discount_price,\n" +
            "    create_time as create_time,\n" +
            "    update_time as update_time,\n" +
            "    dim_product_with_versions(concat('dim_book:',cast(book_id as string)),cast(update_time as bigint)) as dim_product\n" +
            "\n" +
            "  from tbl_order_detail\n" +
            ") tmp\n" +
            "where json_value(dim_product,'$.price') > 5\n" +
            ";");
    System.out.println("SQL Explain Plan: ");
    System.out.println(explainSql);

    // Define the ETL logic: for every record in the order-detail stream,
    // query the Redis dimension table for the matching dimension data
    tableEnvironment.executeSql("select\n" +
            "\n" +
            "  order_book_id,\n" +
            "  order_id,\n" +
            "  book_id,\n" +
            "  json_value(dim_product,'$.price') as price,\n" +
            "  json_value(dim_product,'$.book_name') as book_name\n" +
            "\n" &
```
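Before examining the Flink execution plan, the underlying mechanism can be illustrated outside Flink: when a subquery is inlined by the optimizer, an expression that is referenced both in the outer projection and in the outer filter ends up being evaluated once per reference. The plain-Java analogy below is my own illustration, not Flink planner code; `DoubleEval` and `dimLookup` are hypothetical names standing in for the dimension-lookup UDF:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class DoubleEval {
    // Counts every invocation, mirroring the println in the real UDF
    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for the dimension-lookup UDF
    static String dimLookup(int bookId) {
        CALLS.incrementAndGet();
        return "{\"price\":" + (bookId * 3) + "}";
    }

    public static void main(String[] args) {
        List<Integer> bookIds = Arrays.asList(1, 2, 3);

        // Analogy for the inlined query: the same dimLookup(book_id)
        // expression appears once in the WHERE clause and once in the
        // SELECT list, so it runs twice for every row that passes the filter.
        List<String> out = bookIds.stream()
                .filter(id -> dimLookup(id).contains("price"))  // WHERE
                .map(id -> dimLookup(id))                        // SELECT
                .collect(Collectors.toList());

        System.out.println("rows=" + out.size() + ", udf calls=" + CALLS.get());
        // 3 rows survive, but the stand-in UDF ran 6 times
    }
}
```

In the Flink SQL case the subquery was written precisely to compute `dim_product` once per row, but, as the execution plan will show, the optimizer inlines it and duplicates the UDF call into both the projection and the filter.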