Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-workspace-seatunnel] upgrade some seatunnel connectors to 2.3.5 version #715

Merged
merged 17 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release-manual-docker-seatunnel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ on:
seatunnelVersion:
description: 'seatunnel version'
required: true
default: '2.3.4'
default: '2.3.5'
type: choice
options:
- 2.3.4
- 2.3.5
flinkVersion:
description: 'flink version'
required: true
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Scaleph is driven by personal interest and evolves actively through faithful dev
* Data Integration
* Web-ui click-and-drag data integration ways backended by [Apache SeaTunnel](https://seatunnel.apache.org/) on Flink engine.

* Support the latest 2.3.4 V2 out-of-the-box connectors and transforms.
* Support the latest 2.3.5 V2 out-of-the-box connectors and transforms.

* DataSource management.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public enum FlinkImageMapping {
SQL_1_17(FlinkJobType.SQL, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "ghcr.io/flowerfine/scaleph-sql-template:1.17"),
SQL_1_18(FlinkJobType.SQL, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-sql-template:1.18"),

SEATUNNEL_1_15(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_15, FlinkVersionMapping.V_1_15, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.15"),
SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.16"),
SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.5-flink-1.16"),
FLINK_CDC_1_18(FlinkJobType.FLINK_CDC, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-flink-cdc:3.0.0-flink-1.18"),
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public enum SeaTunnelPluginMapping {
SINK_REDIS(SEATUNNEL, SINK, REDIS, "connector-redis", BETA),
SOURCE_ELASTICSEARCH(SEATUNNEL, SOURCE, ELASTICSEARCH, "connector-elasticsearch", UNKNOWN, BATCH, SCHEMA_PROJECTION),
SINK_ELASTICSEARCH(SEATUNNEL, SINK, ELASTICSEARCH, "connector-elasticsearch", GA, CDC),
SOURCE_EASYSEARCH(SEATUNNEL, SOURCE, EASYSEARCH, "connector-easysearch", UNKNOWN, BATCH, SCHEMA_PROJECTION),
SINK_EASYSEARCH(SEATUNNEL, SINK, EASYSEARCH, "connector-easysearch", UNKNOWN, CDC),
SOURCE_MONGODB(SEATUNNEL, SOURCE, MONGODB, "connector-mongodb", BETA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SINK_MONGODB(SEATUNNEL, SINK, MONGODB, "connector-mongodb", BETA, EXACTLY_ONCE, CDC),
SOURCE_AMAZON_DYNAMODB(SEATUNNEL, SOURCE, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA, BATCH, SCHEMA_PROJECTION),
Expand Down Expand Up @@ -120,6 +122,7 @@ public enum SeaTunnelPluginMapping {
SINK_STARROCKS(SEATUNNEL, SINK, STARROCKS, "connector-starrocks", ALPHA),
SOURCE_HUDI(SEATUNNEL, SOURCE, HUDI, "connector-hudi", BETA, BATCH, EXACTLY_ONCE, PARALLELISM),
SOURCE_ICEBERG(SEATUNNEL, SOURCE, ICEBERG, "connector-iceberg", BETA, BATCH, STREAM, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM),
SINK_ICEBERG(SEATUNNEL, SINK, ICEBERG, "connector-iceberg", UNKNOWN, CDC),
SOURCE_PAIMON(SEATUNNEL, SOURCE, PAIMON, "connector-paimon", UNKNOWN, BATCH),
SINK_PAIMON(SEATUNNEL, SINK, PAIMON, "connector-paimon", UNKNOWN, EXACTLY_ONCE),
SINK_S3REDSHIFT(SEATUNNEL, SINK, S3REDSHIFT, "connector-s3-redshift", GA, EXACTLY_ONCE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum SeaTunnelPluginName implements DictInstance {
JDBC("Jdbc", "Jdbc"),
REDIS("Redis", "Redis"),
ELASTICSEARCH("Elasticsearch", "Elasticsearch"),
EASYSEARCH("Easysearch", "Easysearch"),
MONGODB("MongoDB", "MongoDB"),
AMAZON_DYNAMODB("AmazonDynamodb", "AmazonDynamodb"),
CASSANDRA("Cassandra", "Cassandra"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum SeaTunnelVersion implements DictInstance {

V_2_3_3("2.3.3", "2.3.3"),
V_2_3_4("2.3.4", "2.3.4"),
V_2_3_5("2.3.5", "2.3.5"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.util.StringUtils;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -49,7 +50,7 @@ public class DorisDataSource extends AbstractDataSource {
@Schema(description = "password")
private String password;

@NotBlank
@NotNull
@Schema(description = "fenodes query port")
private Integer queryPort;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public enum CDCSourceProperties {
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> TABLE_CONFIG = new PropertyDescriptor.Builder()
.name("table-names")
.description("Table name of the database to monitor.")
.name("table-names-config")
.description("Table config list.")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
Expand All @@ -86,6 +86,9 @@ public enum CDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

/**
* fixme support timestamp?
*/
public static final PropertyDescriptor<Long> STARTUP_TIMESTAMP = new PropertyDescriptor.Builder()
.name("startup.timestamp")
.description("Start from the specified epoch timestamp (in milliseconds).")
Expand Down Expand Up @@ -143,6 +146,9 @@ public enum CDCSourceProperties {
.addValidator(Validators.POSITIVE_LONG_VALIDATOR)
.validateAndBuild();

/**
* fixme does all jdbc cdc support this?
*/
public static final PropertyDescriptor<Integer> INCREMENTAL_PARALLELISM = new PropertyDescriptor.Builder()
.name("incremental.parallelism")
.description("The number of parallel readers in the incremental phase.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.cdc.mongodb.source;

import cn.sliew.scaleph.plugin.framework.property.*;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.List;

public enum MongoDBCDCSourceProperties {
;
Expand Down Expand Up @@ -49,7 +50,7 @@ public enum MongoDBCDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> DATABASE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor<List<String>> DATABASE = new PropertyDescriptor.Builder()
.name("database")
.description("Name of the database to watch for changes")
.type(PropertyType.OBJECT)
Expand All @@ -58,7 +59,7 @@ public enum MongoDBCDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> COLLECTION = new PropertyDescriptor.Builder()
public static final PropertyDescriptor<List<String>> COLLECTION = new PropertyDescriptor.Builder()
.name("collection")
.description("Name of the collection in the database to watch for changes")
.type(PropertyType.OBJECT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.property.PropertyType;
import cn.sliew.scaleph.plugin.framework.property.Validators;
import com.fasterxml.jackson.databind.JsonNode;

public enum DynamoDBSinkProperties {
;
Expand All @@ -35,12 +34,15 @@ public enum DynamoDBSinkProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

/**
* fixme seatunnel support this?
*/
public static final PropertyDescriptor<Integer> BATCH_INTERVAL_MS = new PropertyDescriptor.Builder()
.name("batch_interval_ms")
.description("The batch interval of Amazon DynamoDB")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public enum ElasticsearchSourceProperties {
.defaultValue(100)
.validateAndBuild();

/**
* fixme support schema?
*/
public static final PropertyDescriptor<JsonNode> SCHEMA = new PropertyDescriptor.Builder()
.name("schema")
.description("The structure of the data, including field names and field types")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public enum FakeProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> ROWS = new PropertyDescriptor.Builder<JsonNode>()
.name("rows")
.description(
"The row list of fake data output per degree of parallelism")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Integer> SPLIT_NUM = new PropertyDescriptor.Builder<Integer>()
.name("split.num")
.description("the number of splits generated by the enumerator for each degree of parallelism")
Expand Down Expand Up @@ -106,15 +115,6 @@ public enum FakeProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> ROWS = new PropertyDescriptor.Builder<JsonNode>()
.name("rows")
.description(
"The row list of fake data output per degree of parallelism")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> STRING_FAKE_MODE = new PropertyDescriptor.Builder()
.name("string.fake.mode")
.description("The fake mode of generating string data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ public FakeSourcePlugin() {
props.add(TABLES_CONFIGS);
props.add(SCHEMA);
props.add(ROW_NUM);
props.add(ROWS);
props.add(SPLIT_NUM);
props.add(SPLIT_READ_INTERVAL);
props.add(MAP_SIZE);
props.add(ARRAY_SIZE);
props.add(BYTES_SIZE);
props.add(STRING_SIZE);
props.add(ROWS);

props.add(STRING_FAKE_MODE);
props.add(STRING_TEMPLATE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg;

import cn.sliew.scaleph.plugin.framework.property.*;
import com.fasterxml.jackson.databind.JsonNode;

/**
 * Property descriptors shared by the SeaTunnel Iceberg connectors
 * (catalog location, target table and Hadoop configuration).
 *
 * <p>Value-less enum used purely as a namespace for the {@code static final}
 * descriptor constants, matching the convention of the other
 * {@code *Properties} enums in this package.
 */
public enum IcebergProperties {
    ;

    /** Required logical name of the Iceberg catalog to connect to. */
    public static final PropertyDescriptor<String> CATALOG_NAME = new PropertyDescriptor.Builder<String>()
            .name("catalog_name")
            .description("catalog name")
            .type(PropertyType.STRING)
            .parser(Parsers.STRING_PARSER)
            .properties(Property.Required)
            .addValidator(Validators.NON_BLANK_VALIDATOR)
            .validateAndBuild();

    /** Required database (namespace) within the backend catalog. */
    public static final PropertyDescriptor<String> NAMESPACE = new PropertyDescriptor.Builder<String>()
            .name("namespace")
            .description("database name in the backend catalog")
            .type(PropertyType.STRING)
            .parser(Parsers.STRING_PARSER)
            .properties(Property.Required)
            .addValidator(Validators.NON_BLANK_VALIDATOR)
            .validateAndBuild();

    /** Required Iceberg table name within the namespace. */
    public static final PropertyDescriptor<String> TABLE = new PropertyDescriptor.Builder<String>()
            .name("table")
            .description("iceberg table name in the backend catalog")
            .type(PropertyType.STRING)
            .parser(Parsers.STRING_PARSER)
            .properties(Property.Required)
            .addValidator(Validators.NON_BLANK_VALIDATOR)
            .validateAndBuild();

    /**
     * Required JSON object with the properties used to initialize the
     * Iceberg catalog (e.g. catalog type, warehouse location).
     * Typed builder used for consistency with the other descriptors in this file.
     */
    public static final PropertyDescriptor<JsonNode> ICEBERG_CATALOG_CONFIG = new PropertyDescriptor.Builder<JsonNode>()
            .name("iceberg.catalog.config")
            .description("Specify the properties for initializing the Iceberg catalog.")
            .type(PropertyType.OBJECT)
            .parser(Parsers.JSON_PARSER)
            .properties(Property.Required)
            .validateAndBuild();

    /** Optional path from which Hadoop/Hive site XML files are loaded. */
    public static final PropertyDescriptor<String> ICEBERG_HADOOP_CONF_PATH = new PropertyDescriptor.Builder<String>()
            .name("iceberg.hadoop-conf-path")
            .description("The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files.")
            .type(PropertyType.STRING)
            .parser(Parsers.STRING_PARSER)
            .addValidator(Validators.NON_BLANK_VALIDATOR)
            .validateAndBuild();

    /** Optional JSON object of properties passed straight to the Hadoop configuration. */
    public static final PropertyDescriptor<JsonNode> HADOOP_CONFIG = new PropertyDescriptor.Builder<JsonNode>()
            .name("hadoop.config")
            .description("Properties passed through to the Hadoop configuration.")
            .type(PropertyType.OBJECT)
            .parser(Parsers.JSON_PARSER)
            .validateAndBuild();

    /** Optional flag: match selected field names against the schema case-sensitively. */
    public static final PropertyDescriptor<Boolean> CASE_SENSITIVE = new PropertyDescriptor.Builder<Boolean>()
            .name("case_sensitive")
            .description("If data columns where selected via fields(Collection), controls whether the match to the schema will be done with case sensitivity.")
            .type(PropertyType.BOOLEAN)
            .parser(Parsers.BOOLEAN_PARSER)
            .addValidator(Validators.BOOLEAN_VALIDATOR)
            .validateAndBuild();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.sink;

import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginMapping;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin;
import cn.sliew.scaleph.plugin.seatunnel.flink.env.CommonProperties;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.SaveModeProperties.DATA_SAVE_MODE;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.SaveModeProperties.SCHEMA_SAVE_MODE;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.IcebergProperties.*;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.sink.IcebergSinkProperties.*;

/**
 * SeaTunnel Iceberg sink connector plugin.
 *
 * <p>Registers the plugin metadata and declares the supported property
 * descriptors: the shared catalog/table properties from
 * {@code IcebergProperties}, the sink-specific table-write options from
 * {@code IcebergSinkProperties}, save-mode options, and the common
 * parallelism / result-table properties.
 *
 * <p>Discovered at runtime through {@link AutoService} as a
 * {@link SeaTunnelConnectorPlugin}.
 */
@AutoService(SeaTunnelConnectorPlugin.class)
public class IcebergSinkPlugin extends SeaTunnelConnectorPlugin {

    public IcebergSinkPlugin() {
        this.pluginInfo = new PluginInfo(getIdentity(),
                "Apache Iceberg is an open table format for huge analytic datasets",
                IcebergSinkPlugin.class.getName());

        // Declaration order is preserved; it drives how properties are presented.
        supportedProperties = List.of(
                CATALOG_NAME,
                NAMESPACE,
                TABLE,
                ICEBERG_CATALOG_CONFIG,
                ICEBERG_HADOOP_CONF_PATH,
                HADOOP_CONFIG,
                CASE_SENSITIVE,
                ICEBERG_TABEL_WRITE_PROPS,
                ICEBERG_TABEL_AUTO_CREATE_PROPS,
                ICEBERG_TABEL_PRIMARY_KEYS,
                ICEBERG_TABEL_PARTITION_KEYS,
                ICEBERG_TABEL_SCHEMA_EVOLUTION_ENABLED,
                ICEBERG_TABEL_UPSERT_MODE_ENABLED,
                ICEBERG_TABEL_COMMIT_BRANCH,
                SCHEMA_SAVE_MODE,
                DATA_SAVE_MODE,
                CommonProperties.PARALLELISM,
                CommonProperties.RESULT_TABLE_NAME);
    }

    @Override
    protected SeaTunnelPluginMapping getPluginMapping() {
        return SeaTunnelPluginMapping.SINK_ICEBERG;
    }
}
Loading
Loading