[hotfix-1908][chunjun-connector-starrocks]: starrocks data size check
kinoxyz1 committed Jul 30, 2024
1 parent 849a052 commit 48d0a88
Showing 5 changed files with 25 additions and 5 deletions.

ConstantValue.java
@@ -36,6 +36,8 @@ public class ConstantValue {
     public static final Integer HTTP_CHECK_TIMEOUT_DEFAULT = 10 * 1000;
     public static final Integer QUEUE_OFFER_TIMEOUT_DEFAULT = 60 * 1000;
     public static final Integer QUEUE_POLL_TIMEOUT_DEFAULT = 60 * 1000;
-    public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 2 * 1024 * 1024 * 1024L;
+    // 50mb, If you need to set a larger value, you need to set a larger taskmanager memory,
+    // otherwise OOM may occur.
+    public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 50 * 1024 * 1024L;
     public static final Long SINK_BATCH_MAX_ROWS_DEFAULT = 2048 * 100L;
 }
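
A note on the literals above: because Java evaluates `2 * 1024 * 1024 * 1024L` left to right, only the final multiplication happens in `long` arithmetic, and the old 2 GB constant only avoided overflow thanks to the trailing `L`; the same product written in all-`int` arithmetic wraps silently. A standalone sketch (not part of the commit) of that arithmetic, including the new 50 MB default:

```java
public class BatchBytesDemo {
    public static void main(String[] args) {
        // All-int arithmetic wraps before the value is widened to long:
        long wrong = 2 * 1024 * 1024 * 1024;   // -2147483648 (int overflow)
        // The trailing L makes the final multiplication long arithmetic:
        long right = 2 * 1024 * 1024 * 1024L;  // 2147483648
        // The new 50 MB default fits comfortably in int range before widening:
        long fiftyMb = 50 * 1024 * 1024L;      // 52428800

        System.out.println(wrong + " / " + right + " / " + fiftyMb);
    }
}
```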

@@ -21,11 +21,13 @@
 import com.dtstack.chunjun.config.FieldConfig;
 import com.dtstack.chunjun.connector.starrocks.config.StarRocksConfig;
 import com.dtstack.chunjun.connector.starrocks.connection.StarRocksJdbcConnectionProvider;
+import com.dtstack.chunjun.connector.starrocks.options.ConstantValue;
 import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksQueryVisitor;
 import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksSinkBufferEntity;
 import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadFailedException;
 import com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager;
 import com.dtstack.chunjun.constants.Metrics;
+import com.dtstack.chunjun.element.ColumnRowData;
 import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
 import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
 import com.dtstack.chunjun.util.GsonUtil;

@@ -142,13 +144,18 @@ public synchronized void writeRecord(RowData rowData) {
         checkTimerWriteException();
         int size = 0;
         rows.add(rowData);
-        if (rows.size() >= batchSize) {
+        if (rowData instanceof ColumnRowData) {
+            batchMaxByteSize += ((ColumnRowData) rowData).getByteSize();
+        } else {
+            batchMaxByteSize += rowSizeCalculator.getObjectSize(rowData);
+        }
+        if (rows.size() >= starRocksConfig.getBatchSize()
+                || batchMaxByteSize >= ConstantValue.SINK_BATCH_MAX_BYTES_DEFAULT) {
             writeRecordInternal();
             size = batchSize;
         }

         updateDuration();
-        bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData));
         if (checkpointEnabled) {
             snapshotWriteCounter.add(size);
         }
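
The reworked `writeRecord` now flushes on whichever limit is hit first: the configured row count (`starRocksConfig.getBatchSize()`) or the accumulated byte size (capped by `SINK_BATCH_MAX_BYTES_DEFAULT`), with the per-row size taken from `ColumnRowData.getByteSize()` when available and from the generic `rowSizeCalculator` otherwise. A minimal sketch of this dual-threshold buffering, using simplified stand-in names rather than ChunJun's own APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Sketch of the dual-threshold flush introduced by this commit.
class DualThresholdBuffer<T> {
    private final List<T> rows = new ArrayList<>();
    private long bufferedBytes = 0;
    private final int maxRows;               // cf. starRocksConfig.getBatchSize()
    private final long maxBytes;             // cf. SINK_BATCH_MAX_BYTES_DEFAULT (50 MB)
    private final ToLongFunction<T> sizeOf;  // cf. getByteSize() / rowSizeCalculator

    DualThresholdBuffer(int maxRows, long maxBytes, ToLongFunction<T> sizeOf) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.sizeOf = sizeOf;
    }

    void add(T row) {
        rows.add(row);
        bufferedBytes += sizeOf.applyAsLong(row);  // cheap per-row estimate
        // Flush when EITHER limit is crossed, so a handful of very large rows
        // can no longer accumulate into a request the task manager cannot hold.
        if (rows.size() >= maxRows || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        try {
            System.out.println("flushing " + rows.size() + " rows, ~" + bufferedBytes + " bytes");
        } finally {
            rows.clear();
            bufferedBytes = 0;  // reset together with the row buffer
        }
    }
}
```

With the defaults (204,800 rows or 50 MB), a stream of 1 KB rows would trip the byte limit after about 51,200 rows, well before the row limit.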

@@ -160,6 +167,7 @@ protected synchronized void writeRecordInternal() {
         try {
             writeMultipleRecordsInternal();
             numWriteCounter.add(rows.size());
+            bytesWriteCounter.add(batchMaxByteSize);
         } catch (Exception e) {
             if (e instanceof StarRocksStreamLoadFailedException) {
                 StarRocksStreamLoadFailedException exception =

@@ -178,6 +186,7 @@ protected synchronized void writeRecordInternal() {
         } finally {
             // Data is either recorded dirty data or written normally
             rows.clear();
+            batchMaxByteSize = 0;
         }
     }
 }
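
Two bookkeeping changes ride along with the flush: `bytesWriteCounter` is now incremented once per batch with the accumulated `batchMaxByteSize` (the per-record `getObjectSize` call in `writeRecord` is gone), and the byte counter is reset in `finally` next to `rows.clear()`, so a batch that fails and is routed to dirty-data handling cannot leak its size into the next one. A small sketch of that failure-safe reset, again with illustrative names only:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the reset-in-finally guarantee around a batch flush.
class FlushStateDemo {
    private final List<byte[]> rows = new ArrayList<>();
    private long batchBytes = 0;
    private long bytesWritten = 0;  // cf. bytesWriteCounter

    void flush() {
        try {
            send(rows);
            bytesWritten += batchBytes;  // counted once per flushed batch
        } catch (Exception e) {
            // dirty-data handling would run here
        } finally {
            rows.clear();    // data is either written or recorded as dirty
            batchBytes = 0;  // without this, the next batch inherits a stale size
        }
    }

    private void send(List<byte[]> batch) throws Exception { /* stream load */ }
}
```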

StarRocksSinkFactory.java
@@ -36,6 +36,8 @@

 import java.util.stream.Collectors;

+import static com.dtstack.chunjun.connector.starrocks.options.ConstantValue.SINK_BATCH_MAX_ROWS_DEFAULT;
+
 public class StarRocksSinkFactory extends SinkFactory {

     private final StarRocksConfig starRocksConfig;

@@ -47,7 +49,8 @@ public StarRocksSinkFactory(SyncConfig syncConfig) {
                         JsonUtil.toJson(syncConfig.getWriter().getParameter()),
                         StarRocksConfig.class);

-        int batchSize = syncConfig.getWriter().getIntVal("batchSize", 1024);
+        int batchSize =
+                (int) syncConfig.getWriter().getLongVal("batchSize", SINK_BATCH_MAX_ROWS_DEFAULT);
         starRocksConfig.setBatchSize(batchSize);
         super.initCommonConf(starRocksConfig);
     }
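
`batchSize` is now read as a `long` so its default can come from `SINK_BATCH_MAX_ROWS_DEFAULT` (204,800), then narrowed back to `int` for `setBatchSize`. The narrowing is lossless for the default, though `(int)` is a silent conversion; a standalone sketch of both cases:

```java
public class BatchSizeCastDemo {
    // cf. ConstantValue.SINK_BATCH_MAX_ROWS_DEFAULT
    static final long SINK_BATCH_MAX_ROWS_DEFAULT = 2048 * 100L;  // 204800

    public static void main(String[] args) {
        // The default narrows losslessly, since 204800 fits in an int:
        int batchSize = (int) SINK_BATCH_MAX_ROWS_DEFAULT;
        System.out.println(batchSize);  // 204800

        // But (int) truncates silently: a configured value beyond
        // Integer.MAX_VALUE would wrap rather than fail.
        long huge = 3_000_000_000L;
        System.out.println((int) huge);  // -1294967296
    }
}
```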

@@ -250,6 +250,10 @@ private boolean tryHttpConnection(String host) {
     }

     private byte[] joinRows(List<byte[]> rows, int totalBytes) {
+        if (totalBytes < 0) {
+            throw new RuntimeException(
+                    "The ByteBuffer limit has been exceeded, json assembly may fail");
+        }
         ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
         bos.put("[".getBytes(StandardCharsets.UTF_8));
         byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
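
`totalBytes` is an `int`, so if the summed row sizes ever exceed `Integer.MAX_VALUE` the value wraps negative; the new guard turns that into an immediate, descriptive error instead of an undersized buffer or an opaque failure inside `ByteBuffer.allocate`. A standalone sketch of the JSON assembly and the overflow it guards against (simplified from `joinRows`, not the ChunJun method itself):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class JoinRowsDemo {
    static byte[] joinRows(List<byte[]> rows, int totalBytes) {
        if (totalBytes < 0) {  // the upstream int sum of row sizes overflowed
            throw new RuntimeException(
                    "The ByteBuffer limit has been exceeded, json assembly may fail");
        }
        // Capacity: payload + "[" + "]" + (rows.size() - 1) comma delimiters.
        ByteBuffer bos =
                ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
        bos.put("[".getBytes(StandardCharsets.UTF_8));
        boolean first = true;
        for (byte[] row : rows) {
            if (!first) {
                bos.put(",".getBytes(StandardCharsets.UTF_8));
            }
            bos.put(row);
            first = false;
        }
        bos.put("]".getBytes(StandardCharsets.UTF_8));
        return bos.array();
    }

    public static void main(String[] args) {
        byte[] a = "{\"k\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] b = "{\"k\":2}".getBytes(StandardCharsets.UTF_8);
        byte[] joined = joinRows(Arrays.asList(a, b), a.length + b.length);
        System.out.println(new String(joined, StandardCharsets.UTF_8));  // [{"k":1},{"k":2}]

        // Summing sizes in int arithmetic wraps past Integer.MAX_VALUE:
        int total = Integer.MAX_VALUE;
        total += 100;                   // now negative
        System.out.println(total < 0);  // true -> the guard fires early
    }
}
```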

BaseRichOutputFormat.java
@@ -132,8 +132,10 @@ public abstract class BaseRichOutputFormat extends RichOutputFormat<RowData>
     /** The most recently read row */
     protected RowData lastRow = null;

-    /** Data buffered for batch writing */
+    /** Rows of data buffered for batch writing */
     protected transient List<RowData> rows;
+    /** Byte size of the data buffered for batch writing */
+    protected transient long batchMaxByteSize;
     /** Data type converter */
     protected AbstractRowConverter rowConverter;
     /** Whether dirty data and accumulators need initializing; currently only the hive plugin sets this to false */