Iceberg from Beginner to Expert, Part 10: Inserting Data into Iceberg Tables with Flink SQL
- 1. INSERT INTO
- 2. INSERT OVERWRITE
- 3. UPSERT
- 4. Querying in Batch Mode
- 5. Querying in Streaming Mode
- 6. Reading a Kafka Stream into an Iceberg Table
1. INSERT INTO

CREATE TABLE `stu` (id int, name string, age int) PARTITIONED BY (age);

insert into stu values(3,'杀sheng',16),(4,'鸣人',19);
2. INSERT OVERWRITE

Only supported in Flink's Batch mode.

SET execution.runtime-mode = batch;

INSERT OVERWRITE sample VALUES (1,'a');

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
3. UPSERT

When writing to a v2 (format version 2) table, Iceberg supports primary-key-based UPSERT. There are two ways to enable upsert.

Enable it when creating the table:
CREATE TABLE `hive_catalog`.`test`.`sample5` (
  `id` INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'format-version'='2',
  'write.upsert.enabled'='true'
);
In UPSERT mode, if the table is partitioned, the partition fields must be part of the primary key.
insert into sample5 values(1,'a');
insert into sample5 values(2,'b');

SET sql-client.execution.result-mode=tableau;
select * from sample5;

insert into sample5 values(2,'c');

Because id is the primary key, inserting (2,'c') updates the existing row with id 2 rather than appending a duplicate row.
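The text above mentions two ways to enable upsert but only shows the table property. The second way, per the Iceberg Flink documentation, is to enable it per statement with a write-option hint ('upsert-enabled'); the value (2,'d') below is purely illustrative:

```sql
-- Enable upsert for a single INSERT via a dynamic write option,
-- without setting 'write.upsert.enabled' on the table itself:
INSERT INTO sample5 /*+ OPTIONS('upsert-enabled'='true') */
VALUES (2, 'd');
```

This hint requires table.dynamic-table-options.enabled to be set to true in the SQL client.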
4. Querying in Batch Mode

Batch mode:

SET execution.runtime-mode = batch;
select * from sample;
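Batch reads can also target an older table state via dynamic table options. The option names below ('snapshot-id', 'as-of-timestamp') come from the Iceberg Flink read options; the snapshot id and timestamp are placeholders, not values from this walkthrough:

```sql
-- Time travel to a specific snapshot (id is illustrative):
SELECT * FROM sample /*+ OPTIONS('snapshot-id'='384023852058202') */;

-- Or read the table state as of a timestamp in milliseconds:
SELECT * FROM sample /*+ OPTIONS('as-of-timestamp'='1686720000000') */;
```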
5. Querying in Streaming Mode

Streaming mode:

SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;
Read all records from the current snapshot, then keep reading incremental data produced after that snapshot:
SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;
Read incremental data after the specified snapshot id (exclusive):
SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s','start-snapshot-id'='384023852058202')*/;
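A bounded variant of the same incremental read is also possible in batch mode. The option names ('start-snapshot-id' exclusive, 'end-snapshot-id' inclusive) come from the Iceberg Flink read options; both ids below are illustrative:

```sql
-- Batch incremental read between two snapshots
-- (start is exclusive, end is inclusive; ids are illustrative):
SET execution.runtime-mode = batch;
SELECT * FROM sample /*+ OPTIONS(
  'start-snapshot-id'='384023852058202',
  'end-snapshot-id'='384023852058300'
) */;
```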
6. Reading a Kafka Stream into an Iceberg Table

Download flink-connector-kafka:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
Create the Iceberg table:

CREATE TABLE `hive_catalog`.`test`.`sample5` (
  `id` INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
);
Create a table mapped to the Kafka topic:

create table default_catalog.default_database.kafka (
  id int,
  data string
) with (
  'connector' = 'kafka',
  'topic' = 'testKafkaTopic',
  'properties.zookeeper.connect' = 'hadoop1:2101',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'properties.group.id' = 'iceberg',
  'scan.startup.mode' = 'earliest-offset'
);
Streaming read:

SET sql-client.execution.result-mode=tableau;
SET execution.runtime-mode = streaming;

Insert the Kafka data into the Iceberg table:
insert into hive_catalog.test.sample5 select * from default_catalog.default_database.kafka;
Query the data:
SELECT * FROM sample5 /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;
As new records arrive in the topic, the streaming query continuously returns the latest data.