FlinkCDC Series: Real-Time Data Capture from Oracle

Contents

Installing and Deploying Oracle

Preparing Oracle for Real-Time Data Capture

Enabling Archive Logging

1. Check whether archive logging is enabled

2. If archive logging is not enabled, enable it as follows

3. Check again that it is enabled

Creating the User Tablespace

1. Create the tablespace

2. Create and grant the user

3. Load sample data

4. Enable supplemental logging

FlinkSQL Real-Time Capture Demo

Flink Table/SQL Program

1. pom.xml

2. Table API

3. Run the program

Flink Web UI

Installing and Deploying Oracle

Note: this deployment uses Docker and Oracle 11g.

1. Pull the Oracle Docker image

 docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

2. Run the image and start the container

docker run -d -p 1521:1521 --name oracle11g registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

3. Check that the container is running: docker ps

4. Enter the container to modify the Oracle configuration

docker exec -it b8fc967bec2d /bin/bash

5. Configure the Oracle base and instance information

su root

root user password: helowin

vi /etc/profile

Add the following configuration:

export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2

export ORACLE_SID=helowin

export PATH=$ORACLE_HOME/bin:$PATH

Note: run source /etc/profile to make the settings take effect.

6. Log in, create a user, and grant privileges

su oracle

sqlplus /nolog, or sqlplus / as sysdba

SQL>alter user system identified by system;

SQL>alter user sys identified by sys;

SQL>ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED; 

SQL>create user test identified by test;

SQL>grant connect,resource,dba to test;

7. Connect with the Navicat client

Preparing Oracle for Real-Time Data Capture

Enabling Archive Logging

First, log in to the Oracle client as sysdba: sqlplus / as sysdba

1. Check whether archive logging is enabled

SELECT dbid, name, log_mode FROM v$database;

Explanation:

NAME: database name

LOG_MODE: logging mode

  1. NOARCHIVELOG
  2. ARCHIVELOG
  3. MANUAL

Note: NOARCHIVELOG means archiving is disabled, ARCHIVELOG means automatic archiving is enabled, and MANUAL means manual archiving is enabled.

ARCHIVE LOG LIST;

Explanation:

Automatic archival: whether automatic archiving is enabled

Next log sequence to archive: sequence number of the next log to be archived

It helps to distinguish archive logs from redo logs here:

Redo log:

  1. Records changes to the database (DML, DDL)
  2. Used for data-block recovery
  3. Redo files are managed in groups, with at least two groups used in rotation
  4. Should be placed on fast disks separate from the data files (otherwise it becomes a bottleneck)

Query the redo log file locations: select member from V$LOGFILE;

Archive log:

Redo logs are continuously copied into the archive, producing a series of archive files. Redo logs have a fixed size, while the size of the archive destination is configurable: alter system set db_recovery_file_dest_size = 10G;

Check the archive log location: show parameter recover;

Check archive log space usage: select * from V$FLASH_RECOVERY_AREA_USAGE;
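To see how redo and archive logs interact, you can force a log switch and then check that a new archive file appears (an illustrative check using the standard v$log and v$archived_log views):

SQL>SELECT group#, sequence#, status FROM v$log;

SQL>ALTER SYSTEM SWITCH LOGFILE;

SQL>SELECT sequence#, name FROM v$archived_log;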

2. If archive logging is not enabled, enable it as follows

SQL>alter system set db_recovery_file_dest_size = 10G;

SQL>alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;

SQL>shutdown immediate;

SQL>startup mount;

SQL>alter database archivelog;

SQL>alter database open;

3. Check again that it is enabled

SQL>archive log list;

Creating the User Tablespace

1. Create the tablespace

CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
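As a quick sanity check, you can confirm that the tablespace and its datafile were created, using the standard DBA_TABLESPACES and DBA_DATA_FILES views:

SQL>SELECT tablespace_name, status FROM dba_tablespaces WHERE tablespace_name = 'LOGMINER_TBS';

SQL>SELECT file_name, bytes FROM dba_data_files WHERE tablespace_name = 'LOGMINER_TBS';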

2. Create and grant the user

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

grant connect,resource,dba to flinkuser;

GRANT CREATE SESSION TO flinkuser;

GRANT SELECT ON V_$DATABASE TO flinkuser;

GRANT FLASHBACK ANY TABLE TO flinkuser;

GRANT SELECT ANY TABLE TO flinkuser;

GRANT SELECT_CATALOG_ROLE TO flinkuser;

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

GRANT SELECT ANY TRANSACTION TO flinkuser;

GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkuser;

GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;

GRANT CREATE TABLE TO flinkuser;

GRANT LOCK ANY TABLE TO flinkuser;

GRANT ALTER ANY TABLE TO flinkuser;

GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;

GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;

GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;

GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;

GRANT SELECT ON V_$LOGFILE TO flinkuser;

GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;

GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
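Before pointing Flink at the database, it is worth verifying that the new account can log in and read the LogMiner-related views (an illustrative check; SESSION_PRIVS lists the system privileges of the current session):

SQL>CONNECT flinkuser/flinkpw

SQL>SELECT * FROM session_privs;

SQL>SELECT COUNT(*) FROM v$archived_log;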

3. Load sample data

DROP TABLE "FLINKUSER"."ORDERS";

CREATE TABLE "FLINKUSER"."ORDERS" (

  "ORDER_ID" NUMBER(9) NOT NULL ,

  "ORDER_DATE" TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL ,

  "CUSTOMER_NAME" VARCHAR2(255 BYTE) NOT NULL ,

  "PRICE" NUMBER(10,5) NOT NULL ,

  "PRODUCT_ID" NUMBER(9) NOT NULL ,

  "ORDER_STATUS" NUMBER(1) NOT NULL 

)

TABLESPACE "LOGMINER_TBS"

LOGGING

NOCOMPRESS

PCTFREE 10

INITRANS 1

STORAGE (

  INITIAL 65536 

  NEXT 1048576 

  MINEXTENTS 1

  MAXEXTENTS 2147483645

  BUFFER_POOL DEFAULT

)

PARALLEL 1

NOCACHE

DISABLE ROW MOVEMENT

;

-- ----------------------------

-- Records of "ORDERS"

-- ----------------------------

INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10001', TO_TIMESTAMP('2020-07-30 18:08:22.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Jark', '50.5', '102', '0');

INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10002', TO_TIMESTAMP('2020-07-30 18:11:09.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Sally', '15', '105', '0');

INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10003', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');

-- ----------------------------

-- Primary Key structure for table ORDERS

-- ----------------------------

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006996" PRIMARY KEY ("ORDER_ID");

-- ----------------------------

-- Checks structure for table ORDERS

-- ----------------------------

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006990" CHECK ("ORDER_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006991" CHECK ("ORDER_DATE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006992" CHECK ("CUSTOMER_NAME" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006993" CHECK ("PRICE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006994" CHECK ("PRODUCT_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006995" CHECK ("ORDER_STATUS" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

4. Enable supplemental logging

Database level: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Table level: ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
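You can verify that database-level supplemental logging took effect by querying v$database; SUPPLEMENTAL_LOG_DATA_MIN should report YES:

SQL>SELECT supplemental_log_data_min FROM v$database;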

FlinkSQL Real-Time Capture Demo

Versions used:

Flink: 1.16.1
Flink CDC: 2.3.0

Flink Table/SQL 程序

1. pom.xml

 

<properties>
    <flink.version>1.16.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-oracle-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- local Web UI -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>

2. Table API

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCDCOracleSourceExample {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        //set the port the local Web UI binds to

        conf.setString(RestOptions.BIND_PORT,"8081");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(" CREATE TABLE orders (\n" +

                "   ORDER_ID INT,\n" +

                "   ORDER_DATE TIMESTAMP_LTZ(3),\n" +

                "   CUSTOMER_NAME STRING,\n" +

                "   PRICE DECIMAL(10, 5),\n" +

                "   PRODUCT_ID INT,\n" +

                "   ORDER_STATUS BOOLEAN\n" +

                " ) WITH (\n" +

                "   'connector' = 'oracle-cdc',\n" +

                "   'hostname' = 's171',\n" +

                "   'port' = '1521',\n" +

                "   'username' = 'flinkuser',\n" +

                "   'password' = 'flinkpw',\n" +

                "   'database-name' = 'HELOWIN',\n" +

                "   'schema-name' = 'flinkuser',  \n" +

                "   'table-name' = 'orders',\n" +

                "   'scan.startup.mode' = 'latest-offset',\n" +

                "   'debezium.poll.interval.ms' = '1000',\n" +

                "   'debezium.log.mining.strategy' = 'online_catalog',\n" +

                "    'debezium.log.mining.continuous.mine' = 'true'\n" +

                " )");

        Table table = tableEnv.sqlQuery("select * from orders");

//        tableEnv.toRetractStream(table, Orders.class).print("Oracle cdc: ").setParallelism(1);

        tableEnv.toChangelogStream(table).print("Oracle cdc: ").setParallelism(1);

        env.execute("Flink CDC: Oracle -> Print");

    }

}

3. Run the program

Insert a new row: INSERT INTO "FLINKUSER"."ORDERS"("ORDER_ID", "ORDER_DATE", "CUSTOMER_NAME", "PRICE", "PRODUCT_ID", "ORDER_STATUS") VALUES ('10006', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');
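To observe update and delete change events as well, you can modify and then remove the same row; the connector should then emit -U/+U and -D rows in the printed changelog:

UPDATE "FLINKUSER"."ORDERS" SET "PRICE" = 30.5 WHERE "ORDER_ID" = 10006;

COMMIT;

DELETE FROM "FLINKUSER"."ORDERS" WHERE "ORDER_ID" = 10006;

COMMIT;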

Flink WEB UI

http://localhost:8081