Skip to content

Commit

Permalink
[Feature] Support flow control in zeta (#5502)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Sep 16, 2023
1 parent 9d7fa11 commit 0ce1712
Show file tree
Hide file tree
Showing 14 changed files with 667 additions and 7 deletions.
42 changes: 42 additions & 0 deletions docs/en/concept/speed-limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Speed Control

## Introduction

The SeaTunnel provides a powerful speed control feature that allows you to manage the rate at which data is synchronized.
This functionality is essential when you need to ensure efficient and controlled data transfer between systems.
The speed control is primarily governed by two key parameters: `read_limit.rows_per_second` and `read_limit.bytes_per_second`.
This document will guide you through the usage of these parameters and how to leverage them effectively.

## Support Those Engines

> SeaTunnel Zeta<br/>
## Configuration

To use the speed control feature, you need to configure the `read_limit.rows_per_second` or `read_limit.bytes_per_second` parameters in your job config.

Example env config in your config file:

```hocon
env {
job.mode=STREAMING
job.name=SeaTunnel_Job
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
// ignore...
}
}
transform {
}
sink {
Console {
}
}
```

We have placed `read_limit.bytes_per_second` and `read_limit.rows_per_second` in the `env` parameters, completing the speed control configuration.
You can configure both of these parameters simultaneously or choose to configure only one of them. The value of each `value` represents the maximum rate at which each thread is restricted.
Therefore, when configuring the respective values, please take into account the parallelism of your tasks.
3 changes: 2 additions & 1 deletion docs/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ const sidebars = {
"concept/config",
"concept/connector-v2-features",
'concept/schema-feature',
'concept/JobEnvConfig'
'concept/JobEnvConfig',
'concept/speed-limit'
]
},
"Connector-v2-release-state",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ private MetricNames() {}
public static final String RECEIVED_BATCHES = "receivedBatches";

public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";

public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes";
public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = "SourceReceivedBytesPerSeconds";
public static final String SINK_WRITE_COUNT = "SinkWriteCount";
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ public interface EnvCommonOptions {
.withDescription(
"The interval (in milliseconds) between two consecutive checkpoints.");

Option<Integer> READ_LIMIT_ROW_PER_SECOND =
Options.key("read_limit.rows_per_second")
.intType()
.noDefaultValue()
.withDescription(
"The each parallelism row limit per second for read data from source.");

Option<Integer> READ_LIMIT_BYTES_PER_SECOND =
Options.key("read_limit.bytes_per_second")
.intType()
.noDefaultValue()
.withDescription(
"The each parallelism bytes limit per second for read data from source.");
Option<Long> CHECKPOINT_TIMEOUT =
Options.key("checkpoint.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static OptionRule getEnvOptionRules() {
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/** SeaTunnel row type. */
Expand All @@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;

private volatile int size;

public SeaTunnelRow(int arity) {
this.fields = new Object[arity];
}
Expand Down Expand Up @@ -97,6 +100,180 @@ public boolean isNullAt(int pos) {
return this.fields[pos] == null;
}

public int getBytesSize(SeaTunnelRowType rowType) {
if (size == 0) {
int s = 0;
for (int i = 0; i < fields.length; i++) {
s += getBytesForValue(fields[i], rowType.getFieldType(i));
}
size = s;
}
return size;
}

/** faster version of {@link #getBytesSize(SeaTunnelRowType)}. */
private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
if (v == null) {
return 0;
}
SqlType sqlType = dataType.getSqlType();
switch (sqlType) {
case STRING:
return ((String) v).length();
case BOOLEAN:
case TINYINT:
return 1;
case SMALLINT:
return 2;
case INT:
case FLOAT:
return 4;
case BIGINT:
case DOUBLE:
return 8;
case DECIMAL:
return 36;
case NULL:
return 0;
case BYTES:
return ((byte[]) v).length;
case DATE:
return 24;
case TIME:
return 12;
case TIMESTAMP:
return 48;
case ARRAY:
return getBytesForArray(v, ((ArrayType) dataType).getElementType());
case MAP:
int size = 0;
MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
size +=
getBytesForValue(entry.getKey(), mapType.getKeyType())
+ getBytesForValue(entry.getValue(), mapType.getValueType());
}
return size;
case ROW:
int rowSize = 0;
SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
SeaTunnelDataType<?>[] types = rowType.getFieldTypes();
SeaTunnelRow row = (SeaTunnelRow) v;
for (int i = 0; i < types.length; i++) {
rowSize += getBytesForValue(row.fields[i], types[i]);
}
return rowSize;
default:
throw new UnsupportedOperationException("Unsupported type: " + sqlType);
}
}

