Skip to content

Commit

Permalink
[Fix][Connector-V2] Fixed clickhouse connectors cannot stop under mul…
Browse files Browse the repository at this point in the history
…tiple parallelism (#7921)
  • Loading branch information
YOMO-Lee authored Oct 29, 2024
1 parent 72ab38f commit 8d9c6a3
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -28,13 +29,15 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

@Slf4j
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {

private final List<ClickHouseNode> servers;
Expand All @@ -43,6 +46,7 @@ public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, Clickh
private final SourceReader.Context readerContext;
private ClickHouseRequest<?> request;
private final String sql;
private volatile boolean noMoreSplit;

private final List<ClickhouseSourceSplit> splits;

Expand Down Expand Up @@ -75,31 +79,43 @@ public void close() throws IOException {

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
if (!splits.isEmpty()) {
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
response.stream()
.forEach(
record -> {
Object[] values =
new Object[this.rowTypeInfo.getFieldNames().length];
for (int i = 0; i < record.size(); i++) {
if (record.getValue(i).isNullOrEmpty()) {
values[i] = null;
} else {
values[i] =
TypeConvertUtil.valueUnwrap(
this.rowTypeInfo.getFieldType(i),
record.getValue(i));
synchronized (output.getCheckpointLock()) {
if (!splits.isEmpty()) {
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
response.stream()
.forEach(
record -> {
Object[] values =
new Object[this.rowTypeInfo.getFieldNames().length];
for (int i = 0; i < record.size(); i++) {
if (record.getValue(i).isNullOrEmpty()) {
values[i] = null;
} else {
values[i] =
TypeConvertUtil.valueUnwrap(
this.rowTypeInfo.getFieldType(i),
record.getValue(i));
}
}
}
output.collect(new SeaTunnelRow(values));
});
output.collect(new SeaTunnelRow(values));
});
}
signalNoMoreElement();
}
if (noMoreSplit
&& splits.isEmpty()
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
signalNoMoreElement();
}
this.readerContext.signalNoMoreElement();
this.splits.clear();
}
}

/**
 * Finishes this reader: writes an info log entry, calls
 * {@code readerContext.signalNoMoreElement()}, and then empties the local split list.
 */
private void signalNoMoreElement() {
    log.info("Closed the bounded ClickHouse source");
    readerContext.signalNoMoreElement();
    // Emptying the list makes later isEmpty() checks on the splits succeed.
    splits.clear();
}

@Override
public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws Exception {
return Collections.emptyList();
Expand All @@ -111,7 +127,9 @@ public void addSplits(List<ClickhouseSourceSplit> splits) {
}

@Override
public void handleNoMoreSplits() {}
public void handleNoMoreSplits() {
// Set the volatile flag that pollNext reads (together with an empty split list and
// bounded mode) before it signals that there are no more elements.
// NOTE(review): presumably triggered by the enumerator's signalNoMoreSplits call — confirm.
noMoreSplit = true;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void registerReader(int subtaskId) {
assigned = subtaskId;
context.assignSplit(subtaskId, new ClickhouseSourceSplit());
}
context.signalNoMoreSplits(subtaskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public void testClickhouse(TestContainer container) throws Exception {
clearSinkTable();
}

/**
 * Runs the job defined in {@code /clickhouse_to_console.conf} and verifies that it exits
 * with code 0.
 *
 * <p>The job's stderr is passed as the assertion message so that a failing run reports why
 * the job did not exit cleanly, rather than only the exit-code mismatch.
 *
 * @param container the test container used to execute the job
 * @throws Exception if the job cannot be executed
 */
@TestTemplate
public void testSourceParallelism(TestContainer container) throws Exception {
    Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf");
    Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@BeforeAll
@Override
public void startUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
parallelism = 3
job.mode = "BATCH"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from source_table"
username = "default"
password = ""
result_table_name = "source_table"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
}

sink {
console {
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ seatunnel:
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
print-job-metrics-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
Expand Down

0 comments on commit 8d9c6a3

Please sign in to comment.