dolphin上MySQL到hive、seatunnel任务创建

dolphin上MySQL到hive、seatunnel任务创建:

1.先获取所需数量的taskCode

Long taskCode = getClient().opsForProcess().generateTaskCode(projectCode, 1).get(0);

2.创建MySqlSource对象,result_table_name、url、user、password、query是必须的

public class MySqlSource extends Source {  private String url;
  private String driver = "com.mysql.cj.jdbc.Driver";
  private String user;
  private String password;
  private Integer connection_check_timeout_sec = 100;
  private String query;
  private String partition_column;
  private Integer partition_num;
  /** @param result_table_name 结果临时表表名 */
  public MySqlSource(String result_table_name) {  super(result_table_name);
  }
}

3.根据需要创建TransformParam,TransformParam中可添加多个Transform,Transform用于对数据源表处理产生结果表供后续transform或sink使用,transform的source_table_name为上一段流程的结果表,result_table_name为transform的结果表

​ CopyTransform:对数据源指定列复制产生额外列

​ FieldMapperTransform:字段映射

​ FilterRowKindTransform:过滤数据行类型,如INSERT、UPDATE_BEFORE等

​ FilterTransform:字段过滤

​ ReplaceTransform:对数据源的某一字段的值按匹配替换

​ SplitTransform:对数据源某一字段按分隔符拆分出新列

​ SQLTransform:对数据源表进行SQL操作,不支持复杂SQL,如join、聚合、like等操作

​ 支持的函数:SQL Functions | Apache SeaTunnel

4.创建HiveSink对象,四个参数都是必须的,dbName为数据注入表所在库库名,tableName为数据注入表的表名,metastoreUri为hive的metastoreserver的地址,source_table_name为前置流程处理的最终结果表表名

public class HiveSink extends Sink {  private String tableName;
  private