Skip to content

Commit

Permalink
[Improve][API] Unified tables_configs and table_list (#8100)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Nov 25, 2024
1 parent 3d3f7ba commit 84c0b8d
Show file tree
Hide file tree
Showing 30 changed files with 295 additions and 116 deletions.
40 changes: 40 additions & 0 deletions docs/en/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,46 @@ constraintKeys = [
| INDEX_KEY | key |
| UNIQUE_KEY | unique key |

## Multi table schemas

```
tables_configs = [
{
schema {
table = "database.schema.table1"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
},
{
schema = {
table = "database.schema.table2"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
}
]
```

## How to use schema

### Recommended
Expand Down
18 changes: 18 additions & 0 deletions docs/en/connector-v2/source/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
```

### Example 2: Multiple tables
> Note: Hive is a structured data source and should use 'table_list'; 'tables_configs' will be removed in the future.
```bash

Hive {
table_list = [
{
table_name = "default.seatunnel_orc_1"
metastore_uri = "thrift://namenode001:9083"
},
{
table_name = "default.seatunnel_orc_2"
metastore_uri = "thrift://namenode001:9083"
}
]
}

```

```bash

Expand Down
59 changes: 59 additions & 0 deletions docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,65 @@ source {

> Data parsed from different Kafka topics and formats is written to the same PostgreSQL table, and upsert operations are performed based on the id.
> Note: Kafka is an unstructured data source and should use 'tables_configs'; 'table_list' will be removed in the future.
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
tables_configs = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = ogg_json
},
{
topic = "test-cdc_mds"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = canal_json
}
]
}
}
sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
table = public.sink
primary_keys = ["id"]
}
}
```

```hocon
env {
Expand Down
40 changes: 40 additions & 0 deletions docs/zh/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,46 @@ constraintKeys = [
| INDEX_KEY | 键 |
| UNIQUE_KEY | 唯一键 |

## 多表Schema

```
tables_configs = [
{
schema {
table = "database.schema.table1"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
},
{
schema = {
table = "database.schema.table2"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
}
]
```

## 如何使用schema

### 推荐
Expand Down
59 changes: 59 additions & 0 deletions docs/zh/connector-v2/source/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,65 @@ source {

> 根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。
> 注意: Kafka是一个非结构化数据源,应该使用`tables_configs`,将来会删除`table_list`
```hocon
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
tables_configs = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = ogg_json
},
{
topic = "test-cdc_mds"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = canal_json
}
]
}
}
sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
table = public.sink
primary_keys = ["id"]
}
}
```

```hocon
env {
execution.parallelism = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.api.table.catalog;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

Expand Down Expand Up @@ -56,4 +58,12 @@ public interface CatalogOptions {
.withDescription(
"The table names RegEx of the database to capture."
+ "The table name needs to include the database name, for example: database_.*\\.table_.*");

/**
 * Multi-table option registered under the {@code table_list} key.
 *
 * <p>The value is a list of per-table configuration maps and has no default. Per the option
 * description, it is intended for structured data sources (e.g. jdbc, paimon, doris).
 */
Option<List<Map<String, Object>>> TABLE_LIST =
        Options.key("table_list")
                .type(new TypeReference<List<Map<String, Object>>>() {})
                .noDefaultValue()
                .withDescription(
                        "SeaTunnel Multi Table Schema, acts on structured data sources, "
                                + "such as jdbc, paimon, doris, etc.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public static class TableIdentifierOptions {
.noDefaultValue()
.withDescription("SeaTunnel Schema");

/**
 * Multi-table option registered under the {@code tables_configs} key.
 *
 * <p>The value is a list of per-table configuration maps and has no default. Per the option
 * description, it is intended for unstructured data sources (e.g. file, assert, mongodb).
 */
public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
        Options.key("tables_configs")
                .type(new TypeReference<List<Map<String, Object>>>() {})
                .noDefaultValue()
                .withDescription(
                        "SeaTunnel Multi Table Schema, acts on unstructured data sources, "
                                + "such as file, assert, mongodb, etc.");

// We should use ColumnOptions instead of FieldOptions
@Deprecated
public static class FieldOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;
import java.util.Map;

public class AssertConfig {
Expand Down Expand Up @@ -85,13 +84,6 @@ public static class TableIdentifierRule {
.withDescription(
"Rule definition of user's available data. Each rule represents one field validation or row num validation.");

public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
Options.key("tables_configs")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription(
"Table configuration for the sink. Each table configuration contains the table name and the rules for the table.");

public static final Option<String> TABLE_PATH =
Options.key("table_path")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TABLE_CONFIGS;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH;

public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@

public class FakeOption {

public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
Options.key("tables_configs")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("The multiple table config list of fake source");

public static final Option<List<Map<String, Object>>> ROWS =
Options.key("rows")
.type(new TypeReference<List<Map<String, Object>>>() {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.fake.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;

import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -36,7 +37,7 @@ public class MultipleTableFakeSourceConfig implements Serializable {
@Getter private List<FakeConfig> fakeConfigs;

public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {
if (fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) {
if (fakeSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) {
parseFromConfigs(fakeSourceRootConfig);
} else {
parseFromConfig(fakeSourceRootConfig);
Expand All @@ -56,7 +57,7 @@ public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {

private void parseFromConfigs(ReadonlyConfig readonlyConfig) {
List<ReadonlyConfig> readonlyConfigs =
readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream()
readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).get().stream()
.map(ReadonlyConfig::fromMap)
.collect(Collectors.toList());
// Use the config outside if it's not set in sub config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE;
Expand All @@ -72,7 +71,7 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.optional(TABLES_CONFIGS)
.optional(TableSchemaOptions.TABLE_CONFIGS)
.optional(TableSchemaOptions.SCHEMA)
.optional(STRING_FAKE_MODE)
.conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, STRING_TEMPLATE)
Expand Down
Loading

0 comments on commit 84c0b8d

Please sign in to comment.