Iceberg from Beginner to Master, Part 10: Inserting Data into an Iceberg Table with Flink SQL, and Querying in Batch and Streaming Modes

  • 1. INSERT INTO
  • 2. INSERT OVERWRITE
  • 3. UPSERT
  • 4. Querying in Batch Mode
  • 5. Querying in Streaming Mode
  • 6. Reading a Kafka Stream into an Iceberg Table

    1. INSERT INTO

    CREATE TABLE `stu` (id int, name string, age int)
    PARTITIONED BY (age);
    insert into stu values(3,'杀sheng',16),(4,'鸣人',19);
    

    2. INSERT OVERWRITE

    INSERT OVERWRITE is only supported in Flink's batch execution mode.

    SET execution.runtime-mode = batch;
    INSERT OVERWRITE sample VALUES (1,'a');
    INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
    

    3. UPSERT

    When writing to a v2 table, Iceberg supports primary-key-based UPSERT. There are two ways to enable upsert.

    The first way is to enable it when creating the table:

    CREATE TABLE `hive_catalog`.`test`.`sample5` (
      `id` INT UNIQUE COMMENT 'unique id',
      `data` STRING NOT NULL,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) WITH (
      'format-version'='2',
      'write.upsert.enabled'='true'
    );
    

    In UPSERT mode, if the table is partitioned, the partition fields must be part of the primary key.
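
    For example, a partitioned upsert table must include its partition column in the primary key; a minimal sketch (`sample6` and `category` are made-up names for illustration):

    CREATE TABLE `hive_catalog`.`test`.`sample6` (
      `id` INT,
      `category` STRING,
      `data` STRING,
      PRIMARY KEY(`id`,`category`) NOT ENFORCED
    ) PARTITIONED BY (`category`) WITH (
      'format-version'='2',
      'write.upsert.enabled'='true'
    );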

    insert into sample5 values(1,'a');
    insert into sample5 values(2,'b');
    
    SET sql-client.execution.result-mode=tableau;
    select * from sample5;
    
    insert into sample5 values(2,'c');
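
    The second way enables upsert per statement rather than as a table property, via a write-option hint on the INSERT; a sketch against the sample5 table (the `upsert-enabled` option name follows Iceberg's Flink write options):

    INSERT INTO sample5 /*+ OPTIONS('upsert-enabled'='true') */ values(3,'d');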
    

    4. Querying in Batch Mode

    Batch mode:

    SET execution.runtime-mode = batch;
    select * from sample;
    

    5. Querying in Streaming Mode

    Streaming mode:

    SET execution.runtime-mode = streaming;
    SET table.dynamic-table-options.enabled=true;
    SET sql-client.execution.result-mode=tableau;
    

    Read all records from the current snapshot, then keep reading incremental data from that snapshot onward:

    SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;
    

    Read incremental data after the specified snapshot id (exclusive):

    SELECT * FROM sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s','start-snapshot-id'='384023852058202')*/;
    

    6. Reading a Kafka Stream into an Iceberg Table

    Download flink-connector-kafka:

    https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

    Create the Iceberg table (as with sample5 above, upsert on a primary-key table requires a v2 table with upsert enabled):

    CREATE TABLE `hive_catalog`.`test`.`sample5` (
      `id` INT UNIQUE COMMENT 'unique id',
      `data` STRING NOT NULL,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) WITH (
      'format-version'='2',
      'write.upsert.enabled'='true'
    );
    

    Create a table mapped to the Kafka topic:

    create table default_catalog.default_database.kafka (
      id int,
      data string
    ) with (
      'connector' = 'kafka',
      'topic' = 'testKafkaTopic',
      'properties.zookeeper.connect' = 'hadoop1:2101',
      'properties.bootstrap.servers' = 'hadoop1:9092',
      'format' = 'json',
      'properties.group.id' = 'iceberg',
      'scan.startup.mode' = 'earliest-offset'
    );
    

    Set up streaming reads:

    SET sql-client.execution.result-mode=tableau;
    SET execution.runtime-mode = streaming;
    

    Insert data:

    insert into hive_catalog.test.sample5 select * from default_catalog.default_database.kafka;
    

    Query the data:

    SELECT * FROM sample5 /*+ OPTIONS('streaming'='true','monitor-interval'='1s')*/;
    

    Whenever new data arrives in the topic, the streaming query continuously returns the latest rows.