一、版本信息
- Flink:1.16.1
二、代码实现
- pom文件如下
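```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wys</groupId>
    <artifactId>flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.9_flink-1.16</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
    </dependencies>
</project>
```
- Java代码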
package com.wys.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.wys.flink.bean.DataCenterShine; import com.wys.flink.util.DataStreamUtil; import com.wys.flink.util.SourceAndSinkInfo; public class DataStreamMySQLToStarRocks {public static void main(String[] args) throws Exception {// 流执行环境 Configuration conf = new Configuration(); // 设置WebUI绑定的本地端口 conf.setString(RestOptions.BIND_PORT, "8081"); // 使用配置 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); env.enableCheckpointing(180000l, CheckpointingMode.EXACTLY_ONCE); //设置source和sink的ip端口等信息 SourceAndSinkInfo info=SourceAndSinkInfo.builder() .sourceIp("ip") .sourcePort(3306) .sourceUserName("root") .sourcePassword("****") .sinkIp("ip") .sinkPort(9030) .sinkUserName("root") .sinkPassword("") .build(); //设置DataCenterShine实体类对应表的source和sink DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class); //可以设置多个同步 //DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class); //定义任务名称 env.execute("data_center_shine_job"); } }
- SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息
package com.wys.flink.util; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data @Builder @AllArgsConstructor @NoArgsConstructor public class SourceAndSinkInfo {/** * 数据源ip */ private String sourceIp; /** * 数据源端口 */ private int sourcePort; /** * 数据源账号 */ private String sourceUserName; /** * 数据源密码 */ private String sourcePassword; /** * 输出源ip */ private String sinkIp; /** * 输出源端口 */ private int sinkPort; /** * 输出源账号 */ private String sinkUserName; /** * 输出源密码 */ private String sinkPassword; }
- DataCenterShine实体类,字段与数据库一一对应。
package com.wys.flink.bean;

import java.io.Serializable;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
 * Business-type mapping table, bound to {@code wsmg.data_center_shine}.
 *
 * <p>Every column is marked with {@link FieldInfo}; {@code order} gives its position in the
 * written row. Equality ignores the inherited {@code rowKind} ({@code callSuper = false}).
 *
 * @author wys
 * @since 2023-05-23 11:16:24
 */
@EqualsAndHashCode(callSuper = false)
@TableName("wsmg.data_center_shine")
@Data
public class DataCenterShine extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key. */
    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;

    /** Mapper name. */
    @FieldInfo(order = 2)
    private String busName;

    /** Mapper class name. */
    @FieldInfo(order = 3)
    private String mapperClassName;

    /** Entity class name. */
    @FieldInfo(order = 4)
    private String entityClassName;
}
- StarRocksPrimary 实体类
package com.wys.flink.bean;

import java.io.Serializable;

import org.apache.flink.types.RowKind;

import lombok.Data;

/**
 * Base class for entities written to StarRocks; carries the change type of each record.
 *
 * <p>Implements {@link Serializable} because subclasses (e.g. {@code DataCenterShine}) are
 * Serializable. Java serialization silently skips the fields of a non-serializable superclass,
 * so without this {@code rowKind} would be lost whenever such an entity is Java-serialized.
 */
@Data
public class StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Change type of the record (insert / update / delete) as set by the CDC deserializer. */
    private RowKind rowKind;
}
- FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。
package com.wys.flink.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.FIELD}) public @interface FieldInfo { /** * 字段排序:插入的字段顺序。 * @return */ int order(); /** * 是否为主键:StarRocks主键模型时需要使用 * @methodName isPrimaryKey * @return boolean * @author wys * @date 2023-12-12 */ boolean isPrimaryKey() default false; /** * 不为空:字段是否为空 * @methodName notNull * @return boolean * @author wys * @date 2023-12-12 */ boolean notNull() default false; }
- TableName 注解类,用于记录实体类对应的库与表
package com.wys.flink.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface TableName { /*** * 表名:库名.表名称,如:sys.user * @return */ String value(); }
- DataStreamUtil工具类,用于设置source和sink。目前实现了MySQL同步到StarRocks;MySQL同步到MySQL的方法(setMySQLSourceAndSink)已预留,但在代码中被注释。
package com.wys.flink.util; import java.util.function.Supplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.wys.flink.annotation.TableName; import com.wys.flink.bean.DataCenterShine; import com.wys.flink.sink.MysqlAndStarRocksSink; public class DataStreamUtil {/** * MySQL同步到MySQL的数据源和输出源设置 * @methodName setMySQLSourceAndSink * @param env * @param info * @param cls void * @author wys * @date 2023-12-12 */ /*@SuppressWarnings({ "unchecked", "rawtypes" }) public static
void setMySQLSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class cls) { setSourceAndSink(env, info, cls, ()->new MysqlAndStarRocksSink(cls,info.getSinkIp(), info.getSinkPort())); }*/ /** * MySQL同步到StarRocks的数据源和输出源设置 * @methodName setStarRocksSourceAndSink * @param env * @param info * @param cls void * @author wys * @date 2023-12-12 */ public static void setStarRocksSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class cls) {setSourceAndSink(env, info, cls, ()->StarRocksSinkUtil.getStarRocksSink(cls, info)); } /** * 数据源和输出源设置 * @methodName setSourceAndSink * @param env * @param info * @param cls * @param sink void * @author wys * @date 2023-12-12 */ @SuppressWarnings({ "unchecked", "rawtypes" }) private static void setSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class cls,Supplier > sink) {if(cls.isAnnotationPresent(TableName.class)){String table=cls.getAnnotation(TableName.class).value(); String[] tableArr=table.split("\\."); // source MySqlSource mySQLSource= MySqlSource. builder() .hostname(info.getSourceIp()) .port(info.getSourcePort()) .databaseList(tableArr[0]) // 设置捕获的数据库, 如果需要同步整个数据库,请将tableList 设置为 ".*". .tableList(table) // 设置捕获的表 .username(info.getSourceUserName()) .password(info.getSourcePassword()) .deserializer(new CustomDebeziumDeserializationSchema(cls)).build(); // 流执行环境添加source DataStreamSource source=env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(),tableArr[1]+"_source"); // sink source.addSink(sink.get()).name(tableArr[1]+"_sink"); } } } - StarRocksSinkUtil辅助类,用于设置StarRocksSink
package com.wys.flink.util; import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableSchema.Builder; import org.apache.flink.table.types.DataType; import com.starrocks.connector.flink.StarRocksSink; import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.wys.flink.annotation.FieldInfo; import com.wys.flink.annotation.TableName; import com.wys.flink.bean.StarRocksPrimary; /** * StarRocksSink辅助类 * @className StarRocksSinkUtil * @author wys * @date 2023-12-12 */ public class StarRocksSinkUtil {private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]"); /** * 获取StarRocksSink * @methodName getStarRocksSink * @param cls * @param info * @return SinkFunction
* @author wys * @date 2023-12-12 */ @SuppressWarnings("serial") public static SinkFunction getStarRocksSink(Class cls, SourceAndSinkInfo info) {Map fieldMap = getFieldMap(cls); return StarRocksSink.sink(getTableSchema(cls), getStarRocksSinkOptions(info, cls), new StarRocksSinkRowBuilder () {@Override public void accept(Object[] objects, T beanDataJava) {try {//反射设置objects for (Entry entry : fieldMap.entrySet()) {Field field = cls.getDeclaredField(entry.getValue()); field.setAccessible(true); Object obj = field.get(beanDataJava); objects[entry.getKey() - 1] = obj; } //设置该数据类型 if(beanDataJava instanceof StarRocksPrimary){objects[objects.length - 1] = ((StarRocksPrimary) beanDataJava).getRowKind().ordinal(); } } catch (Exception e) {e.printStackTrace(); } } }); } /** * 获取FieldMap * * @methodName initFieldMap void * @author wys * @date 2023-12-11 */ private static Map getFieldMap(Class cls) {Map fieldMap = new HashMap<>(); Field[] fields = cls.getDeclaredFields(); for (Field field : fields) {if (field.isAnnotationPresent(FieldInfo.class)) {fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName()); } } return fieldMap; } /** * 获取TableSchema * @methodName getTableSchema * @param cls * @return TableSchema * @author wys * @date 2023-12-12 */ @SuppressWarnings("deprecation") private static TableSchema getTableSchema(Class cls) {Builder builder = TableSchema.builder(); Field[] fields = cls.getDeclaredFields(); //反射设置TableSchema for (Field field : fields) {if (!field.isAnnotationPresent(FieldInfo.class)) {continue; } FieldInfo fi = field.getAnnotation(FieldInfo.class); if (fi.isPrimaryKey()) {builder.primaryKey(field.getName()); } DataType dataType = getDataType(field.getType()); if (fi.notNull()) {dataType = dataType.notNull(); } builder.field(humpToUnderlined(field.getName()), dataType); } return builder.build(); } /** * 获取StarRocksSinkOptions * @methodName getStarRocksSinkOptions * @param info * @param cls * @return StarRocksSinkOptions * @author wys * 
@date 2023-12-12 */ private static StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class cls) {String table = cls.getAnnotation(TableName.class).value(); String[] tableArr = table.split("\\."); return StarRocksSinkOptions.builder() .withProperty("jdbc-url",String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0])) .withProperty("load-url", info.getSinkIp() + ":8030") .withProperty("username", info.getSinkUserName()) .withProperty("password", info.getSinkPassword()) .withProperty("table-name", tableArr[1]) .withProperty("database-name", tableArr[0]) .withProperty("sink.properties.row_delimiter", "\\x02") .withProperty("sink.properties.column_separator", "\\x01") .withProperty("sink.buffer-flush.interval-ms", "5000").build(); } /** * 驼峰转下划线 * * @methodName humpToUnderlined * @param str * @return String * @author wys * @date 2023-12-12 */ private static String humpToUnderlined(String str) {Matcher matcher = TPATTERN.matcher(str); StringBuffer sb = new StringBuffer(); while (matcher.find()) {matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase()); } matcher.appendTail(sb); return sb.toString(); } /** * 获取数据类型 * @methodName getDataType * @param cls * @return DataType * @author wys * @date 2023-12-12 */ private static DataType getDataType(Class> cls) {if (cls.equals(Integer.class)) {return DataTypes.INT(); } else if (cls.equals(String.class)) {return DataTypes.STRING(); } else if (cls.equals(Date.class)) {return DataTypes.TIMESTAMP(); } else if (cls.equals(BigDecimal.class)) {return DataTypes.DECIMAL(8, 2); } throw new RuntimeException("未找到属性相应类型"); } } - CustomDebeziumDeserializationSchema实体类,自定义反序列化方案
package com.wys.flink.util; import java.util.List; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; /** * 自定义反序列化方案 * @className CustomDebeziumDeserializationSchema * @author wys * @date 2023-12-12 */ public class CustomDebeziumDeserializationSchema
implements DebeziumDeserializationSchema {private static final long serialVersionUID = 1L; private Class cls; public CustomDebeziumDeserializationSchema(Class cls) { this.cls=cls; } /** * 只有after,则表明插入;若只有before,说明删除;若既有before,也有after,则代表更新 * @methodName deserialize * @param sourceRecord * @param collector void * @author wys * @date 2023-12-12 */ @Override public void deserialize(SourceRecord sourceRecord, Collector collector) { JSONObject resJson = new JSONObject(); try { Struct valueStruct = (Struct) sourceRecord.value(); Struct afterStruct = valueStruct.getStruct("after"); Struct beforeStruct = valueStruct.getStruct("before"); // 修改 if (null!=beforeStruct && null!=afterStruct) { setDataContent(afterStruct, resJson); resJson.put("rowKind", RowKind.UPDATE_AFTER); } // 插入 else if (null!= afterStruct) { setDataContent(afterStruct, resJson); resJson.put("rowKind", RowKind.INSERT); } // 删除 else if (null!= beforeStruct ) { setDataContent(beforeStruct, resJson); resJson.put("rowKind", RowKind.UPDATE_BEFORE); } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("反序列化失败"); } T t =resJson.toJavaObject(cls); collector.collect(t); } /** * 设置数据内容 * @methodName setDataContent * @param struct * @param resJson void * @author wys * @date 2023-12-12 */ private void setDataContent(Struct struct,JSONObject resJson){ List fields = struct.schema().fields(); for (Field field : fields) { String name = field.name(); Object value = struct.get(name); resJson.put(name, value); } } @Override public TypeInformation getProducedType() { return BasicTypeInfo.of(cls); } } 三、自定义MySQL同步数据到StarRocks
一、功能描述
- 通过上传jar到Apache Flink Dashboard,输入需要同步的表,可自动生成任务
二、代码实现
package com.wys.flink; import java.util.ArrayList; import java.util.List; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.wys.flink.annotation.TableName; import com.wys.flink.util.DataStreamUtil; import com.wys.flink.util.SourceAndSinkInfo; /** * 自定义任务:--entity DataCenterShine,Organization * @className CustomStreamCDC * @author wys * @date 2023-12-11 */ public class StarRocksCustomStreamCDC {public static void main(String[] args) throws Exception {List
> clsList=new ArrayList<>(); StringBuilder jobName=new StringBuilder(); ParameterTool parameters = ParameterTool.fromArgs(args); String entitys = parameters.get("entity",null); if(null==entitys){throw new RuntimeException("在Program Arguments中输入需要同步表对应的实体类名称,格式:--entity User,Role..."); } //获取参数内容这里是实体名称的数组 String[] entityArr=entitys.split(","); for(String className:entityArr){Class> cls=getBeanClass(String.format("com.wys.flink.bean.%s", className)); clsList.add(cls); jobName.append(cls.getSimpleName()).append("_"); } jobName.append("job"); // 流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(180000l,CheckpointingMode.EXACTLY_ONCE); SourceAndSinkInfo ssi=SourceAndSinkInfo.builder() .sourceIp("ip") .sourcePort(3306) .sourceUserName("root") .sourcePassword("****") .sinkIp("ip") .sinkPort(9030) .sinkUserName("root") .sinkPassword("****") .build(); //设置输入输出源 clsList.forEach(item->DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item)); env.execute(jobName.toString().toLowerCase()); } /** * 获取class * @methodName getBeanClass * @param className 为全路径 * @return Class> * @author wys * @date 2023-05-18 */ private static Class> getBeanClass(String className) {try {Class> cls= Class.forName(className); if(!cls.isAnnotationPresent(TableName.class)){throw new RuntimeException("同步的实体类不存在@TableName"); } return cls; } catch (ClassNotFoundException e) {//抛出异常:获取Class失败 throw new RuntimeException(String.format("未找到实体类[%s]", className)); } } } 三、Apache Flink Dashboard执行任务
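- 在Apache Flink Dashboard的Submit New Job菜单中上传打包好的jar,在Entry Class中填写执行的主类(如com.wys.flink.StarRocksCustomStreamCDC),并在Program Arguments中填写需要同步的表所对应的实体类名称,格式为--entity DataCenterShine,Organization(多个用英文逗号分隔)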
- 点击Submit生成相应任务
- 通过上传jar到Apache Flink Dashboard,输入需要同步的表,可自动生成任务
- CustomDebeziumDeserializationSchema实体类,自定义反序列化方案
- StarRocksSinkUtil辅助类,用于设置StarRocksSink
- DataStreamUtil工具类,用于设置source和sink。目前定义了MySQL同步到MySQL以及MySQL同步到StarRocks。
- TableName 注解类,用于记录实体类对应的库与表
- FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。
- StarRocksPrimary 实体类
- DataCenterShine实体类,字段与数据库一一对应。
- SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息
- Java代码
- pom文件如下