diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/options/ConstantValue.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/options/ConstantValue.java index f22ba611fc..2d252227e0 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/options/ConstantValue.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/options/ConstantValue.java @@ -36,6 +36,8 @@ public class ConstantValue { public static final Integer HTTP_CHECK_TIMEOUT_DEFAULT = 10 * 1000; public static final Integer QUEUE_OFFER_TIMEOUT_DEFAULT = 60 * 1000; public static final Integer QUEUE_POLL_TIMEOUT_DEFAULT = 60 * 1000; - public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 2 * 1024 * 1024 * 1024L; + // 50mb, If you need to set a larger value, you need to set a larger taskmanager memory, + // otherwise OOM may occur. + public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 50 * 1024 * 1024L; public static final Long SINK_BATCH_MAX_ROWS_DEFAULT = 2048 * 100L; } diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java index d999b0f1e1..69f304874d 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java @@ -21,11 +21,13 @@ import com.dtstack.chunjun.config.FieldConfig; import com.dtstack.chunjun.connector.starrocks.config.StarRocksConfig; import com.dtstack.chunjun.connector.starrocks.connection.StarRocksJdbcConnectionProvider; +import com.dtstack.chunjun.connector.starrocks.options.ConstantValue; import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksQueryVisitor; import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksSinkBufferEntity; import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadFailedException; import com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager; import com.dtstack.chunjun.constants.Metrics; +import com.dtstack.chunjun.element.ColumnRowData; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.util.GsonUtil; @@ -142,13 +144,18 @@ public synchronized void writeRecord(RowData rowData) { checkTimerWriteException(); int size = 0; rows.add(rowData); - if (rows.size() >= batchSize) { + if (rowData instanceof ColumnRowData) { + batchMaxByteSize += ((ColumnRowData) rowData).getByteSize(); + } else { + batchMaxByteSize += rowSizeCalculator.getObjectSize(rowData); + } + if (rows.size() >= starRocksConfig.getBatchSize() + || batchMaxByteSize >= ConstantValue.SINK_BATCH_MAX_BYTES_DEFAULT) { writeRecordInternal(); size = batchSize; } updateDuration(); - bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData)); if (checkpointEnabled) { snapshotWriteCounter.add(size); } @@ -160,6 +167,7 @@ protected synchronized void writeRecordInternal() { try { writeMultipleRecordsInternal(); numWriteCounter.add(rows.size()); + bytesWriteCounter.add(batchMaxByteSize); } catch (Exception e) { if (e instanceof StarRocksStreamLoadFailedException) { StarRocksStreamLoadFailedException exception = @@ -178,6 +186,7 @@ protected synchronized void writeRecordInternal() { } finally { // Data is either recorded dirty data or written normally rows.clear(); + batchMaxByteSize = 0; } } } diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksSinkFactory.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksSinkFactory.java index 2416c1228c..1dafbbe490 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksSinkFactory.java @@ -36,6 +36,8 @@ import java.util.stream.Collectors; +import static com.dtstack.chunjun.connector.starrocks.options.ConstantValue.SINK_BATCH_MAX_ROWS_DEFAULT; + public class StarRocksSinkFactory extends SinkFactory { private final StarRocksConfig starRocksConfig; @@ -47,7 +49,8 @@ public StarRocksSinkFactory(SyncConfig syncConfig) { JsonUtil.toJson(syncConfig.getWriter().getParameter()), StarRocksConfig.class); - int batchSize = syncConfig.getWriter().getIntVal("batchSize", 1024); + int batchSize = + (int) syncConfig.getWriter().getLongVal("batchSize", SINK_BATCH_MAX_ROWS_DEFAULT); starRocksConfig.setBatchSize(batchSize); super.initCommonConf(starRocksConfig); } diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StarRocksStreamLoadVisitor.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StarRocksStreamLoadVisitor.java index ad1eb285a5..1936c07264 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StarRocksStreamLoadVisitor.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StarRocksStreamLoadVisitor.java @@ -250,6 +250,10 @@ private boolean tryHttpConnection(String host) { } private byte[] joinRows(List rows, int totalBytes) { + if (totalBytes < 0) { + throw new RuntimeException( + "The ByteBuffer limit has been exceeded, json assembly may fail"); + } ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : 1 - rows.size())); bos.put("[".getBytes(StandardCharsets.UTF_8)); byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java index d73a78f1d4..8975ce7ebb 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java @@ -132,8 +132,10 @@ public abstract class BaseRichOutputFormat extends RichOutputFormat /** 最新读取的数据 */ protected RowData lastRow = null; - /** 存储用于批量写入的数据 */ + /** 存储用于批量写入的数据行数 */ protected transient List rows; + /** 存储用于批量写入的数据字节数 */ + protected transient long batchMaxByteSize; /** 数据类型转换器 */ protected AbstractRowConverter rowConverter; /** 是否需要初始化脏数据和累加器,目前只有hive插件该参数设置为false */