diff --git a/docs/en/concept/speed-limit.md b/docs/en/concept/speed-limit.md
new file mode 100644
index 00000000000..0bb451b38b2
--- /dev/null
+++ b/docs/en/concept/speed-limit.md
@@ -0,0 +1,42 @@
+# Speed Control
+
+## Introduction
+
+The SeaTunnel provides a powerful speed control feature that allows you to manage the rate at which data is synchronized.
+This functionality is essential when you need to ensure efficient and controlled data transfer between systems.
+The speed control is primarily governed by two key parameters: `read_limit.rows_per_second` and `read_limit.bytes_per_second`.
+This document will guide you through the usage of these parameters and how to leverage them effectively.
+
+## Support Those Engines
+
+> SeaTunnel Zeta
+
+## Configuration
+
+To use the speed control feature, you need to configure the `read_limit.rows_per_second` or `read_limit.bytes_per_second` parameters in your job config.
+
+Example env config in your config file:
+
+```hocon
+env {
+ job.mode=STREAMING
+ job.name=SeaTunnel_Job
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+source {
+ MySQL-CDC {
+ // ignore...
+ }
+}
+transform {
+}
+sink {
+ Console {
+ }
+}
+```
+
+We have placed `read_limit.bytes_per_second` and `read_limit.rows_per_second` in the `env` parameters, completing the speed control configuration.
+You can configure both of these parameters simultaneously or choose to configure only one of them. The value of each `value` represents the maximum rate at which each thread is restricted.
+Therefore, when configuring the respective values, please take into account the parallelism of your tasks.
diff --git a/docs/sidebars.js b/docs/sidebars.js
index a8f2527413b..764cb34f57a 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -89,7 +89,8 @@ const sidebars = {
"concept/config",
"concept/connector-v2-features",
'concept/schema-feature',
- 'concept/JobEnvConfig'
+ 'concept/JobEnvConfig',
+ 'concept/speed-limit'
]
},
"Connector-v2-release-state",
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index a3511d92b42..b1fc60e0f16 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -26,8 +26,11 @@ private MetricNames() {}
public static final String RECEIVED_BATCHES = "receivedBatches";
public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
-
+ public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes";
public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
+ public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = "SourceReceivedBytesPerSeconds";
public static final String SINK_WRITE_COUNT = "SinkWriteCount";
+ public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
+ public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index d076cd5367b..0c010bfb846 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -51,6 +51,19 @@ public interface EnvCommonOptions {
.withDescription(
"The interval (in milliseconds) between two consecutive checkpoints.");
+ Option READ_LIMIT_ROW_PER_SECOND =
+ Options.key("read_limit.rows_per_second")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The each parallelism row limit per second for read data from source.");
+
+ Option READ_LIMIT_BYTES_PER_SECOND =
+ Options.key("read_limit.bytes_per_second")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The each parallelism bytes limit per second for read data from source.");
Option CHECKPOINT_TIMEOUT =
Options.key("checkpoint.timeout")
.longType()
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 09310f080c5..d4caa710d89 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -31,6 +31,8 @@ public static OptionRule getEnvOptionRules() {
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
+ EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
+ EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 1966d3142c5..bd05e0808d8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Map;
import java.util.Objects;
/** SeaTunnel row type. */
@@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;
+ private volatile int size;
+
public SeaTunnelRow(int arity) {
this.fields = new Object[arity];
}
@@ -97,6 +100,180 @@ public boolean isNullAt(int pos) {
return this.fields[pos] == null;
}
+ public int getBytesSize(SeaTunnelRowType rowType) {
+ if (size == 0) {
+ int s = 0;
+ for (int i = 0; i < fields.length; i++) {
+ s += getBytesForValue(fields[i], rowType.getFieldType(i));
+ }
+ size = s;
+ }
+ return size;
+ }
+
+ /** faster version of {@link #getBytesSize(SeaTunnelRowType)}. */
+ private int getBytesForValue(Object v, SeaTunnelDataType> dataType) {
+ if (v == null) {
+ return 0;
+ }
+ SqlType sqlType = dataType.getSqlType();
+ switch (sqlType) {
+ case STRING:
+ return ((String) v).length();
+ case BOOLEAN:
+ case TINYINT:
+ return 1;
+ case SMALLINT:
+ return 2;
+ case INT:
+ case FLOAT:
+ return 4;
+ case BIGINT:
+ case DOUBLE:
+ return 8;
+ case DECIMAL:
+ return 36;
+ case NULL:
+ return 0;
+ case BYTES:
+ return ((byte[]) v).length;
+ case DATE:
+ return 24;
+ case TIME:
+ return 12;
+ case TIMESTAMP:
+ return 48;
+ case ARRAY:
+ return getBytesForArray(v, ((ArrayType) dataType).getElementType());
+ case MAP:
+ int size = 0;
+ MapType, ?> mapType = ((MapType, ?>) dataType);
+ for (Map.Entry, ?> entry : ((Map, ?>) v).entrySet()) {
+ size +=
+ getBytesForValue(entry.getKey(), mapType.getKeyType())
+ + getBytesForValue(entry.getValue(), mapType.getValueType());
+ }
+ return size;
+ case ROW:
+ int rowSize = 0;
+ SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
+ SeaTunnelDataType>[] types = rowType.getFieldTypes();
+ SeaTunnelRow row = (SeaTunnelRow) v;
+ for (int i = 0; i < types.length; i++) {
+ rowSize += getBytesForValue(row.fields[i], types[i]);
+ }
+ return rowSize;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + sqlType);
+ }
+ }
+
+ private int getBytesForArray(Object v, BasicType> dataType) {
+ switch (dataType.getSqlType()) {
+ case STRING:
+ int s = 0;
+ for (String i : ((String[]) v)) {
+ s += i.length();
+ }
+ return s;
+ case BOOLEAN:
+ return ((Boolean[]) v).length;
+ case TINYINT:
+ return ((Byte[]) v).length;
+ case SMALLINT:
+ return ((Short[]) v).length * 2;
+ case INT:
+ return ((Integer[]) v).length * 4;
+ case FLOAT:
+ return ((Float[]) v).length * 4;
+ case BIGINT:
+ return ((Long[]) v).length * 8;
+ case DOUBLE:
+ return ((Double[]) v).length * 8;
+ case NULL:
+ default:
+ return 0;
+ }
+ }
+
+ public int getBytesSize() {
+ if (size == 0) {
+ int s = 0;
+ for (Object field : fields) {
+ s += getBytesForValue(field);
+ }
+ size = s;
+ }
+ return size;
+ }
+
+ private int getBytesForValue(Object v) {
+ if (v == null) {
+ return 0;
+ }
+ String clazz = v.getClass().getSimpleName();
+ switch (clazz) {
+ case "String":
+ return ((String) v).length();
+ case "Boolean":
+ case "Byte":
+ return 1;
+ case "Short":
+ return 2;
+ case "Integer":
+ case "Float":
+ return 4;
+ case "Long":
+ case "Double":
+ return 8;
+ case "BigDecimal":
+ return 36;
+ case "byte[]":
+ return ((byte[]) v).length;
+ case "LocalDate":
+ return 24;
+ case "LocalTime":
+ return 12;
+ case "LocalDateTime":
+ return 48;
+ case "String[]":
+ int s = 0;
+ for (String i : ((String[]) v)) {
+ s += i.length();
+ }
+ return s;
+ case "Boolean[]":
+ return ((Boolean[]) v).length;
+ case "Byte[]":
+ return ((Byte[]) v).length;
+ case "Short[]":
+ return ((Short[]) v).length * 2;
+ case "Integer[]":
+ return ((Integer[]) v).length * 4;
+ case "Long[]":
+ return ((Long[]) v).length * 8;
+ case "Float[]":
+ return ((Float[]) v).length * 4;
+ case "Double[]":
+ return ((Double[]) v).length * 8;
+ case "HashMap":
+ int size = 0;
+ for (Map.Entry, ?> entry : ((Map, ?>) v).entrySet()) {
+ size += getBytesForValue(entry.getKey()) + getBytesForValue(entry.getValue());
+ }
+ return size;
+ case "SeaTunnelRow":
+ int rowSize = 0;
+ SeaTunnelRow row = (SeaTunnelRow) v;
+ for (int i = 0; i < row.fields.length; i++) {
+ rowSize += getBytesForValue(row.fields[i]);
+ }
+ return rowSize;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + clazz);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -106,7 +283,7 @@ public boolean equals(Object o) {
return false;
}
SeaTunnelRow that = (SeaTunnelRow) o;
- return tableId == that.tableId
+ return Objects.equals(tableId, that.tableId)
&& kind == that.kind
&& Arrays.deepEquals(fields, that.fields);
}
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java
new file mode 100644
index 00000000000..eaad7f95767
--- /dev/null
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SeaTunnelRowTest {
+
+ @Test
+ void testForRowSize() {
+ Map map = new HashMap<>();
+ map.put(
+ "key1",
+ new SeaTunnelRow(
+ new Object[] {
+ 1, "test", 1L, new BigDecimal("3333.333"),
+ }));
+ map.put(
+ "key2",
+ new SeaTunnelRow(
+ new Object[] {
+ 1, "test", 1L, new BigDecimal("3333.333"),
+ }));
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ "test",
+ 1L,
+ map,
+ new BigDecimal("3333.333"),
+ new String[] {"test2", "test", "3333.333"}
+ });
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"f0", "f1", "f2", "f3", "f4", "f5"},
+ new SeaTunnelDataType>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.LONG_TYPE,
+ new MapType<>(
+ BasicType.STRING_TYPE,
+ new SeaTunnelRowType(
+ new String[] {"f0", "f1", "f2", "f3"},
+ new SeaTunnelDataType>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.LONG_TYPE,
+ new DecimalType(10, 3)
+ })),
+ new DecimalType(10, 3),
+ ArrayType.STRING_ARRAY_TYPE
+ });
+
+ Assertions.assertEquals(181, row.getBytesSize(rowType));
+
+ SeaTunnelRow row2 =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ "test",
+ 1L,
+ map,
+ new BigDecimal("3333.333"),
+ new String[] {"test2", "test", "3333.333"}
+ });
+ Assertions.assertEquals(181, row2.getBytesSize());
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
new file mode 100644
index 00000000000..10b23013290
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+public class FlowControlGate {
+
+ private final RateLimiter bytesRateLimiter;
+ private final RateLimiter countRateLimiter;
+
+ private FlowControlGate(FlowControlStrategy flowControlStrategy) {
+ this.bytesRateLimiter = RateLimiter.create(flowControlStrategy.getBytesPerSecond());
+ this.countRateLimiter = RateLimiter.create(flowControlStrategy.getCountPreSecond());
+ }
+
+ public void audit(SeaTunnelRow row) {
+ bytesRateLimiter.acquire(row.getBytesSize());
+ countRateLimiter.acquire();
+ }
+
+ public static FlowControlGate create(FlowControlStrategy flowControlStrategy) {
+ return new FlowControlGate(flowControlStrategy);
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
new file mode 100644
index 00000000000..2547d7061e2
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class FlowControlStrategy {
+
+ int bytesPerSecond;
+ int countPreSecond;
+
+ public FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
+ if (bytesPerSecond <= 0 || countPreSecond <= 0) {
+ throw new IllegalArgumentException(
+ "bytesPerSecond and countPreSecond must be positive");
+ }
+ this.bytesPerSecond = bytesPerSecond;
+ this.countPreSecond = countPreSecond;
+ }
+
+ public static FlowControlStrategy of(int bytesPerSecond, int countPreSecond) {
+ return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+ }
+
+ public static FlowControlStrategy ofBytes(int bytesPerSecond) {
+ return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE);
+ }
+
+ public static FlowControlStrategy ofCount(int countPreSecond) {
+ return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java
new file mode 100644
index 00000000000..1bcb695bf52
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlowControlGateTest {
+
+ private static final int rowSize = 181;
+
+ @Test
+ public void testWithBytes() {
+ Clock clock = Clock.systemDefaultZone();
+ FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofBytes(100));
+ List rows = getRows(10);
+ long start = clock.millis();
+ for (SeaTunnelRow row : rows) {
+ flowControlGate.audit(row);
+ }
+ long end = clock.millis();
+ long useTime = rowSize * 10 / 100 * 1000;
+
+ Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
+ }
+
+ @Test
+ public void testWithCount() {
+ Clock clock = Clock.systemDefaultZone();
+ FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofCount(2));
+ List rows = getRows(10);
+ long start = clock.millis();
+ for (SeaTunnelRow row : rows) {
+ flowControlGate.audit(row);
+ }
+ long end = clock.millis();
+ long useTime = 10 / 2 * 1000;
+
+ Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
+ }
+
+ @Test
+ public void testWithBytesAndCount() {
+ Clock clock = Clock.systemDefaultZone();
+ FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.of(100, 2));
+ List rows = getRows(10);
+ long start = clock.millis();
+ for (SeaTunnelRow row : rows) {
+ flowControlGate.audit(row);
+ }
+ long end = clock.millis();
+ long useTime = rowSize * 10 / 100 * 1000;
+
+ Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
+ }
+
+ /** return row list with size, each row size is 181 */
+ private List getRows(int size) {
+ Map map = new HashMap<>();
+ map.put(
+ "key1",
+ new SeaTunnelRow(
+ new Object[] {
+ 1, "test", 1L, new BigDecimal("3333.333"),
+ }));
+ map.put(
+ "key2",
+ new SeaTunnelRow(
+ new Object[] {
+ 1, "test", 1L, new BigDecimal("3333.333"),
+ }));
+
+ List rows = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ rows.add(
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ "test",
+ 1L,
+ map,
+ new BigDecimal("3333.333"),
+ new String[] {"test2", "test", "3333.333"}
+ }));
+ }
+ return rows;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index c6173cf0647..5d0f571d497 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -555,7 +555,10 @@ private List getSourceTask(
(PhysicalExecutionFlow<
SourceAction,
SourceConfig>)
- f);
+ f,
+ jobImmutableInformation
+ .getJobConfig()
+ .getEnvOptions());
} else {
return new TransformSeaTunnelTask(
jobImmutableInformation
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 0514d83c86d..6e44089e8fc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -22,15 +22,28 @@
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
+import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
@@ -48,22 +61,65 @@ public class SeaTunnelSourceCollector implements Collector {
private final Counter sourceReceivedCount;
private final Meter sourceReceivedQPS;
+ private final Counter sourceReceivedBytes;
+
+ private final Meter sourceReceivedBytesPerSeconds;
private volatile boolean emptyThisPollNext;
+ private final DataTypeChangeEventHandler dataTypeChangeEventHandler =
+ new DataTypeChangeEventDispatcher();
+ private Map rowTypeMap = new HashMap<>();
+ private SeaTunnelDataType rowType;
+ private FlowControlGate flowControlGate;
public SeaTunnelSourceCollector(
Object checkpointLock,
List>> outputs,
- MetricsContext metricsContext) {
+ MetricsContext metricsContext,
+ FlowControlStrategy flowControlStrategy,
+ SeaTunnelDataType rowType) {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
+ this.rowType = rowType;
+ if (rowType instanceof MultipleRowType) {
+ ((MultipleRowType) rowType)
+ .iterator()
+ .forEachRemaining(
+ type -> {
+ this.rowTypeMap.put(type.getKey(), type.getValue());
+ });
+ }
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
+ sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
+ sourceReceivedBytesPerSeconds = metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
+ if (flowControlStrategy != null) {
+ flowControlGate = FlowControlGate.create(flowControlStrategy);
+ }
}
@Override
public void collect(T row) {
try {
+ if (row instanceof SeaTunnelRow) {
+ int size;
+ if (rowType instanceof SeaTunnelRowType) {
+ size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
+ } else if (rowType instanceof MultipleRowType) {
+ size =
+ ((SeaTunnelRow) row)
+ .getBytesSize(
+ rowTypeMap.get(((SeaTunnelRow) row).getTableId()));
+ } else {
+ throw new SeaTunnelEngineException(
+ "Unsupported row type: " + rowType.getClass().getName());
+ }
+ sourceReceivedBytes.inc(size);
+ sourceReceivedBytesPerSeconds.markEvent(size);
+ if (flowControlGate != null) {
+ flowControlGate.audit((SeaTunnelRow) row);
+ }
+ }
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
sourceReceivedCount.inc();
@@ -76,6 +132,17 @@ public void collect(T row) {
@Override
public void collect(SchemaChangeEvent event) {
try {
+ if (rowType instanceof SeaTunnelRowType) {
+ rowType = dataTypeChangeEventHandler.reset((SeaTunnelRowType) rowType).apply(event);
+ } else if (rowType instanceof MultipleRowType) {
+ String tableId = event.tablePath().toString();
+ rowTypeMap.put(
+ tableId,
+ dataTypeChangeEventHandler.reset(rowTypeMap.get(tableId)).apply(event));
+ } else {
+ throw new SeaTunnelEngineException(
+ "Unsupported row type: " + rowType.getClass().getName());
+ }
sendRecordToNext(new Record<>(event));
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 8650dc7f2a6..80a0dff04e4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -34,6 +36,7 @@
import lombok.NonNull;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class SourceSeaTunnelTask extends SeaTunnelTask {
@@ -44,15 +47,18 @@ public class SourceSeaTunnelTask extends SeaTunne
private transient Object checkpointLock;
@Getter private transient Serializer splitSerializer;
+ private final Map envOption;
private final PhysicalExecutionFlow sourceFlow;
public SourceSeaTunnelTask(
long jobID,
TaskLocation taskID,
int indexID,
- PhysicalExecutionFlow executionFlow) {
+ PhysicalExecutionFlow executionFlow,
+ Map envOption) {
super(jobID, taskID, indexID, executionFlow);
this.sourceFlow = executionFlow;
+ this.envOption = envOption;
}
@Override
@@ -69,7 +75,11 @@ public void init() throws Exception {
} else {
this.collector =
new SeaTunnelSourceCollector<>(
- checkpointLock, outputs, this.getMetricsContext());
+ checkpointLock,
+ outputs,
+ this.getMetricsContext(),
+ getFlowControlStrategy(),
+ sourceFlow.getAction().getSource().getProducedType());
((SourceFlowLifeCycle) startFlowLifeCycle).setCollector(collector);
}
}
@@ -111,4 +121,38 @@ public void triggerBarrier(Barrier barrier) throws Exception {
(SourceFlowLifeCycle) startFlowLifeCycle;
sourceFlow.triggerBarrier(barrier);
}
+
+ private FlowControlStrategy getFlowControlStrategy() {
+ FlowControlStrategy strategy;
+ if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+ && envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
+ strategy =
+ FlowControlStrategy.of(
+ Integer.parseInt(
+ envOption
+ .get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+ .toString()),
+ Integer.parseInt(
+ envOption
+ .get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
+ .toString()));
+ } else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
+ strategy =
+ FlowControlStrategy.ofBytes(
+ Integer.parseInt(
+ envOption
+ .get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+ .toString()));
+ } else if (envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
+ strategy =
+ FlowControlStrategy.ofCount(
+ Integer.parseInt(
+ envOption
+ .get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
+ .toString()));
+ } else {
+ strategy = null;
+ }
+ return strategy;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index c51e3483c09..202e0c2e8b6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -25,6 +25,7 @@
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -53,6 +54,8 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -87,6 +90,10 @@ public class SinkFlowLifeCycle record) {
writer.write((T) record.getData());
sinkWriteCount.inc();
sinkWriteQPS.markEvent();
+ if (record.getData() instanceof SeaTunnelRow) {
+ long size = ((SeaTunnelRow) record.getData()).getBytesSize();
+ sinkWriteBytes.inc(size);
+ sinkWriteBytesPerSeconds.markEvent(size);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);