private int getBytesForArray(Object v, BasicType<?> dataType) {
switch (dataType.getSqlType()) {
case STRING:
int s = 0;
for (String i : ((String[]) v)) {
s += i.length();
}
return s;
case BOOLEAN:
return ((Boolean[]) v).length;
case TINYINT:
return ((Byte[]) v).length;
case SMALLINT:
return ((Short[]) v).length * 2;
case INT:
return ((Integer[]) v).length * 4;
case FLOAT:
return ((Float[]) v).length * 4;
case BIGINT:
return ((Long[]) v).length * 8;
case DOUBLE:
return ((Double[]) v).length * 8;
case NULL:
default:
return 0;
}
}

public int getBytesSize() {
if (size == 0) {
int s = 0;
for (Object field : fields) {
s += getBytesForValue(field);
}
size = s;
}
return size;
}

private int getBytesForValue(Object v) {
if (v == null) {
return 0;
}
String clazz = v.getClass().getSimpleName();
switch (clazz) {
case "String":
return ((String) v).length();
case "Boolean":
case "Byte":
return 1;
case "Short":
return 2;
case "Integer":
case "Float":
return 4;
case "Long":
case "Double":
return 8;
case "BigDecimal":
return 36;
case "byte[]":
return ((byte[]) v).length;
case "LocalDate":
return 24;
case "LocalTime":
return 12;
case "LocalDateTime":
return 48;
case "String[]":
int s = 0;
for (String i : ((String[]) v)) {
s += i.length();
}
return s;
case "Boolean[]":
return ((Boolean[]) v).length;
case "Byte[]":
return ((Byte[]) v).length;
case "Short[]":
return ((Short[]) v).length * 2;
case "Integer[]":
return ((Integer[]) v).length * 4;
case "Long[]":
return ((Long[]) v).length * 8;
case "Float[]":
return ((Float[]) v).length * 4;
case "Double[]":
return ((Double[]) v).length * 8;
case "HashMap":
int size = 0;
for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
size += getBytesForValue(entry.getKey()) + getBytesForValue(entry.getValue());
}
return size;
case "SeaTunnelRow":
int rowSize = 0;
SeaTunnelRow row = (SeaTunnelRow) v;
for (int i = 0; i < row.fields.length; i++) {
rowSize += getBytesForValue(row.fields[i]);
}
return rowSize;
default:
throw new UnsupportedOperationException("Unsupported type: " + clazz);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -106,7 +283,7 @@ public boolean equals(Object o) {
return false;
}
SeaTunnelRow that = (SeaTunnelRow) o;
return tableId == that.tableId
return Objects.equals(tableId, that.tableId)
&& kind == that.kind
&& Arrays.deepEquals(fields, that.fields);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class SeaTunnelRowTest {

@Test
void testForRowSize() {
Map<String, Object> map = new HashMap<>();
map.put(
"key1",
new SeaTunnelRow(
new Object[] {
1, "test", 1L, new BigDecimal("3333.333"),
}));
map.put(
"key2",
new SeaTunnelRow(
new Object[] {
1, "test", 1L, new BigDecimal("3333.333"),
}));
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
1,
"test",
1L,
map,
new BigDecimal("3333.333"),
new String[] {"test2", "test", "3333.333"}
});

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"f0", "f1", "f2", "f3", "f4", "f5"},
new SeaTunnelDataType<?>[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.LONG_TYPE,
new MapType<>(
BasicType.STRING_TYPE,
new SeaTunnelRowType(
new String[] {"f0", "f1", "f2", "f3"},
new SeaTunnelDataType<?>[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.LONG_TYPE,
new DecimalType(10, 3)
})),
new DecimalType(10, 3),
ArrayType.STRING_ARRAY_TYPE
});

Assertions.assertEquals(181, row.getBytesSize(rowType));

SeaTunnelRow row2 =
new SeaTunnelRow(
new Object[] {
1,
"test",
1L,
map,
new BigDecimal("3333.333"),
new String[] {"test2", "test", "3333.333"}
});
Assertions.assertEquals(181, row2.getBytesSize());
}
}
Loading

0 comments on commit 0ce1712

Please sign in to comment.