【flink番外篇】22、通过 Table API 和 SQL Client 操作 Catalog 示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • Flink 系列文章
    • 一、通过 Table API 和 SQL Client 操作 HiveCatalog
      • 1、注册 Catalog
        • 1)、方式一:java实现
        • 2)、方式二:yaml配置
        • 2、修改当前的 Catalog 和数据库
          • 1)、java实现
          • 2)、sql
          • 3、列出可用的 Catalog
            • 1)、java实现
            • 2)、sql
            • 4、列出可用的数据库
              • 1)、java实现
              • 2)、sql
              • 5、列出可用的表
                • 1)、java实现
                • 2)、sql

                  本文以示例展示了sql 和 table api 操作hivecatalog。

                  一、通过 Table API 和 SQL Client 操作 HiveCatalog

                  1、注册 Catalog

                  用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。

                  以下通过api 和 配置文件注册catalog及配置。

                  1)、方式一:java实现

                  public class TestCreateHiveTable {public static final String tableName = "alan_hivecatalog_hivedb_testTable";
                  	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
                  																					  "  id INT,\n" + 
                  																					  "  name STRING,\n" + 
                  																					  "  age INT" + ") " + 
                  																					  "TBLPROPERTIES (\n" + 
                  																					  "  'sink.partition-commit.delay'='5 s',\n" + 
                  																					  "  'sink.partition-commit.trigger'='partition-time',\n" + 
                  																					  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
                  	/**
                  	 * @param args
                  	 * @throws DatabaseAlreadyExistException
                  	 * @throws CatalogException
                  	 */
                  	public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                  		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
                  		String name = "alan_hive";
                  		// default 数据库名称
                  		String defaultDatabase = "default";
                  		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
                  		tenv.registerCatalog("alan_hive", hiveCatalog);
                  		tenv.useCatalog("alan_hive");
                  		String newDatabaseName = "alan_hivecatalog_hivedb";
                  		tenv.useDatabase(newDatabaseName);
                  		// 创建表
                  		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
                  		tenv.executeSql(hive_create_table_sql);
                  		// 插入数据
                  		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
                  		tenv.executeSql(insertSQL);
                  		// 查询数据
                  		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
                  		Table table = tenv.sqlQuery(selectSQL);
                  		table.printSchema();
                  		DataStream> result = tenv.toRetractStream(table, Row.class);
                  		result.print();
                  		env.execute();
                  	}
                  }
                  

                  2)、方式二:yaml配置

                  # 定义 catalogs
                  catalogs:
                     - name: alan_hivecatalog
                       type: hive
                       property-version: 1
                       hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf  # 须包含 hive-site.xml
                  # 改变表程序基本的执行行为属性。
                  execution:
                   planner: blink                            # 可选: 'blink' (默认)或 'old'
                   type: streaming                           # 必选:执行模式为 'batch' 或 'streaming'
                   result-mode: table                        # 必选:'table' 或 'changelog'
                   max-table-result-rows: 1000000            # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)
                   time-characteristic: event-time           # 可选: 'processing-time' 或 'event-time' (默认)
                   parallelism: 1                            # 可选:Flink 的并行数量(默认为 1)
                   periodic-watermarks-interval: 200         # 可选:周期性 watermarks 的间隔时间(默认 200 ms)
                   max-parallelism: 16                       # 可选:Flink 的最大并行数量(默认 128)
                   min-idle-state-retention: 0               # 可选:表程序的最小空闲状态时间
                   max-idle-state-retention: 0               # 可选:表程序的最大空闲状态时间
                   current-catalog: alan_hivecatalog         # 可选:当前会话 catalog 的名称(默认为 'default_catalog')
                   current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)
                   restart-strategy:                         # 可选:重启策略(restart-strategy)
                      type: fallback                          # 默认情况下“回退”到全局重启策略 

                  2、修改当前的 Catalog 和数据库

                  Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。

                  1)、java实现

                  代码片段,只列出了关键的代码。

                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
                  String catalogName = "alan_hive";
                  String defaultDatabase = "default";
                  String databaseName = "viewtest_db";
                  String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
                  HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
                  tenv.registerCatalog(catalogName, hiveCatalog);
                  tenv.useCatalog(catalogName);
                  hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {}, true);
                  //		tenv.executeSql("create database "+databaseName);
                  tenv.useDatabase(databaseName); 

                  2)、sql

                  Flink SQL> USE CATALOG alan_hive;
                  Flink SQL> USE viewtest_db;
                  

                  通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。

                  • java
                    tenv.from("not_the_current_catalog.not_the_current_db.my_table");
                    
                    • sql
                      Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
                      

                      3、列出可用的 Catalog

                      1)、java实现

                      tenv.listCatalogs();
                      

                      2)、sql

                      show catalogs;
                      

                      4、列出可用的数据库

                      1)、java实现

                      tenv.listDatabases();
                      

                      2)、sql

                      show databases;
                      

                      5、列出可用的表

                      1)、java实现

                      tenv.listTables();
                      

                      2)、sql

                      show tables;
                      

                      以上,本文以示例展示了sql 和 table api 操作hivecatalog。