Skip to content

Commit

Permalink
[BitSail][Bug]Fixed type info converter (bytedance#415)
Browse files Browse the repository at this point in the history
[BitSail][Bug]Add unique type info converter for the column type info converter.
  • Loading branch information
hk-lrzy authored Feb 28, 2023
1 parent b7a6390 commit f906aa5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,31 @@ public class TypeInfoColumnBridge {
COLUMN_BRIDGE_TYPE_INFO_MAPPING.get(typeInfo));
}

for (TypeInfo<?> typeInfo : COLUMN_BRIDGE_TYPE_INFO_MAPPING.keySet()) {
TYPE_INFO_BRIDGE_CLASS_MAPPING.put(COLUMN_BRIDGE_TYPE_INFO_MAPPING.get(typeInfo).getTypeClass(),
typeInfo);
}
//According to column type info not match with framework type info, so we decide use fixed mapping for it.
//TODO after we deprecated column type info we will delete those logic.
TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO.getTypeClass(),
TypeInfos.LONG_TYPE_INFO);

TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO.getTypeClass(),
TypeInfos.DOUBLE_TYPE_INFO);

TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO.getTypeClass(),
TypeInfos.STRING_TYPE_INFO);

TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.BOOL_COLUMN_TYPE_INFO.getTypeClass(),
TypeInfos.BOOLEAN_TYPE_INFO);

TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.DATE_COLUMN_TYPE_INFO.getTypeClass(),
TypeInfos.SQL_TIMESTAMP_TYPE_INFO);

TYPE_INFO_BRIDGE_CLASS_MAPPING.put(
PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO.getTypeClass(),
BasicArrayTypeInfo.BINARY_TYPE_INFO);
}

public static TypeInformation<?> bridgeTypeInfo(TypeInfo<?> bridge) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@

package com.bytedance.bitsail.flink.core.typeutils;

import com.bytedance.bitsail.common.column.LongColumn;
import com.bytedance.bitsail.common.column.StringColumn;
import com.bytedance.bitsail.common.model.ColumnInfo;
import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter;
import com.bytedance.bitsail.common.type.TypeInfoConverter;
import com.bytedance.bitsail.common.typeinfo.BasicArrayTypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfos;
import com.bytedance.bitsail.flink.core.typeinfo.ListColumnTypeInfo;
import com.bytedance.bitsail.flink.core.typeinfo.MapColumnTypeInfo;
import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo;

import com.google.common.collect.ImmutableList;
Expand All @@ -27,6 +33,7 @@
import org.junit.Test;

import java.util.List;
import java.util.Map;

public class ColumnFlinkTypeInfoUtilTest {

Expand All @@ -53,4 +60,36 @@ public void testGetRowTypeInformation() {
Assert.assertEquals("List<StringColumn>", rowTypeInfo.getTypeAt(5).toString());
Assert.assertEquals("Map<StringColumn, StringColumn>", rowTypeInfo.getTypeAt(6).toString());
}

@Test
public void testGetRowTypeInfo() {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO,
PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO,
PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO,
PrimitiveColumnTypeInfo.DATE_COLUMN_TYPE_INFO,
PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO);

com.bytedance.bitsail.common.typeinfo.RowTypeInfo simpleFrameworkRowTypeInfo = ColumnFlinkTypeInfoUtil
.getRowTypeInfo(rowTypeInfo);

Assert.assertEquals(TypeInfos.LONG_TYPE_INFO, simpleFrameworkRowTypeInfo.getTypeInfos()[0]);
Assert.assertEquals(TypeInfos.STRING_TYPE_INFO, simpleFrameworkRowTypeInfo.getTypeInfos()[1]);
Assert.assertEquals(TypeInfos.DOUBLE_TYPE_INFO, simpleFrameworkRowTypeInfo.getTypeInfos()[2]);
Assert.assertEquals(TypeInfos.SQL_TIMESTAMP_TYPE_INFO, simpleFrameworkRowTypeInfo.getTypeInfos()[3]);
Assert.assertEquals(BasicArrayTypeInfo.BINARY_TYPE_INFO, simpleFrameworkRowTypeInfo.getTypeInfos()[4]);

MapColumnTypeInfo<StringColumn, LongColumn> mapColumnTypeInfo =
new MapColumnTypeInfo<>(PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO,
PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO);
ListColumnTypeInfo<StringColumn> listColumnTypeInfo =
new ListColumnTypeInfo<>(PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO);

RowTypeInfo complexRowTypeInfo = new RowTypeInfo(mapColumnTypeInfo, listColumnTypeInfo);
com.bytedance.bitsail.common.typeinfo.RowTypeInfo complexFrameworkRowTypeInfo = ColumnFlinkTypeInfoUtil
.getRowTypeInfo(complexRowTypeInfo);

Assert.assertTrue(complexFrameworkRowTypeInfo.getTypeInfos()[0].getTypeClass().isAssignableFrom(Map.class));
Assert.assertTrue(complexFrameworkRowTypeInfo.getTypeInfos()[1].getTypeClass().isAssignableFrom(List.class));
}
}

0 comments on commit f906aa5

Please sign in to comment.