Flink DataStream API: CDC Sync from MySQL to StarRocks

1. Version Information

  • Flink: 1.16.1
  • Flink CDC (flink-sql-connector-mysql-cdc): 2.3.0
  • StarRocks connector (flink-connector-starrocks): 1.2.9_flink-1.16

2. Code Implementation

• The pom file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wys</groupId>
    <artifactId>flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency>
        <dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>1.2.9_flink-1.16</version><scope>provided</scope></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.60</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency>
    </dependencies>
</project>
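Note that the SLF4J artifacts and flink-connector-starrocks are declared with provided scope, so they are compiled against but not packaged into the job jar; when submitting to a cluster, the StarRocks connector jar must already be on the Flink classpath (or its scope changed to the default so it gets bundled).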
• Java code:
        package com.wys.flink;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.configuration.RestOptions;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import com.wys.flink.bean.DataCenterShine;
        import com.wys.flink.util.DataStreamUtil;
        import com.wys.flink.util.SourceAndSinkInfo;
public class DataStreamMySQLToStarRocks {

	public static void main(String[] args) throws Exception {
		// stream execution environment
		Configuration conf = new Configuration();
		// port the local WebUI binds to
		conf.setString(RestOptions.BIND_PORT, "8081");
		// create a local environment with the WebUI enabled
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
		env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
		// IP, port and credentials of the source and sink
		SourceAndSinkInfo info = SourceAndSinkInfo.builder()
				.sourceIp("ip")
				.sourcePort(3306)
				.sourceUserName("root")
				.sourcePassword("****")
				.sinkIp("ip")
				.sinkPort(9030)
				.sinkUserName("root")
				.sinkPassword("")
				.build();

		// register source and sink for the table mapped by the DataCenterShine entity
		DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class);
		// more tables can be synced in the same job; a sketch of such an Organization entity follows below
		//DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class);
		// job name
		env.execute("data_center_shine_job");
	}
}
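For reference, the Organization entity referenced in the commented-out line would follow the same pattern as the DataCenterShine entity shown later; this is a hypothetical sketch (the table name wsmg.organization and its columns are assumptions, not taken from the original project):

	package com.wys.flink.bean;
	import com.wys.flink.annotation.FieldInfo;
	import com.wys.flink.annotation.TableName;
	import lombok.Data;
	import lombok.EqualsAndHashCode;
	import java.io.Serializable;

	@Data
	@TableName("wsmg.organization") // assumed database.table
	@EqualsAndHashCode(callSuper = false)
	public class Organization extends StarRocksPrimary implements Serializable {
		private static final long serialVersionUID = 1L;
		/** Primary key */
		@FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
		private Integer id;
		/** Organization name (assumed column) */
		@FieldInfo(order = 2)
		private String orgName;
	}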
        
• SourceAndSinkInfo class, which defines the IP, port, username, and password of the source and sink
          package com.wys.flink.util;
          import lombok.AllArgsConstructor;
          import lombok.Builder;
          import lombok.Data;
          import lombok.NoArgsConstructor;
          @Data
          @Builder
          @AllArgsConstructor
          @NoArgsConstructor
public class SourceAndSinkInfo {

	/**
	 * source IP
	 */
	private String sourceIp;
	/**
	 * source port
	 */
	private int sourcePort;
	/**
	 * source username
	 */
	private String sourceUserName;
	/**
	 * source password
	 */
	private String sourcePassword;

	/**
	 * sink IP
	 */
	private String sinkIp;
	/**
	 * sink port
	 */
	private int sinkPort;
	/**
	 * sink username
	 */
	private String sinkUserName;
	/**
	 * sink password
	 */
	private String sinkPassword;
}
          
• DataCenterShine entity class; its fields map one-to-one to the database table's columns.
            package com.wys.flink.bean;
            import com.wys.flink.annotation.FieldInfo;
            import com.wys.flink.annotation.TableName;
            import lombok.Data;
            import lombok.EqualsAndHashCode;
            import java.io.Serializable;
/**
 * Business type mapping table
 *
 * @author wys
 * @since 2023-05-23 11:16:24
 */
@Data
@TableName("wsmg.data_center_shine")
@EqualsAndHashCode(callSuper = false)
public class DataCenterShine extends StarRocksPrimary implements Serializable {

	private static final long serialVersionUID = 1L;

	/**
	 * Primary key
	 */
	@FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
	private Integer id;

	/**
	 * Mapper name
	 */
	@FieldInfo(order = 2)
	private String busName;

	/**
	 * Mapper class name
	 */
	@FieldInfo(order = 3)
	private String mapperClassName;

	/**
	 * Entity class name
	 */
	@FieldInfo(order = 4)
	private String entityClassName;
}
• StarRocksPrimary entity class
              package com.wys.flink.bean;
              import org.apache.flink.types.RowKind;
              import lombok.Data;
              @Data
public class StarRocksPrimary {

	/**
	 * Stores the StarRocks operation type of the row: insert, delete, or update
	 */
	private RowKind rowKind;
}
              
• FieldInfo annotation, which marks a field's order, whether it is a primary key, and whether it is non-null; it is used later when generating the TableSchema.
                package com.wys.flink.annotation;
                import java.lang.annotation.ElementType;
                import java.lang.annotation.Retention;
                import java.lang.annotation.RetentionPolicy;
                import java.lang.annotation.Target;
                @Retention(RetentionPolicy.RUNTIME)
                @Target({ElementType.FIELD})
public @interface FieldInfo {

    /**
     * Field order: the position of the field in insert statements.
     * @return
     */
    int order();

    /**
     * Whether the field is a primary key; needed for the StarRocks primary key model.
     * @methodName isPrimaryKey
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean isPrimaryKey() default false;

    /**
     * Whether the field is NOT NULL.
     * @methodName notNull
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean notNull() default false;
}
                
• TableName annotation, which records the database and table an entity class maps to
                  package com.wys.flink.annotation;
                  import java.lang.annotation.ElementType;
                  import java.lang.annotation.Retention;
                  import java.lang.annotation.RetentionPolicy;
                  import java.lang.annotation.Target;
                  @Retention(RetentionPolicy.RUNTIME)
                  @Target({ElementType.TYPE})
public @interface TableName {

    /**
     * Table name in the form database.table, e.g. sys.user
     * @return
     */
    String value();
}
                  
• DataStreamUtil utility class, used to set up sources and sinks. MySQL-to-MySQL and MySQL-to-StarRocks sync are currently defined.
                    package com.wys.flink.util;
                    import java.util.function.Supplier;
                    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                    import org.apache.flink.streaming.api.datastream.DataStreamSource;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
                    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
                    import com.wys.flink.annotation.TableName;
                    import com.wys.flink.sink.MysqlAndStarRocksSink;
public class DataStreamUtil {

	/**
	 * Configure the source and sink for MySQL-to-MySQL sync
	 * @methodName setMySQLSourceAndSink
	 * @param env
	 * @param info
	 * @param cls void
	 * @author wys
	 * @date 2023-12-12
	 */
	/*@SuppressWarnings({ "unchecked", "rawtypes" })
	public static void setMySQLSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class cls) {
		setSourceAndSink(env, info, cls, () -> new MysqlAndStarRocksSink(cls, info.getSinkIp(), info.getSinkPort()));
	}*/

	/**
	 * Configure the source and sink for MySQL-to-StarRocks sync
	 * @methodName setStarRocksSourceAndSink
	 * @param env
	 * @param info
	 * @param cls void
	 * @author wys
	 * @date 2023-12-12
	 */
	public static <T> void setStarRocksSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls) {
		setSourceAndSink(env, info, cls, () -> StarRocksSinkUtil.getStarRocksSink(cls, info));
	}

	/**
	 * Configure the source and sink
	 * @methodName setSourceAndSink
	 * @param env
	 * @param info
	 * @param cls
	 * @param sink void
	 * @author wys
	 * @date 2023-12-12
	 */
	private static <T> void setSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls, Supplier<SinkFunction<T>> sink) {
		if (cls.isAnnotationPresent(TableName.class)) {
			String table = cls.getAnnotation(TableName.class).value();
			String[] tableArr = table.split("\\.");
			// source
			MySqlSource<T> mySQLSource = MySqlSource.<T>builder()
					.hostname(info.getSourceIp())
					.port(info.getSourcePort())
					.databaseList(tableArr[0]) // database to capture; to sync a whole database, set tableList to ".*"
					.tableList(table) // table to capture
					.username(info.getSourceUserName())
					.password(info.getSourcePassword())
					.deserializer(new CustomDebeziumDeserializationSchema<>(cls))
					.build();
			// add the source to the stream execution environment
			DataStreamSource<T> source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), tableArr[1] + "_source");
			// sink
			source.addSink(sink.get()).name(tableArr[1] + "_sink");
		}
	}
}
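As written, the source uses flink-cdc's default startup mode: it takes an initial snapshot of the table and then switches to reading the binlog. If only incremental changes are wanted, the builder can be configured before build(); a minimal sketch, assuming the StartupOptions API of flink-cdc 2.3.0:

	// variation of the builder call in setSourceAndSink
	// (requires: import com.ververica.cdc.connectors.mysql.table.StartupOptions;)
	MySqlSource<T> mySQLSource = MySqlSource.<T>builder()
			.hostname(info.getSourceIp())
			.port(info.getSourcePort())
			.databaseList(tableArr[0])
			.tableList(table)
			.username(info.getSourceUserName())
			.password(info.getSourcePassword())
			.startupOptions(StartupOptions.latest()) // read only new binlog events, skipping the initial snapshot
			.deserializer(new CustomDebeziumDeserializationSchema<>(cls))
			.build();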
                    
• StarRocksSinkUtil helper class, used to build the StarRocks sink
                      package com.wys.flink.util;
                      import java.lang.reflect.Field;
                      import java.math.BigDecimal;
                      import java.util.Date;
                      import java.util.HashMap;
                      import java.util.Map;
                      import java.util.Map.Entry;
                      import java.util.regex.Matcher;
                      import java.util.regex.Pattern;
                      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
                      import org.apache.flink.table.api.DataTypes;
                      import org.apache.flink.table.api.TableSchema;
                      import org.apache.flink.table.api.TableSchema.Builder;
                      import org.apache.flink.table.types.DataType;
                      import com.starrocks.connector.flink.StarRocksSink;
                      import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
                      import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
                      import com.wys.flink.annotation.FieldInfo;
                      import com.wys.flink.annotation.TableName;
                      import com.wys.flink.bean.StarRocksPrimary;
                      /**
                       * StarRocksSink辅助类
                       * @className StarRocksSinkUtil
                       * @author wys
                       * @date 2023-12-12
                       */
public class StarRocksSinkUtil {

	private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

	/**
	 * Build a StarRocks sink
	 * @methodName getStarRocksSink
	 * @param cls
	 * @param info
	 * @return SinkFunction
	 * @author wys
	 * @date 2023-12-12
	 */
	@SuppressWarnings("serial")
	public static <T> SinkFunction<T> getStarRocksSink(Class<T> cls, SourceAndSinkInfo info) {
		Map<Integer, String> fieldMap = getFieldMap(cls);
		return StarRocksSink.sink(getTableSchema(cls), getStarRocksSinkOptions(info, cls),
				new StarRocksSinkRowBuilder<T>() {
					@Override
					public void accept(Object[] objects, T beanDataJava) {
						try {
							// populate objects via reflection
							for (Entry<Integer, String> entry : fieldMap.entrySet()) {
								Field field = cls.getDeclaredField(entry.getValue());
								field.setAccessible(true);
								Object obj = field.get(beanDataJava);
								objects[entry.getKey() - 1] = obj;
							}
							// set the operation type of the row
							if (beanDataJava instanceof StarRocksPrimary) {
								objects[objects.length - 1] = ((StarRocksPrimary) beanDataJava).getRowKind().ordinal();
							}
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				});
	}
	/**
	 * Build the field map (order -> field name)
	 * @methodName initFieldMap void
	 * @author wys
	 * @date 2023-12-11
	 */
	private static <T> Map<Integer, String> getFieldMap(Class<T> cls) {
		Map<Integer, String> fieldMap = new HashMap<>();
		Field[] fields = cls.getDeclaredFields();
		for (Field field : fields) {
			if (field.isAnnotationPresent(FieldInfo.class)) {
				fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName());
			}
		}
		return fieldMap;
	}
	/**
	 * Build the TableSchema
	 * @methodName getTableSchema
	 * @param cls
	 * @return TableSchema
	 * @author wys
	 * @date 2023-12-12
	 */
	@SuppressWarnings("deprecation")
	private static <T> TableSchema getTableSchema(Class<T> cls) {
		Builder builder = TableSchema.builder();
		Field[] fields = cls.getDeclaredFields();
		// build the TableSchema via reflection
		for (Field field : fields) {
			if (!field.isAnnotationPresent(FieldInfo.class)) {
				continue;
			}
			FieldInfo fi = field.getAnnotation(FieldInfo.class);
			if (fi.isPrimaryKey()) {
				// reference the column name (snake case), not the Java field name
				builder.primaryKey(humpToUnderlined(field.getName()));
			}
			DataType dataType = getDataType(field.getType());
			if (fi.notNull()) {
				dataType = dataType.notNull();
			}
			builder.field(humpToUnderlined(field.getName()), dataType);
		}
		return builder.build();
	}
                      	
	/**
	 * Build the StarRocksSinkOptions
	 * @methodName getStarRocksSinkOptions
	 * @param info
	 * @param cls
	 * @return StarRocksSinkOptions
	 * @author wys
	 * @date 2023-12-12
	 */
	private static <T> StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class<T> cls) {
		String table = cls.getAnnotation(TableName.class).value();
		String[] tableArr = table.split("\\.");
		return StarRocksSinkOptions.builder()
				.withProperty("jdbc-url", String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0]))
				.withProperty("load-url", info.getSinkIp() + ":8030") // FE HTTP port used by Stream Load
				.withProperty("username", info.getSinkUserName())
				.withProperty("password", info.getSinkPassword())
				.withProperty("table-name", tableArr[1])
				.withProperty("database-name", tableArr[0])
				.withProperty("sink.properties.row_delimiter", "\\x02")
				.withProperty("sink.properties.column_separator", "\\x01")
				.withProperty("sink.buffer-flush.interval-ms", "5000")
				.build();
	}
	/**
	 * Convert camel case to snake case
	 * @methodName humpToUnderlined
	 * @param str
	 * @return String
	 * @author wys
	 * @date 2023-12-12
	 */
	private static String humpToUnderlined(String str) {
		Matcher matcher = TPATTERN.matcher(str);
		StringBuffer sb = new StringBuffer();
		while (matcher.find()) {
			matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
		}
		matcher.appendTail(sb);
		return sb.toString();
	}
	/**
	 * Map a Java field type to a Flink DataType
	 * @methodName getDataType
	 * @param cls
	 * @return DataType
	 * @author wys
	 * @date 2023-12-12
	 */
	private static DataType getDataType(Class<?> cls) {
		if (cls.equals(Integer.class)) {
			return DataTypes.INT();
		} else if (cls.equals(String.class)) {
			return DataTypes.STRING();
		} else if (cls.equals(Date.class)) {
			return DataTypes.TIMESTAMP();
		} else if (cls.equals(BigDecimal.class)) {
			return DataTypes.DECIMAL(8, 2);
		}
		throw new RuntimeException("No matching DataType for field type: " + cls.getName());
	}
}
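A quick sanity check on the helpers above: humpToUnderlined("mapperClassName") produces "mapper_class_name", which is why the TableSchema columns come out snake-cased even though the entity fields are camel-cased. Also note that getDataType only covers Integer, String, Date, and BigDecimal (the latter with a fixed DECIMAL(8, 2)); entities with other field types need additional branches here.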
                      
• CustomDebeziumDeserializationSchema class, a custom deserialization schema
                        package com.wys.flink.util;
                        
                        import java.util.List;
                        import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
                        import org.apache.flink.api.common.typeinfo.TypeInformation;
                        import org.apache.flink.types.RowKind;
                        import org.apache.flink.util.Collector;
                        import com.alibaba.fastjson.JSONObject;
                        import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
                        import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
                        import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
                        import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
                        /**
                         * 自定义反序列化方案
                         * @className CustomDebeziumDeserializationSchema
                         * @author wys
                         * @date 2023-12-12
                         */
public class CustomDebeziumDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private Class<T> cls;

    public CustomDebeziumDeserializationSchema(Class<T> cls) {
        this.cls = cls;
    }

    /**
     * Only 'after' present: insert; only 'before' present: delete; both present: update
     * @methodName deserialize
     * @param sourceRecord
     * @param collector void
     * @author wys
     * @date 2023-12-12
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<T> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            // update
            if (null != beforeStruct && null != afterStruct) {
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_AFTER);
            }
            // insert
            else if (null != afterStruct) {
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.INSERT);
            }
            // delete
            else if (null != beforeStruct) {
                setDataContent(beforeStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_BEFORE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Deserialization failed");
        }
        T t = resJson.toJavaObject(cls);
        collector.collect(t);
    }

    /**
     * Copy the struct's fields into the result JSON
     * @methodName setDataContent
     * @param struct
     * @param resJson void
     * @author wys
     * @date 2023-12-12
     */
    private void setDataContent(Struct struct, JSONObject resJson) {
        List<Field> fields = struct.schema().fields();
        for (Field field : fields) {
            String name = field.name();
            Object value = struct.get(name);
            resJson.put(name, value);
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return BasicTypeInfo.of(cls);
    }
}
                        

3. Custom MySQL-to-StarRocks Sync

1. Feature Description

• Upload a jar to the Apache Flink Dashboard and pass in the tables to sync; the job is then generated automatically

2. Code Implementation

                          package com.wys.flink;
                          import java.util.ArrayList;
                          import java.util.List;
                          import org.apache.flink.api.java.utils.ParameterTool;
                          import org.apache.flink.streaming.api.CheckpointingMode;
                          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                          import com.wys.flink.annotation.TableName;
                          import com.wys.flink.util.DataStreamUtil;
                          import com.wys.flink.util.SourceAndSinkInfo;
/**
 * Custom job; program arguments: --entity DataCenterShine,Organization
 * @className StarRocksCustomStreamCDC
 * @author wys
 * @date 2023-12-11
 */
public class StarRocksCustomStreamCDC {

	public static void main(String[] args) throws Exception {
		List<Class<?>> clsList = new ArrayList<>();
		StringBuilder jobName = new StringBuilder();
		ParameterTool parameters = ParameterTool.fromArgs(args);
		String entitys = parameters.get("entity", null);
		if (null == entitys) {
			throw new RuntimeException("Pass the entity class names of the tables to sync in Program Arguments, e.g.: --entity User,Role...");
		}
		// the argument value is a comma-separated list of entity class names
		String[] entityArr = entitys.split(",");
		for (String className : entityArr) {
			Class<?> cls = getBeanClass(String.format("com.wys.flink.bean.%s", className));
			clsList.add(cls);
			jobName.append(cls.getSimpleName()).append("_");
		}
		jobName.append("job");
		// stream execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
		SourceAndSinkInfo ssi = SourceAndSinkInfo.builder()
				.sourceIp("ip")
				.sourcePort(3306)
				.sourceUserName("root")
				.sourcePassword("****")
				.sinkIp("ip")
				.sinkPort(9030)
				.sinkUserName("root")
				.sinkPassword("****")
				.build();

		// register the sources and sinks
		clsList.forEach(item -> DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item));
		env.execute(jobName.toString().toLowerCase());
	}
                          	
	/**
	 * Load a Class by name
	 * @methodName getBeanClass
	 * @param className fully qualified class name
	 * @return Class
	 * @author wys
	 * @date 2023-05-18
	 */
	private static Class<?> getBeanClass(String className) {
		try {
			Class<?> cls = Class.forName(className);
			if (!cls.isAnnotationPresent(TableName.class)) {
				throw new RuntimeException("The entity class to sync is missing @TableName");
			}
			return cls;
		} catch (ClassNotFoundException e) {
			// failed to load the class
			throw new RuntimeException(String.format("Entity class [%s] not found", className));
		}
	}
}
                          

3. Running the Job from the Apache Flink Dashboard

• In the Submit New Job menu of the Apache Flink Dashboard, upload the packaged jar, enter the main class to run, and the entity classes of the tables to sync (comma-separated for multiple tables)

• Click Submit to create the job
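For example, with the classes above, the entry class is com.wys.flink.StarRocksCustomStreamCDC and the program arguments are --entity DataCenterShine,Organization; per the job-name logic in main, the resulting job is named datacentershine_organization_job.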