Skip to content

Commit

Permalink
[Feature-#1784][connector][starrocks] Support running pre-execution a…
Browse files Browse the repository at this point in the history
…nd post-execution sql (#1785)
  • Loading branch information
libailin authored Aug 9, 2023
1 parent df8d8bb commit 74c1591
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,7 @@ public class StarRocksConfig extends CommonConfig {
private long beFetchMaxBytes = 1024 * 1024 * 1024;

private Map<String, String> beSocketProperties = new HashMap<>();

protected List<String> preSql;
protected List<String> postSql;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,16 @@ public class StreamLoadOptions {
.mapType()
.defaultValue(new HashMap<>())
.withDescription("");

public static final ConfigOption<String> SINK_PRE_SQL =
ConfigOptions.key("sink.pre-sql")
.stringType()
.noDefaultValue()
.withDescription("sink.pre-sql");

public static final ConfigOption<String> SINK_POST_SQL =
ConfigOptions.key("sink.post-sql")
.stringType()
.noDefaultValue()
.withDescription("sink.post-sql");
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.connector.starrocks.config.StarRocksConfig;
import com.dtstack.chunjun.connector.starrocks.connection.StarRocksJdbcConnectionProvider;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksQueryVisitor;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksSinkBufferEntity;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadFailedException;
import com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager;
import com.dtstack.chunjun.constants.Metrics;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.table.data.RowData;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
public class StarRocksOutputFormat extends BaseRichOutputFormat {

private static final long serialVersionUID = -72510119599895395L;
Expand All @@ -43,6 +52,53 @@ public class StarRocksOutputFormat extends BaseRichOutputFormat {
private StarRocksConfig starRocksConfig;
private StarRocksWriteProcessor writeProcessor;

@Override
public void initializeGlobal(int parallelism) {
executeBatch(starRocksConfig.getPreSql());
}

@Override
public void finalizeGlobal(int parallelism) {
executeBatch(starRocksConfig.getPostSql());
}

/**
* 执行pre、post SQL
*
* @param sqlList
*/
protected void executeBatch(List<String> sqlList) {
if (CollectionUtils.isNotEmpty(sqlList)) {
StarRocksQueryVisitor starrocksQueryVisitor =
new StarRocksQueryVisitor(starRocksConfig);
StarRocksJdbcConnectionProvider jdbcConnProvider =
starrocksQueryVisitor.getJdbcConnProvider();
try {
jdbcConnProvider.checkValid();
Statement stmt = jdbcConnProvider.getConnection().createStatement();
for (String sql : sqlList) {
// 兼容多条SQL写在同一行的情况
String[] strings = sql.split(";");
for (String s : strings) {
if (StringUtils.isNotBlank(s)) {
log.info("add sql to batch, sql = {}", s);
stmt.addBatch(s);
}
}
}
stmt.executeBatch();
} catch (ClassNotFoundException se) {
throw new IllegalArgumentException(
"Failed to find jdbc driver." + se.getMessage(), se);
} catch (SQLException e) {
throw new RuntimeException(
"execute sql failed, sqlList = " + GsonUtil.GSON.toJson(sqlList), e);
} finally {
jdbcConnProvider.close();
}
}
}

@Override
protected void openInternal(int taskNumber, int numTasks) {
List<String> columnNameList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public StarRocksQueryVisitor(StarRocksConfig starRocksConfig) {
this.table = starRocksConfig.getTable();
}

public StarRocksJdbcConnectionProvider getJdbcConnProvider() {
return jdbcConnProvider;
}

public List<Map<String, Object>> getTableColumnsMetaData() {
return getTableColumnsMetaData(database, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,6 +66,8 @@
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.QUEUE_POLL_TIMEOUT;
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.SINK_BATCH_MAX_BYTES;
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.SINK_BATCH_MAX_ROWS;
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.SINK_POST_SQL;
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.SINK_PRE_SQL;
import static com.dtstack.chunjun.connector.starrocks.options.StreamLoadOptions.STREAM_LOAD_HEAD_PROPERTIES;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_ASYNC_TIMEOUT;
import static com.dtstack.chunjun.lookup.options.LookupOptions.LOOKUP_CACHE_MAX_ROWS;
Expand Down Expand Up @@ -154,6 +159,12 @@ private StarRocksConfig createSinkConfByOptions(ReadableConfig options) {
sinkConf.setBatchSize(batchSize);
sinkConf.setFlushIntervalMills(sinkInternal);
sinkConf.setLoadConfig(loadConfig);
if (StringUtils.isNotEmpty(options.get(SINK_PRE_SQL))) {
sinkConf.setPreSql(Arrays.asList(options.get(SINK_PRE_SQL).split(";")));
}
if (StringUtils.isNotEmpty(options.get(SINK_POST_SQL))) {
sinkConf.setPostSql(Arrays.asList(options.get(SINK_POST_SQL).split(";")));
}
return sinkConf;
}

Expand Down

0 comments on commit 74c1591

Please sign in to comment.