RowData与Row区别
(0)都代表了一条记录。都可以设置RowKind和列数量Arity。
(1)RowData 属于Table API,而Row属于DataStream API
(2)RowData 属于Table内部接口,对用户不友好。而Row使用简单。
(3)RowData 要拿到field值必须提供列索引和LogicalType类型。而Row只需要提供列名或列索引即可。
请自己阅读注释内容。
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import java.util.*; import java.util.stream.Collectors; /** * @author: lisai * @create: 2023-03-15 16:51 * @Description: */ public class RowUtils { public interface TypedMapFuncextends MapFunction , ResultTypeQueryable { DataType getProducedDataType(); } public static List getRowDataFieldGetters(DataType rowDataType) { Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW); return getRowDataFieldGetters(rowDataType.getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList())); } public static List getRowDataFieldGetters(RowType rowType) { return getRowDataFieldGetters(rowType.getFields().stream().map(RowType.RowField::getType).collect(Collectors.toList())); } public static List getRowDataFieldGetters(List logicalTypes) { List fieldGetterList = new ArrayList<>(); for (int i = 0; i < logicalTypes.size(); i++) { final RowData.FieldGetter fieldGetter = RowData.createFieldGetter(logicalTypes.get(i), i); fieldGetterList.add(fieldGetter); } return fieldGetterList; } public static void copyRowData(RowData input, GenericRowData output, List fieldGetters) { for (int i = 0; i < input.getArity() && i < output.getArity(); i++) { if (input instanceof GenericRowData) { output.setField(i, ((GenericRowData) input).getField(i)); } else { 
Preconditions.checkArgument(fieldGetters != null); Object value = fieldGetters.get(i).getFieldOrNull(input); output.setField(i, value); } } } public static TypedMapFunc getRowDataToRowMapFunc(DataType rowDataType) { LogicalType logicalType = rowDataType.getLogicalType(); Preconditions.checkArgument(logicalType.getTypeRoot() == LogicalTypeRoot.ROW); return new TypedMapFunc () { private RowData.FieldGetter[] fieldGetters = getRowDataFieldGetters(rowDataType).toArray(new RowData.FieldGetter[0]); @Override public TypeInformation getProducedType() { RowType rowType = (RowType) logicalType; List
rowFields = rowType.getFields(); List rowDataTypes = rowDataType.getChildren(); TypeInformation>[] fieldTypeInfos = rowDataTypes.stream().map(t -> InternalTypeInfo.of(t.getLogicalType())).toArray(TypeInformation[]::new); String[] fieldNames = rowFields.stream().map(RowType.RowField::getName).toArray(String[]::new); return new RowTypeInfo(fieldTypeInfos, fieldNames); } @Override public DataType getProducedDataType() { return rowDataType.bridgedTo(Row.class); } @Override public Row map(RowData rowData) throws Exception { Row row = new Row(rowData.getRowKind(), rowData.getArity()); for (int i = 0; i < rowData.getArity(); i++) { RowData.FieldGetter fieldGetter = fieldGetters[i]; row.setField(i, fieldGetter.getFieldOrNull(rowData)); } return row; } }; } public static TypedMapFunc getRowToRowRowMapFunc(DataType rowDataType) { Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW); return new TypedMapFunc
() { /** * @Description: 注意input Row中所有的数据类型必须是Flink Table API规定的内部类型。具体参考 {@DataTypeUtils.toInternalDataType()} * @param * @return TypeInformation
*/ @Override public TypeInformation getProducedType() { return InternalTypeInfo.of((RowType)rowDataType.getLogicalType()); } @Override public DataType getProducedDataType() { return rowDataType.bridgedTo(RowData.class); } @Override public RowData map(Row row) throws Exception { GenericRowData rowData = new GenericRowData(row.getKind(), row.getArity()); for (int i = 0; i < rowData.getArity(); i++) { rowData.setField(i, row.getField(i)); } return rowData; } }; } }