From 0ce17121ea8ea26be43a55e6e0728b84c80aebda Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 16 Sep 2023 12:02:16 +0800 Subject: [PATCH] [Feature] Support flow control in zeta (#5502) --- docs/en/concept/speed-limit.md | 42 ++++ docs/sidebars.js | 3 +- .../api/common/metrics/MetricNames.java | 5 +- .../seatunnel/api/env/EnvCommonOptions.java | 13 ++ .../seatunnel/api/env/EnvOptionRule.java | 2 + .../api/table/type/SeaTunnelRow.java | 179 +++++++++++++++++- .../api/table/type/SeaTunnelRowTest.java | 90 +++++++++ .../starter/flowcontrol/FlowControlGate.java | 42 ++++ .../flowcontrol/FlowControlStrategy.java | 50 +++++ .../flowcontrol/FlowControlGateTest.java | 112 +++++++++++ .../dag/physical/PhysicalPlanGenerator.java | 5 +- .../server/task/SeaTunnelSourceCollector.java | 69 ++++++- .../server/task/SourceSeaTunnelTask.java | 48 ++++- .../server/task/flow/SinkFlowLifeCycle.java | 14 ++ 14 files changed, 667 insertions(+), 7 deletions(-) create mode 100644 docs/en/concept/speed-limit.md create mode 100644 seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java create mode 100644 seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java create mode 100644 seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java create mode 100644 seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java diff --git a/docs/en/concept/speed-limit.md b/docs/en/concept/speed-limit.md new file mode 100644 index 00000000000..0bb451b38b2 --- /dev/null +++ b/docs/en/concept/speed-limit.md @@ -0,0 +1,42 @@ +# Speed Control + +## Introduction + +The SeaTunnel provides a powerful speed control feature that allows you to manage the rate at which data is synchronized. +This functionality is essential when you need to ensure efficient and controlled data transfer between systems. +The speed control is primarily governed by two key parameters: `read_limit.rows_per_second` and `read_limit.bytes_per_second`. +This document will guide you through the usage of these parameters and how to leverage them effectively. + +## Support Those Engines + +> SeaTunnel Zeta
+ +## Configuration + +To use the speed control feature, you need to configure the `read_limit.rows_per_second` or `read_limit.bytes_per_second` parameters in your job config. + +Example env config in your config file: + +```hocon +env { + job.mode=STREAMING + job.name=SeaTunnel_Job + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} +source { + MySQL-CDC { + // ignore... + } +} +transform { +} +sink { + Console { + } +} +``` + +We have placed `read_limit.bytes_per_second` and `read_limit.rows_per_second` in the `env` parameters, completing the speed control configuration. +You can configure both of these parameters simultaneously or choose to configure only one of them. The value of each `value` represents the maximum rate at which each thread is restricted. +Therefore, when configuring the respective values, please take into account the parallelism of your tasks. diff --git a/docs/sidebars.js b/docs/sidebars.js index a8f2527413b..764cb34f57a 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -89,7 +89,8 @@ const sidebars = { "concept/config", "concept/connector-v2-features", 'concept/schema-feature', - 'concept/JobEnvConfig' + 'concept/JobEnvConfig', + 'concept/speed-limit' ] }, "Connector-v2-release-state", diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java index a3511d92b42..b1fc60e0f16 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java @@ -26,8 +26,11 @@ private MetricNames() {} public static final String RECEIVED_BATCHES = "receivedBatches"; public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; - + public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes"; public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS"; + public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = "SourceReceivedBytesPerSeconds"; public static final String SINK_WRITE_COUNT = "SinkWriteCount"; + public static final String SINK_WRITE_BYTES = "SinkWriteBytes"; public static final String SINK_WRITE_QPS = "SinkWriteQPS"; + public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds"; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java index d076cd5367b..0c010bfb846 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java @@ -51,6 +51,19 @@ public interface EnvCommonOptions { .withDescription( "The interval (in milliseconds) between two consecutive checkpoints."); + Option READ_LIMIT_ROW_PER_SECOND = + Options.key("read_limit.rows_per_second") + .intType() + .noDefaultValue() + .withDescription( + "The each parallelism row limit per second for read data from source."); + + Option READ_LIMIT_BYTES_PER_SECOND = + Options.key("read_limit.bytes_per_second") + .intType() + .noDefaultValue() + .withDescription( + "The each parallelism bytes limit per second for read data from source."); Option CHECKPOINT_TIMEOUT = Options.key("checkpoint.timeout") .longType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java index 09310f080c5..d4caa710d89 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java @@ -31,6 +31,8 @@ public static OptionRule getEnvOptionRules() { EnvCommonOptions.JARS, EnvCommonOptions.CHECKPOINT_INTERVAL, EnvCommonOptions.CHECKPOINT_TIMEOUT, + EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND, + EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND, EnvCommonOptions.CUSTOM_PARAMETERS) .build(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 1966d3142c5..bd05e0808d8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Map; import java.util.Objects; /** SeaTunnel row type. */ @@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable { /** The array to store the actual internal format values. */ private final Object[] fields; + private volatile int size; + public SeaTunnelRow(int arity) { this.fields = new Object[arity]; } @@ -97,6 +100,180 @@ public boolean isNullAt(int pos) { return this.fields[pos] == null; } + public int getBytesSize(SeaTunnelRowType rowType) { + if (size == 0) { + int s = 0; + for (int i = 0; i < fields.length; i++) { + s += getBytesForValue(fields[i], rowType.getFieldType(i)); + } + size = s; + } + return size; + } + + /** faster version of {@link #getBytesSize(SeaTunnelRowType)}. */ + private int getBytesForValue(Object v, SeaTunnelDataType dataType) { + if (v == null) { + return 0; + } + SqlType sqlType = dataType.getSqlType(); + switch (sqlType) { + case STRING: + return ((String) v).length(); + case BOOLEAN: + case TINYINT: + return 1; + case SMALLINT: + return 2; + case INT: + case FLOAT: + return 4; + case BIGINT: + case DOUBLE: + return 8; + case DECIMAL: + return 36; + case NULL: + return 0; + case BYTES: + return ((byte[]) v).length; + case DATE: + return 24; + case TIME: + return 12; + case TIMESTAMP: + return 48; + case ARRAY: + return getBytesForArray(v, ((ArrayType) dataType).getElementType()); + case MAP: + int size = 0; + MapType mapType = ((MapType) dataType); + for (Map.Entry entry : ((Map) v).entrySet()) { + size += + getBytesForValue(entry.getKey(), mapType.getKeyType()) + + getBytesForValue(entry.getValue(), mapType.getValueType()); + } + return size; + case ROW: + int rowSize = 0; + SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType); + SeaTunnelDataType[] types = rowType.getFieldTypes(); + SeaTunnelRow row = (SeaTunnelRow) v; + for (int i = 0; i < types.length; i++) { + rowSize += getBytesForValue(row.fields[i], types[i]); + } + return rowSize; + default: + throw new UnsupportedOperationException("Unsupported type: " + sqlType); + } + } + + private int getBytesForArray(Object v, BasicType dataType) { + switch (dataType.getSqlType()) { + case STRING: + int s = 0; + for (String i : ((String[]) v)) { + s += i.length(); + } + return s; + case BOOLEAN: + return ((Boolean[]) v).length; + case TINYINT: + return ((Byte[]) v).length; + case SMALLINT: + return ((Short[]) v).length * 2; + case INT: + return ((Integer[]) v).length * 4; + case FLOAT: + return ((Float[]) v).length * 4; + case BIGINT: + return ((Long[]) v).length * 8; + case DOUBLE: + return ((Double[]) v).length * 8; + case NULL: + default: + return 0; + } + } + + public int getBytesSize() { + if (size == 0) { + int s = 0; + for (Object field : fields) { + s += getBytesForValue(field); + } + size = s; + } + return size; + } + + private int getBytesForValue(Object v) { + if (v == null) { + return 0; + } + String clazz = v.getClass().getSimpleName(); + switch (clazz) { + case "String": + return ((String) v).length(); + case "Boolean": + case "Byte": + return 1; + case "Short": + return 2; + case "Integer": + case "Float": + return 4; + case "Long": + case "Double": + return 8; + case "BigDecimal": + return 36; + case "byte[]": + return ((byte[]) v).length; + case "LocalDate": + return 24; + case "LocalTime": + return 12; + case "LocalDateTime": + return 48; + case "String[]": + int s = 0; + for (String i : ((String[]) v)) { + s += i.length(); + } + return s; + case "Boolean[]": + return ((Boolean[]) v).length; + case "Byte[]": + return ((Byte[]) v).length; + case "Short[]": + return ((Short[]) v).length * 2; + case "Integer[]": + return ((Integer[]) v).length * 4; + case "Long[]": + return ((Long[]) v).length * 8; + case "Float[]": + return ((Float[]) v).length * 4; + case "Double[]": + return ((Double[]) v).length * 8; + case "HashMap": + int size = 0; + for (Map.Entry entry : ((Map) v).entrySet()) { + size += getBytesForValue(entry.getKey()) + getBytesForValue(entry.getValue()); + } + return size; + case "SeaTunnelRow": + int rowSize = 0; + SeaTunnelRow row = (SeaTunnelRow) v; + for (int i = 0; i < row.fields.length; i++) { + rowSize += getBytesForValue(row.fields[i]); + } + return rowSize; + default: + throw new UnsupportedOperationException("Unsupported type: " + clazz); + } + } + @Override public boolean equals(Object o) { if (this == o) { @@ -106,7 +283,7 @@ public boolean equals(Object o) { return false; } SeaTunnelRow that = (SeaTunnelRow) o; - return tableId == that.tableId + return Objects.equals(tableId, that.tableId) && kind == that.kind && Arrays.deepEquals(fields, that.fields); } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java new file mode 100644 index 00000000000..eaad7f95767 --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; + +public class SeaTunnelRowTest { + + @Test + void testForRowSize() { + Map map = new HashMap<>(); + map.put( + "key1", + new SeaTunnelRow( + new Object[] { + 1, "test", 1L, new BigDecimal("3333.333"), + })); + map.put( + "key2", + new SeaTunnelRow( + new Object[] { + 1, "test", 1L, new BigDecimal("3333.333"), + })); + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + 1, + "test", + 1L, + map, + new BigDecimal("3333.333"), + new String[] {"test2", "test", "3333.333"} + }); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"f0", "f1", "f2", "f3", "f4", "f5"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.LONG_TYPE, + new MapType<>( + BasicType.STRING_TYPE, + new SeaTunnelRowType( + new String[] {"f0", "f1", "f2", "f3"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.LONG_TYPE, + new DecimalType(10, 3) + })), + new DecimalType(10, 3), + ArrayType.STRING_ARRAY_TYPE + }); + + Assertions.assertEquals(181, row.getBytesSize(rowType)); + + SeaTunnelRow row2 = + new SeaTunnelRow( + new Object[] { + 1, + "test", + 1L, + map, + new BigDecimal("3333.333"), + new String[] {"test2", "test", "3333.333"} + }); + Assertions.assertEquals(181, row2.getBytesSize()); + } +} diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java new file mode 100644 index 00000000000..10b23013290 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.flowcontrol; + +import org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +public class FlowControlGate { + + private final RateLimiter bytesRateLimiter; + private final RateLimiter countRateLimiter; + + private FlowControlGate(FlowControlStrategy flowControlStrategy) { + this.bytesRateLimiter = RateLimiter.create(flowControlStrategy.getBytesPerSecond()); + this.countRateLimiter = RateLimiter.create(flowControlStrategy.getCountPreSecond()); + } + + public void audit(SeaTunnelRow row) { + bytesRateLimiter.acquire(row.getBytesSize()); + countRateLimiter.acquire(); + } + + public static FlowControlGate create(FlowControlStrategy flowControlStrategy) { + return new FlowControlGate(flowControlStrategy); + } +} diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java new file mode 100644 index 00000000000..2547d7061e2 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.flowcontrol; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class FlowControlStrategy { + + int bytesPerSecond; + int countPreSecond; + + public FlowControlStrategy(int bytesPerSecond, int countPreSecond) { + if (bytesPerSecond <= 0 || countPreSecond <= 0) { + throw new IllegalArgumentException( + "bytesPerSecond and countPreSecond must be positive"); + } + this.bytesPerSecond = bytesPerSecond; + this.countPreSecond = countPreSecond; + } + + public static FlowControlStrategy of(int bytesPerSecond, int countPreSecond) { + return new FlowControlStrategy(bytesPerSecond, countPreSecond); + } + + public static FlowControlStrategy ofBytes(int bytesPerSecond) { + return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE); + } + + public static FlowControlStrategy ofCount(int countPreSecond) { + return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond); + } +} diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java new file mode 100644 index 00000000000..1bcb695bf52 --- /dev/null +++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.flowcontrol; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FlowControlGateTest { + + private static final int rowSize = 181; + + @Test + public void testWithBytes() { + Clock clock = Clock.systemDefaultZone(); + FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofBytes(100)); + List rows = getRows(10); + long start = clock.millis(); + for (SeaTunnelRow row : rows) { + flowControlGate.audit(row); + } + long end = clock.millis(); + long useTime = rowSize * 10 / 100 * 1000; + + Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2); + } + + @Test + public void testWithCount() { + Clock clock = Clock.systemDefaultZone(); + FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofCount(2)); + List rows = getRows(10); + long start = clock.millis(); + for (SeaTunnelRow row : rows) { + flowControlGate.audit(row); + } + long end = clock.millis(); + long useTime = 10 / 2 * 1000; + + Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2); + } + + @Test + public void testWithBytesAndCount() { + Clock clock = Clock.systemDefaultZone(); + FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.of(100, 2)); + List rows = getRows(10); + long start = clock.millis(); + for (SeaTunnelRow row : rows) { + flowControlGate.audit(row); + } + long end = clock.millis(); + long useTime = rowSize * 10 / 100 * 1000; + + Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2); + } + + /** return row list with size, each row size is 181 */ + private List getRows(int size) { + Map map = new HashMap<>(); + map.put( + "key1", + new SeaTunnelRow( + new Object[] { + 1, "test", 1L, new BigDecimal("3333.333"), + })); + map.put( + "key2", + new SeaTunnelRow( + new Object[] { + 1, "test", 1L, new BigDecimal("3333.333"), + })); + + List rows = new ArrayList<>(); + for (int i = 0; i < size; i++) { + rows.add( + new SeaTunnelRow( + new Object[] { + 1, + "test", + 1L, + map, + new BigDecimal("3333.333"), + new String[] {"test2", "test", "3333.333"} + })); + } + return rows; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index c6173cf0647..5d0f571d497 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -555,7 +555,10 @@ private List getSourceTask( (PhysicalExecutionFlow< SourceAction, SourceConfig>) - f); + f, + jobImmutableInformation + .getJobConfig() + .getEnvOptions()); } else { return new TransformSeaTunnelTask( jobImmutableInformation diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 0514d83c86d..6e44089e8fc 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -22,15 +22,28 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; +import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; +import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate; +import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; @@ -48,22 +61,65 @@ public class SeaTunnelSourceCollector implements Collector { private final Counter sourceReceivedCount; private final Meter sourceReceivedQPS; + private final Counter sourceReceivedBytes; + + private final Meter sourceReceivedBytesPerSeconds; private volatile boolean emptyThisPollNext; + private final DataTypeChangeEventHandler dataTypeChangeEventHandler = + new DataTypeChangeEventDispatcher(); + private Map rowTypeMap = new HashMap<>(); + private SeaTunnelDataType rowType; + private FlowControlGate flowControlGate; public SeaTunnelSourceCollector( Object checkpointLock, List>> outputs, - MetricsContext metricsContext) { + MetricsContext metricsContext, + FlowControlStrategy flowControlStrategy, + SeaTunnelDataType rowType) { this.checkpointLock = checkpointLock; this.outputs = outputs; + this.rowType = rowType; + if (rowType instanceof MultipleRowType) { + ((MultipleRowType) rowType) + .iterator() + .forEachRemaining( + type -> { + this.rowTypeMap.put(type.getKey(), type.getValue()); + }); + } sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); + sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES); + sourceReceivedBytesPerSeconds = metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS); + if (flowControlStrategy != null) { + flowControlGate = FlowControlGate.create(flowControlStrategy); + } } @Override public void collect(T row) { try { + if (row instanceof SeaTunnelRow) { + int size; + if (rowType instanceof SeaTunnelRowType) { + size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType); + } else if (rowType instanceof MultipleRowType) { + size = + ((SeaTunnelRow) row) + .getBytesSize( + rowTypeMap.get(((SeaTunnelRow) row).getTableId())); + } else { + throw new SeaTunnelEngineException( + "Unsupported row type: " + rowType.getClass().getName()); + } + sourceReceivedBytes.inc(size); + sourceReceivedBytesPerSeconds.markEvent(size); + if (flowControlGate != null) { + flowControlGate.audit((SeaTunnelRow) row); + } + } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; sourceReceivedCount.inc(); @@ -76,6 +132,17 @@ public void collect(T row) { @Override public void collect(SchemaChangeEvent event) { try { + if (rowType instanceof SeaTunnelRowType) { + rowType = dataTypeChangeEventHandler.reset((SeaTunnelRowType) rowType).apply(event); + } else if (rowType instanceof MultipleRowType) { + String tableId = event.tablePath().toString(); + rowTypeMap.put( + tableId, + dataTypeChangeEventHandler.reset(rowTypeMap.get(tableId)).apply(event)); + } else { + throw new SeaTunnelEngineException( + "Unsupported row type: " + rowType.getClass().getName()); + } sendRecordToNext(new Record<>(event)); } catch (IOException e) { throw new RuntimeException(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 8650dc7f2a6..80a0dff04e4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -18,8 +18,10 @@ package org.apache.seatunnel.engine.server.task; import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig; import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow; @@ -34,6 +36,7 @@ import lombok.NonNull; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class SourceSeaTunnelTask extends SeaTunnelTask { @@ -44,15 +47,18 @@ public class SourceSeaTunnelTask extends SeaTunne private transient Object checkpointLock; @Getter private transient Serializer splitSerializer; + private final Map envOption; private final PhysicalExecutionFlow sourceFlow; public SourceSeaTunnelTask( long jobID, TaskLocation taskID, int indexID, - PhysicalExecutionFlow executionFlow) { + PhysicalExecutionFlow executionFlow, + Map envOption) { super(jobID, taskID, indexID, executionFlow); this.sourceFlow = executionFlow; + this.envOption = envOption; } @Override @@ -69,7 +75,11 @@ public void init() throws Exception { } else { this.collector = new SeaTunnelSourceCollector<>( - checkpointLock, outputs, this.getMetricsContext()); + checkpointLock, + outputs, + this.getMetricsContext(), + getFlowControlStrategy(), + sourceFlow.getAction().getSource().getProducedType()); ((SourceFlowLifeCycle) startFlowLifeCycle).setCollector(collector); } } @@ -111,4 +121,38 @@ public void triggerBarrier(Barrier barrier) throws Exception { (SourceFlowLifeCycle) startFlowLifeCycle; sourceFlow.triggerBarrier(barrier); } + + private FlowControlStrategy getFlowControlStrategy() { + FlowControlStrategy strategy; + if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key()) + && envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) { + strategy = + FlowControlStrategy.of( + Integer.parseInt( + envOption + .get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key()) + .toString()), + Integer.parseInt( + envOption + .get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key()) + .toString())); + } else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) { + strategy = + FlowControlStrategy.ofBytes( + Integer.parseInt( + envOption + .get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key()) + .toString())); + } else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) { + strategy = + FlowControlStrategy.ofCount( + Integer.parseInt( + envOption + .get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key()) + .toString())); + } else { + strategy = null; + } + return strategy; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index c51e3483c09..202e0c2e8b6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; @@ -53,6 +54,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @@ -87,6 +90,10 @@ public class SinkFlowLifeCycle record) { writer.write((T) record.getData()); sinkWriteCount.inc(); sinkWriteQPS.markEvent(); + if (record.getData() instanceof SeaTunnelRow) { + long size = ((SeaTunnelRow) record.getData()).getBytesSize(); + sinkWriteBytes.inc(size); + sinkWriteBytesPerSeconds.markEvent(size); + } } } catch (Exception e) { throw new RuntimeException(e);