dolphin上MySQL到hive、seatunnel任务创建:
1.先获取所需数量的taskCode
Long taskCode = getClient().opsForProcess().generateTaskCode(projectCode, 1).get(0);
2.创建MySqlSource对象,result_table_name、url、user、password、query是必须的
public class MySqlSource extends Source { private String url; private String driver = "com.mysql.cj.jdbc.Driver"; private String user; private String password; private Integer connection_check_timeout_sec = 100; private String query; private String partition_column; private Integer partition_num; /** @param result_table_name 结果临时表表名 */ public MySqlSource(String result_table_name) { super(result_table_name); } }
3.根据需要创建TransformParam,TransformParam中可添加多个Transform,Transform用于对数据源表处理产生结果表供后续transform或sink使用,transform的source_table_name为上一段流程的结果表,result_table_name为transform的结果表
CopyTransform:对数据源指定列复制产生额外列
FieldMapperTransform:字段映射
FilterRowKindTransform:过滤数据行类型,如INSERT、UPDATE_BEFORE等
FilterTransform:字段过滤
ReplaceTransform:对数据源的某一字段的值按匹配替换
SplitTransform:对数据源某一字段按分隔符拆分出新列
SQLTransform:对数据源表进行SQL操作,不支持复杂SQL,如join、聚合、like等操作
支持的函数:SQL Functions | Apache SeaTunnel
4.创建HiveSink对象,四个参数都是必须的,dbName为数据注入表所在库库名,tableName为数据注入表的表名,metastoreUri为hive的metastoreserver的地址,source_table_name为前置流程处理的最终结果表表名
public class HiveSink extends Sink { private String tableName; private