DataX Data Synchronization


1. DataX Overview

1.1 What is DataX

DataX is an open-source offline synchronization tool for heterogeneous data sources from Alibaba. It provides stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.

1.2 The Design of DataX

To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point synchronization links into a star-shaped topology, with DataX acting as the intermediate transport hub that connects the various data sources.

When a new data source needs to be integrated, it only has to be connected to DataX once, and it can then synchronize seamlessly with every data source already supported.

1.3 Supported Data Sources

DataX already has a fairly complete plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big-data computing systems are all supported.

1.4 Framework Design and Runtime Behavior

How a job runs:

For example, suppose a user submits a DataX job configured with 20 concurrent channels, whose goal is to synchronize a MySQL database split across 100 sharded tables into ODPS.

DataX makes its scheduling decisions as follows:

1) The DataX Job is split into 100 Tasks according to the sharded databases and tables.

2) Given 20 concurrent channels, DataX calculates that 4 TaskGroups are needed (each TaskGroup runs 5 channels by default, so 20 / 5 = 4).

3) The 4 TaskGroups evenly divide the 100 Tasks; each TaskGroup runs 25 Tasks with a concurrency of 5.
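
For reference, a minimal sketch of the setting block such a job would carry (only the speed setting is shown; the reader/writer content is the same as in any other job file, and the value 20 simply comes from the example above):

{
    "job": {
        "setting": {
            "speed": {
                "channel": 20
            }
        },
        ...
    }
}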

2. Getting Started with DataX

2.1 Download

Download: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

Source code: https://github.com/alibaba/DataX

2.2 Installation (a Windows version is also available)

A Python environment is required.

I use the Windows version here for verification.

1. Extract the archive:

tar -xvf datax.tar.gz

2. Go to the bin directory and run the self-check.

The self-check job file is located at D:\datax\job\job.json.

The launcher script is in the bin directory; run:

python datax.py D:\datax\job\job.json

2.3 Viewing Configuration Templates and Example Usage

You can use the -r and -w options to print the configuration template for the corresponding reader and writer plugins.

MySQL is the most commonly used, so we take MySQL as the example.

For other data sources, please refer to the official documentation.

python datax.py -r mysqlreader -w mysqlwriter
D:\datax\bin>python datax.py -r mysqlreader -w mysqlwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the mysqlwriter document:
     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{ "job": { "content": [
            { "reader": { "name": "mysqlreader",
                    "parameter": { "column": [],
                        "connection": [
                            { "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": { "name": "mysqlwriter",
                    "parameter": { "column": [],
                        "connection": [
                            { "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": { "speed": { "channel": ""
            }
        }
    }
}

3. Using DataX

3.1 Preparation

We create a table, user_token, on a local server (the 192.168.100.211 machine); it contains 309 rows of data.

CREATE TABLE `user_token` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT 'id',
  `account` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '账号',
  `token` varchar(4000) NOT NULL COMMENT 'token',
  `login_type` int DEFAULT NULL COMMENT '登录设备类型(1:web 2:pda)',
  `login_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '登录时间',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `account_type` (`account`,`login_type`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=311 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='用户登录token记录表';

On the remote server, create a user_token table with the same structure as the local one, but leave it empty.

3.2 Writing the Job File mysql2mysql.json

{ "job": { "content": [
            { "reader": { "name": "mysqlreader",
                    "parameter": { "column": ["id","account","token","login_type","login_date"],
                        "connection": [
                            { "jdbcUrl": ["jdbc:mysql://192.168.100.211:3306/demo"],
                                "table": ["user_token"]
                            }
                        ],
                        "password": "Xxxx",
                        "username": "root",
                        "where": "1=1"
                    }
                },
                "writer": { "name": "mysqlwriter",
                    "parameter": { "column": ["id","account","token","login_type","login_date"],
                        "connection": [
                            { "jdbcUrl": "jdbc:mysql://www.yueshushu.top:3306/mpcode",
                                "table": ["user_token"]
                            }
                        ],
                        "password": "Xxxx",
                        "preSql": [],
                        "session": [],
                        "username": "xxxx",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": { "speed": { "channel": "1"
            }
        }
    }
}

For incremental synchronization, restrict the rows to be read through the where condition in the reader.
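
For example, a minimal sketch of an incremental condition based on this table's login_date column (the cutoff timestamp here is hypothetical; in practice it would be the watermark recorded at the end of the previous run):

"where": "login_date >= '2023-04-03 00:00:00'"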

3.3 Running the Job

D:\datax\bin>python datax.py mysql2mysql.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2023-04-03 19:52:59.601 [main] INFO  MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2023-04-03 19:52:59.602 [main] INFO  MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2023-04-03 19:52:59.605 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2023-04-03 19:52:59.608 [main] INFO  Engine - the machine info  => osInfo: Oracle Corporation 1.8 25.102-b14
        jvmInfo:        Windows 10 amd64 10.0
        cpu num:        20
        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1
        GC Names        [PS MarkSweep, PS Scavenge]
        MEMORY_NAME                    | allocation_size                | init_size
        PS Eden Space                  | 256.00MB                       | 256.00MB
        Code Cache                     | 240.00MB                       | 2.44MB
        Compressed Class Space         | 1,024.00MB                     | 0.00MB
        PS Survivor Space              | 42.50MB                        | 42.50MB
        PS Old Gen                     | 683.00MB                       | 683.00MB
        Metaspace                      | -0.00MB                        | 0.00MB
2023-04-03 19:52:59.614 [main] INFO  Engine -
{ "content":[
                { "reader":{ "name":"mysqlreader",
                                "parameter":{ "column":[
 "id",
 "account",
 "token",
 "login_type",
 "login_date"
                                        ],
                                        "connection":[
 { "jdbcUrl":[
                 "jdbc:mysql://192.168.100.211:3306/demo"
         ],
         "table":[
                 "user_token"
         ]
 }
                                        ],
                                        "password":"**********",
                                        "username":"root",
                                        "where":"1=1"
                                }
                        },
                        "writer":{ "name":"mysqlwriter",
                                "parameter":{ "column":[
 "id",
 "account",
 "token",
 "login_type",
 "login_date"
                                        ],
                                        "connection":[
 { "jdbcUrl":"jdbc:mysql://www.yueshushu.top:3306/mpcode",
         "table":[
                 "user_token"
         ]
 }
                                        ],
                                        "password":"******",
                                        "preSql":[
                                        ],
                                        "session":[
                                        ],
                                        "username":"yuejl",
                                        "writeMode":"insert"
                                }
                        }
                }
        ],
        "setting":{ "speed":{ "channel":"1"
                }
        }
}
2023-04-03 19:52:59.621 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false
2023-04-03 19:52:59.621 [main] INFO  JobContainer - DataX jobContainer starts job.
2023-04-03 19:52:59.622 [main] INFO  JobContainer - Set jobId = 0
Mon Apr 03 19:52:59 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:05.230 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://192.168.100.211:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
Mon Apr 03 19:53:05 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:05.266 [job-0] INFO  OriginalConfPretreatmentUtil - table:[user_token] has columns:[id,account,token,login_type,login_date].
Mon Apr 03 19:53:05 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:05.909 [job-0] INFO  OriginalConfPretreatmentUtil - table:[user_token] all columns:[
id,account,token,login_type,login_date
].
Mon Apr 03 19:53:05 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:06.351 [job-0] INFO  OriginalConfPretreatmentUtil - Write data [
insert INTO %s (id,account,token,login_type,login_date) VALUES(?,?,?,?,?)
], which jdbcUrl like:[jdbc:mysql://www.yueshushu.top:3306/mpcode?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true&tinyInt1isBit=false]
2023-04-03 19:53:06.351 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2023-04-03 19:53:06.351 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2023-04-03 19:53:06.351 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .
2023-04-03 19:53:06.352 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2023-04-03 19:53:06.352 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2023-04-03 19:53:06.353 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2023-04-03 19:53:06.354 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.
2023-04-03 19:53:06.365 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2023-04-03 19:53:06.366 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2023-04-03 19:53:06.367 [job-0] INFO  JobContainer - Running by standalone Mode.
2023-04-03 19:53:06.369 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2023-04-03 19:53:06.371 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2023-04-03 19:53:06.371 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2023-04-03 19:53:06.403 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2023-04-03 19:53:06.405 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select id,account,token,login_type,login_date from user_token where (1=1)
] jdbcUrl:[jdbc:mysql://192.168.100.211:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
Mon Apr 03 19:53:06 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:06.468 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,account,token,login_type,login_date from user_token where (1=1)
] jdbcUrl:[jdbc:mysql://192.168.100.211:3306/demo?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
Mon Apr 03 19:53:06 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Mon Apr 03 19:53:06 GMT+08:00 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2023-04-03 19:53:07.697 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[1295]ms
2023-04-03 19:53:07.697 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-04-03 19:53:16.409 [job-0] INFO  StandAloneJobContainerCommunicator - Total 309 records, 51097 bytes | Speed 4.99KB/s, 30 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-04-03 19:53:16.411 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2023-04-03 19:53:16.412 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2023-04-03 19:53:16.412 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2023-04-03 19:53:16.412 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2023-04-03 19:53:16.412 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: D:\datax\hook
2023-04-03 19:53:16.413 [job-0] INFO  JobContainer -
         [total cpu info] => averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%
         [total gc info] => NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.025s             | 0.025s             | 0.025s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.006s             | 0.006s             | 0.006s
2023-04-03 19:53:16.413 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-04-03 19:53:16.413 [job-0] INFO  StandAloneJobContainerCommunicator - Total 309 records, 51097 bytes | Speed 4.99KB/s, 30 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-04-03 19:53:16.414 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-04-03 19:52:59
任务结束时刻                    : 2023-04-03 19:53:16
任务总计耗时                    :                 16s
任务平均流量                    :            4.99KB/s
记录写入速度                    :             30rec/s
读出记录总数                    :                 309
读写失败总数                    :                   0

After the job finishes, you can see that the data has been synchronized to the remote database successfully.
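
As a quick sanity check, you can compare row counts; a simple sketch (the expected value of 309 comes from the job summary above):

-- Run against the target database (mpcode) after the job completes
SELECT COUNT(*) FROM user_token;   -- expected: 309, matching the records read from the source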

4. Usage Optimization

4.1 Key Parameters

➢ job.setting.speed.channel: number of concurrent channels for the job

➢ job.setting.speed.record: global record-rate limit for the job, across all channels

➢ job.setting.speed.byte: global byte-rate limit for the job, across all channels

➢ core.transport.channel.speed.record: record-rate limit for a single channel

➢ core.transport.channel.speed.byte: byte-rate limit for a single channel
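
For reference, a minimal sketch of where these keys live in a job file (the values are placeholders, and normally you would not set all of them at once; see 4.2.2 below for how they interact):

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "record": 100,
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel": 1,
                "record": 500,
                "byte": 5242880
            }
        },
        ...
    }
}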

4.2 Optimization

4.2.1 Increase the Speed of Each Channel

Internally, DataX strictly limits the speed of each Channel in two ways: by the number of records synchronized per second and by the number of bytes synchronized per second. The default limit is 1 MB/s per channel. You can raise either the byte limit or the record limit according to your hardware; in practice the byte limit is the one usually adjusted. For example, the per-channel speed limit can be raised to 5 MB/s.
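
A minimal sketch of raising the per-channel byte limit to 5 MB/s (5242880 bytes); this assumes no conflicting global byte limit is set:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        ...
    }
}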

4.2.2 Increase the Channel Concurrency Within a DataX Job

Concurrency = number of TaskGroups × number of Tasks each TaskGroup runs concurrently (5 by default).

There are three ways to increase the channel concurrency within a job:

4.2.2.1 Configure a Global Byte Limit Together with a Per-Channel Byte Limit

Channel count = global byte limit / per-channel byte limit

With core.transport.channel.speed.byte=1048576 and job.setting.speed.byte=5242880, the channel count = global byte limit / per-channel byte limit = 5242880 / 1048576 = 5.
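
A sketch of this configuration, written in the same form as the record-based example in the next subsection (only the speed-related keys are shown):

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte": 5242880
            }
        },
        ...
    }
}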

4.2.2.2 Configure a Global Record Limit Together with a Per-Channel Record Limit

Channel count = global record limit / per-channel record limit

{ "core": { "transport": { "channel": { "speed": { "record": 100
 }
 }
 }
 },
 "job": { "setting": { "speed": { "record" : 500
 }
 },
 ...
 }
}

With core.transport.channel.speed.record=100 and job.setting.speed.record=500, the channel count = global record limit / per-channel record limit = 500 / 100 = 5.

4.2.2.3 Configure the Channel Count Directly

This setting takes effect only when neither of the two limits above is configured; if both of the limits above are set, the smaller of the two resulting channel counts is used as the final channel count.

{ "job": { "setting": { "speed": { "channel" : 5
 }
 },
 ...
 }
}

Setting job.setting.speed.channel=5 directly gives the job a channel concurrency of 5.

4.2.3 Increase the JVM Heap Size

When the channel concurrency of a DataX job is increased, memory usage rises significantly, because DataX, acting as the data exchange channel, buffers a fair amount of data in memory. For example, each Channel holds a Buffer that serves as a temporary staging area for records, and some Readers and Writers maintain Buffers of their own. To avoid OOM and similar errors, increase the JVM heap size.

A heap of 4 GB or 8 GB is recommended; adjust this to your actual situation.

There are two ways to adjust the JVM -Xms/-Xmx parameters: edit the datax.py script directly, or pass the parameters on the command line at launch time, as follows:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
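
If you would rather edit the script, the JVM options are defined near the top of datax.py; in the copies of DataX I have seen the variable is named DEFAULT_JVM, but verify the name in your own version. A sketch of the change:

# In {DATAX_HOME}/bin/datax.py -- the variable name may differ between DataX releases
DEFAULT_JVM = "-Xms8G -Xmx8G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % (DATAX_HOME)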


DataX also has a companion web UI project, datax-web.

Thanks for reading! If you like this article, please follow me. Thanks again!