From 439fa702096bd303dae5dfec4f5775a5d02f083a Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 5 Nov 2024 17:49:16 +0800 Subject: [PATCH] [Feature][Transform-V2] Jsonpath support dest column and delete src field --- .../catalog/schema/ReadonlyConfigParser.java | 82 ++++++------ .../common/MultipleFieldOutputTransform.java | 7 + .../transform/jsonpath/ColumnConfig.java | 7 + .../JsonPathMultiCatalogTransform.java | 49 +++++++ .../transform/jsonpath/JsonPathTransform.java | 9 ++ .../jsonpath/JsonPathTransformConfig.java | 124 +++++++++++++++--- .../jsonpath/JsonPathTransformFactory.java | 8 +- 7 files changed, 219 insertions(+), 67 deletions(-) create mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java index ab85455b34e..64f2d6bb3da 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java @@ -109,54 +109,46 @@ private static class ColumnParser implements TableSchemaParser.ColumnParser parse(ReadonlyConfig schemaConfig) { return schemaConfig.get(TableSchemaOptions.ColumnOptions.COLUMNS).stream() .map(ReadonlyConfig::fromMap) - .map( - columnConfig -> { - String name = - columnConfig - .getOptional(TableSchemaOptions.ColumnOptions.NAME) - .orElseThrow( - () -> - new IllegalArgumentException( - "schema.columns.* config need option [name], please correct your config first")); - SeaTunnelDataType seaTunnelDataType = - columnConfig - .getOptional(TableSchemaOptions.ColumnOptions.TYPE) - .map( - column -> - SeaTunnelDataTypeConvertorUtil - .deserializeSeaTunnelDataType( - name, column)) - .orElseThrow( - () -> - new IllegalArgumentException( - "schema.columns.* config need option [type], please correct your config first")); + .map(ReadonlyConfigParser::parsePhysicalColumn) + .collect(Collectors.toList()); + } + } + + public static PhysicalColumn parsePhysicalColumn(ReadonlyConfig columnConfig) { + String name = + columnConfig + .getOptional(TableSchemaOptions.ColumnOptions.NAME) + .orElseThrow( + () -> + new IllegalArgumentException( + "schema.columns.* config need option [name], please correct your config first")); + SeaTunnelDataType seaTunnelDataType = + columnConfig + .getOptional(TableSchemaOptions.ColumnOptions.TYPE) + .map( + column -> + SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType( + name, column)) + .orElseThrow( + () -> + new IllegalArgumentException( + "schema.columns.* config need option [type], please correct your config first")); - Integer columnLength = - columnConfig.get( - TableSchemaOptions.ColumnOptions.COLUMN_LENGTH); + Integer columnLength = columnConfig.get(TableSchemaOptions.ColumnOptions.COLUMN_LENGTH); - Integer columnScale = - columnConfig.get( - TableSchemaOptions.ColumnOptions.COLUMN_SCALE); + Integer columnScale = columnConfig.get(TableSchemaOptions.ColumnOptions.COLUMN_SCALE); - Boolean nullable = - columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE); - Object defaultValue = - columnConfig.get( - TableSchemaOptions.ColumnOptions.DEFAULT_VALUE); - String comment = - columnConfig.get(TableSchemaOptions.ColumnOptions.COMMENT); - return PhysicalColumn.of( - name, - seaTunnelDataType, - Long.valueOf(columnLength), - columnScale, - nullable, - defaultValue, - comment); - }) - .collect(Collectors.toList()); - } + Boolean nullable = columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE); + Object defaultValue = columnConfig.get(TableSchemaOptions.ColumnOptions.DEFAULT_VALUE); + String comment = columnConfig.get(TableSchemaOptions.ColumnOptions.COMMENT); + return PhysicalColumn.of( + name, + seaTunnelDataType, + Long.valueOf(columnLength), + columnScale, + nullable, + defaultValue, + comment); } private static class ConstraintKeyParser diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java index 51e11724123..ea405cd21d8 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -86,8 +87,10 @@ protected TableSchema transformTableSchema() { builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy()); } builder.constraintKey(copiedConstraintKeys); + List deletedColumns = getDeletedColumns(); List columns = inputCatalogTable.getTableSchema().getColumns().stream() + .filter(c -> !deletedColumns.contains(c.getName())) .map(Column::copy) .collect(Collectors.toList()); @@ -157,4 +160,8 @@ protected TableIdentifier transformTableIdentifier() { } protected abstract Column[] getOutputColumns(); + + protected List getDeletedColumns() { + return Collections.emptyList(); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java index 4d56be1582f..3e1af3ccf74 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java @@ -30,6 +30,7 @@ public class ColumnConfig implements Serializable { private final String srcField; private final String destField; + private final boolean deleteSrcField; private final SeaTunnelDataType destType; private final ErrorHandleWay errorHandleWay; @@ -37,16 +38,22 @@ public class ColumnConfig implements Serializable { public ColumnConfig( String path, String srcField, + boolean deleteSrcField, String destField, SeaTunnelDataType destType, ErrorHandleWay errorHandleWay) { this.path = path; this.srcField = srcField; + this.deleteSrcField = deleteSrcField; this.destField = destField; this.destType = destType; this.errorHandleWay = errorHandleWay; } + public boolean isDeleteSrcField() { + return deleteSrcField; + } + public String getPath() { return path; } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java new file mode 100644 index 00000000000..67d24066465 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.jsonpath; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; +import java.util.Optional; + +public class JsonPathMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public JsonPathMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return JsonPathTransform.PLUGIN_NAME; + } + + @Override + protected Optional> buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return JsonPathTransformConfig.ofOptional(config, inputCatalogTable) + .map( + jsonPathTransformConfig -> + new JsonPathTransform(jsonPathTransformConfig, inputCatalogTable)); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java index 3e14e8488d3..bee4273619e 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR; @@ -198,4 +199,12 @@ protected Column[] getOutputColumns() { } return columns; } + + @Override + protected List getDeletedColumns() { + return this.config.getColumnConfigs().stream() + .filter(ColumnConfig::isDeleteSrcField) + .map(ColumnConfig::getSrcField) + .collect(Collectors.toList()); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java index 51a0d6ac34a..44e0e7cb31b 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java @@ -16,12 +16,14 @@ */ package org.apache.seatunnel.transform.jsonpath; +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias; import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil; +import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.transform.common.CommonOptions; import org.apache.seatunnel.transform.common.ErrorHandleWay; @@ -29,10 +31,12 @@ import org.apache.commons.lang3.StringUtils; +import lombok.Data; import lombok.Getter; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -56,6 +60,12 @@ public class JsonPathTransformConfig implements Serializable { .noDefaultValue() .withDescription("JSON source field."); + public static final Option DELETE_SRC_FIELD = + Options.key("delete_src_field") + .booleanType() + .defaultValue(false) + .withDescription("JSON source field."); + public static final Option DEST_FIELD = Options.key("dest_field") .stringType() @@ -68,12 +78,42 @@ public class JsonPathTransformConfig implements Serializable { .defaultValue("string") .withDescription("output field type,default string"); - public static final Option>> COLUMNS = + public static final Option> DEST_COLUMN = + Options.key("dest_column") + .type(new TypeReference>() {}) + .noDefaultValue() + .withDescription("output field default value."); + + public static final Option>> COLUMNS = Options.key("columns") - .type(new TypeReference>>() {}) + .type(new TypeReference>>() {}) .noDefaultValue() .withDescription("columns"); + public static final Option IGNORE_ERROR = + Options.key("ignore_error") + .booleanType() + .defaultValue(false) + .withDescription("Ignore error when json path extract error."); + + public static final Option> MULTI_TABLES = + Options.key("table_transform") + .listType(TableTransforms.class) + .noDefaultValue() + .withDescription(""); + + @Data + public static class TableTransforms implements Serializable { + @JsonAlias("table_path") + private String tablePath; + + @JsonAlias("columns") + private List> columns; + + @JsonAlias("row_error_handle_way") + private ErrorHandleWay errorHandleWay; + } + private final List columnConfigs; @Getter private final ErrorHandleWay errorHandleWay; @@ -93,43 +133,93 @@ public static JsonPathTransformConfig of(ReadonlyConfig config) { COLUMNS_MUST_NOT_EMPTY, COLUMNS_MUST_NOT_EMPTY.getErrorMessage()); } ErrorHandleWay rowErrorHandleWay = config.get(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION); - List> columns = config.get(COLUMNS); + List> columns = config.get(COLUMNS); List configs = new ArrayList<>(columns.size()); - for (Map map : columns) { - checkColumnConfig(map); - String path = map.get(PATH.key()); - String srcField = map.get(SRC_FIELD.key()); - String destField = map.get(DEST_FIELD.key()); - String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue()); + for (Map map : columns) { + ReadonlyConfig subConfig = ReadonlyConfig.fromMap(map); + checkColumnConfig(subConfig); + String path = subConfig.get(PATH); + String srcField = subConfig.get(SRC_FIELD); + String destField = subConfig.get(DEST_FIELD); + String type = subConfig.get(DEST_TYPE); + boolean deleteSrcField = subConfig.get(DELETE_SRC_FIELD); ErrorHandleWay columnErrorHandleWay = - Optional.ofNullable(map.get(CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key())) - .map(ErrorHandleWay::valueOf) - .orElse(null); + subConfig.get(CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION); SeaTunnelDataType dataType = SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(srcField, type); + if (!table.getTableSchema().contains(srcField)) { + throw TransformCommonError.cannotFindInputFieldError("JsonPath", srcField); + } + Column srcFieldColumn = table.getTableSchema().getColumn(srcField); + Column destFieldColumn = + subConfig.getOptional(DEST_COLUMN).isPresent() + ? ReadonlyConfigParser.parsePhysicalColumn( + ReadonlyConfig.fromMap(subConfig.get(DEST_COLUMN))) + : PhysicalColumn.of( + destField, + dataType, + srcFieldColumn.getColumnLength(), + true, + null, + null); ColumnConfig columnConfig = - new ColumnConfig(path, srcField, destField, dataType, columnErrorHandleWay); + new ColumnConfig( + path, + srcField, + deleteSrcField, + destField, + destFieldColumn, + columnErrorHandleWay); configs.add(columnConfig); } return new JsonPathTransformConfig(configs, rowErrorHandleWay); } - private static void checkColumnConfig(Map map) { - String path = map.get(PATH.key()); + private static void checkColumnConfig(ReadonlyConfig config) { + String path = config.get(PATH); if (StringUtils.isBlank(path)) { throw new TransformException( PATH_MUST_NOT_EMPTY, PATH_MUST_NOT_EMPTY.getErrorMessage()); } - String srcField = map.get(SRC_FIELD.key()); + String srcField = config.get(SRC_FIELD); if (StringUtils.isBlank(srcField)) { throw new TransformException( SRC_FIELD_MUST_NOT_EMPTY, SRC_FIELD_MUST_NOT_EMPTY.getErrorMessage()); } - String destField = map.get(DEST_FIELD.key()); + String destField = config.get(DEST_FIELD); if (StringUtils.isBlank(destField)) { throw new TransformException( DEST_FIELD_MUST_NOT_EMPTY, DEST_FIELD_MUST_NOT_EMPTY.getErrorMessage()); } } + + public static Optional ofOptional( + ReadonlyConfig config, CatalogTable catalogTable) { + String tablePath = catalogTable.getTableId().toTablePath().getFullName(); + if (null != config.get(MULTI_TABLES)) { + return config.get(MULTI_TABLES).stream() + .filter(tableTransforms -> tableTransforms.getTablePath().equals(tablePath)) + .map( + tableTransforms -> + of( + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + "columns", + tableTransforms.getColumns()); + put( + CommonOptions + .ROW_ERROR_HANDLE_WAY_OPTION + .key(), + tableTransforms + .getErrorHandleWay()); + } + }), + catalogTable)) + .findFirst(); + } + return Optional.of(of(config, catalogTable)); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java index def17f2564d..e70eaec05bc 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; @@ -38,16 +37,15 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(JsonPathTransformConfig.COLUMNS) + .optional(JsonPathTransformConfig.COLUMNS) + .optional(JsonPathTransformConfig.MULTI_TABLES) .optional(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); ReadonlyConfig options = context.getOptions(); - JsonPathTransformConfig jsonPathTransformConfig = JsonPathTransformConfig.of(options); - return () -> new JsonPathTransform(jsonPathTransformConfig, catalogTable); + return () -> new JsonPathMultiCatalogTransform(context.getCatalogTables(), options); } }