Commit
add more debug info for TypeInfoConverter
beyond-up committed Feb 28, 2023
1 parent f906aa5 · commit 6d34b49
Showing 4 changed files with 32 additions and 3 deletions.
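
The converter changes add a null guard on the incoming type string plus a debug log of the value actually parsed, and the two Flink utilities add a debug log of the converter in use. A minimal standalone sketch of that guard-and-log flow, assuming only Guava and SLF4J on the classpath (the class and method names below are illustrative, not taken from the repository):

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TypeStringGuardSketch {
  private static final Logger LOG = LoggerFactory.getLogger(TypeStringGuardSketch.class);

  // Guard against a null type string, normalize it, and log what will actually be parsed.
  public static String normalize(String typeString) {
    Preconditions.checkNotNull(typeString, "Type string can not be null.");
    String normalized = typeString.trim().toLowerCase();
    LOG.debug("type string = {}.", normalized);
    return normalized;
  }

  public static void main(String[] args) {
    System.out.println(normalize("  ARRAY<string>  "));  // prints "array<string>"
    // normalize(null) would fail fast with a NullPointerException from Preconditions.
  }
}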

HiveTypeInfoConverter.java

@@ -22,11 +22,15 @@

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created 2022/5/11
*/
public class HiveTypeInfoConverter extends FileMappingTypeInfoConverter {
private static final Logger LOG = LoggerFactory.getLogger(HiveTypeInfoConverter.class);

public static final String DEFAULT_STAGE_NAME = "hive";
private static final String LIST_TYPE_NAME = "array";
private static final String MAP_TYPE_NAME = "map";
@@ -51,7 +55,10 @@ private static void expect(String engineType, int index, char expect) {

@Override
public TypeInfo<?> fromTypeString(String typeString) {
Preconditions.checkNotNull(typeString,
String.format("Type string %s can not be null.", typeString));
typeString = trim(getBaseName(typeString));
LOG.debug("type string = {}.", typeString);
if (isArrayType(typeString)) {
return internalRecursion(typeString, Category.LIST);
}

MongoTypeInfoConverter.java

@@ -22,13 +22,20 @@
import com.bytedance.bitsail.common.typeinfo.MapTypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfos;
import com.bytedance.bitsail.common.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
* Created 2022/5/11
*
* @author ke.hao
*/
public class MongoTypeInfoConverter extends FileMappingTypeInfoConverter {
private static final Logger LOG = LoggerFactory.getLogger(MongoTypeInfoConverter.class);

private static final String OBJECT_TYPE = "object";
private static final String ARRAY_TYPE = "array";
@@ -39,7 +46,10 @@ public MongoTypeInfoConverter() {

@Override
public TypeInfo<?> fromTypeString(String engineType) {
Preconditions.checkNotNull(engineType,
String.format("Type string %s can not be null.", engineType));
engineType = trim(engineType);
LOG.debug("type string = {}.", engineType);
if (isBasicType(engineType)) {
return getBasicTypeInfoFromMongoDBType(engineType);
} else if (isArrayType(engineType)) {
@@ -48,7 +58,7 @@ public TypeInfo<?> fromTypeString(String engineType) {
return getMapTypeInfoFromMongoDBType(engineType);
} else {
throw BitSailException.asBitSailException(CommonErrorCode.CONVERT_NOT_SUPPORT,
-     "MongDB engine, invalid MongoDB type: " + engineType);
+     "MongoDB engine, invalid MongoDB type: " + engineType);
}
}

@@ -111,7 +121,12 @@ public TypeInfo<?> getTypeInfo(String type) {
}

public TypeInfo<?> getBasicTypeInfoFromMongoDBType(String mongoType) {
- return reader.getToTypeInformation().get(mongoType);
+ TypeInfo<?> typeInfo = reader.getToTypeInformation().get(mongoType);
+ if (Objects.isNull(typeInfo)) {
+   throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
+       String.format("MongoDB engine, not support type string %s.", mongoType));
+ }
+ return typeInfo;
}
}
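
The getBasicTypeInfoFromMongoDBType change above replaces a silent null return with an explicit unsupported-type error. A rough standalone sketch of that lookup-then-fail pattern, using a plain HashMap and IllegalArgumentException as stand-ins for BitSail's mapping reader and BitSailException:

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MongoTypeLookupSketch {
  // Illustrative subset of a mongo-type -> engine-type mapping.
  private static final Map<String, String> TO_TYPE_INFORMATION = new HashMap<>();

  static {
    TO_TYPE_INFORMATION.put("string", "string");
    TO_TYPE_INFORMATION.put("int32", "int");
    TO_TYPE_INFORMATION.put("int64", "long");
  }

  public static String getBasicTypeInfo(String mongoType) {
    String typeInfo = TO_TYPE_INFORMATION.get(mongoType);
    if (Objects.isNull(typeInfo)) {
      // Fail fast with a descriptive message instead of letting a null leak out.
      throw new IllegalArgumentException(
          String.format("MongoDB engine, not support type string %s.", mongoType));
    }
    return typeInfo;
  }

  public static void main(String[] args) {
    System.out.println(getBasicTypeInfo("int64"));  // prints "long"
    // getBasicTypeInfo("decimal128") would throw IllegalArgumentException here.
  }
}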

ColumnFlinkTypeInfoUtil.java

@@ -25,6 +25,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

@@ -35,10 +37,12 @@
*/

public class ColumnFlinkTypeInfoUtil {
private static final Logger LOG = LoggerFactory.getLogger(ColumnFlinkTypeInfoUtil.class);

public static RowTypeInfo getRowTypeInformation(TypeInfoConverter converter,
List<ColumnInfo> columnInfos) {

LOG.debug("TypeInfoConverter = {}.", converter);
String[] fieldNames = new String[columnInfos.size()];
TypeInformation<?>[] fieldTypes = new TypeInformation[columnInfos.size()];
for (int index = 0; index < columnInfos.size(); index++) {

NativeFlinkTypeInfoUtil.java

@@ -29,18 +29,21 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class NativeFlinkTypeInfoUtil {
private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkTypeInfoUtil.class);

public static RowTypeInfo getRowTypeInformation(List<ColumnInfo> columnInfos) {
return getRowTypeInformation(columnInfos, new BitSailTypeInfoConverter());
}

public static RowTypeInfo getRowTypeInformation(List<ColumnInfo> columnInfos,
TypeInfoConverter typeInfoConverter) {

LOG.debug("TypeInfoConverter = {}.", typeInfoConverter);
String[] fieldNames = new String[columnInfos.size()];
TypeInformation<?>[] fieldTypes = new TypeInformation[columnInfos.size()];
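
Both Flink utilities now log which TypeInfoConverter they were handed before building the field name and type arrays, which makes type-mapping problems easier to trace. A simplified sketch of that loop, with plain strings standing in for BitSail's ColumnInfo and Flink's TypeInformation:

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowTypeInfoSketch {
  private static final Logger LOG = LoggerFactory.getLogger(RowTypeInfoSketch.class);

  // Stand-in for ColumnInfo: just a field name and a type string.
  record Column(String name, String type) {}

  public static void buildRowType(Object typeInfoConverter, List<Column> columnInfos) {
    // Record which converter is in use before any per-column conversion happens.
    LOG.debug("TypeInfoConverter = {}.", typeInfoConverter);
    String[] fieldNames = new String[columnInfos.size()];
    String[] fieldTypes = new String[columnInfos.size()];
    for (int index = 0; index < columnInfos.size(); index++) {
      fieldNames[index] = columnInfos.get(index).name();
      fieldTypes[index] = columnInfos.get(index).type();
    }
    LOG.debug("fieldNames = {}, fieldTypes = {}.", Arrays.toString(fieldNames), Arrays.toString(fieldTypes));
  }

  public static void main(String[] args) {
    buildRowType("my-converter", List.of(new Column("id", "bigint"), new Column("name", "string")));
  }
}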

