Improve Stream consumer and testing (#200)

* Improve storage classes and add test using bigquery emulator docker container

ismailsimsek authored Oct 30, 2024
1 parent c4b9c45 commit b869977

Showing 14 changed files with 405 additions and 297 deletions.
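
The tests added here run against a BigQuery emulator in a Docker container. As a point of reference, the sketch below shows how such a container is typically started from Java test code; the ghcr.io/goccy/bigquery-emulator image, its ports (9050 REST, 9060 gRPC), and the --project flag are assumptions about the setup, not taken from this diff.

import org.testcontainers.containers.GenericContainer;

public class BigQueryEmulatorSketch {
  public static void main(String[] args) {
    // Hypothetical emulator container; image, ports, and flags are assumptions.
    try (GenericContainer<?> emulator =
             new GenericContainer<>("ghcr.io/goccy/bigquery-emulator:latest")
                 .withExposedPorts(9050, 9060) // 9050: REST API, 9060: gRPC (Storage Write API)
                 .withCommand("--project=test-project")) {
      emulator.start();
      // The mapped gRPC port is what a custom gRPC host setting would point at.
      String grpcHost = emulator.getHost() + ":" + emulator.getMappedPort(9060);
      System.out.println("Emulator gRPC endpoint: " + grpcHost);
    }
  }
}
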
ConsumerUtil.java
@@ -10,6 +10,7 @@
 
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.auth.Credentials;
 import com.google.auth.oauth2.GoogleCredentials;
@@ -156,23 +157,41 @@ public static TableResult executeQuery(BigQuery bqClient, String query) throws S
     return ConsumerUtil.executeQuery(bqClient, query, null);
   }
 
+  public static InstantiatingGrpcChannelProvider bigQueryTransportChannelProvider(Boolean isBigqueryDevEmulator, Optional<String> bigQueryCustomGRPCHost) {
+
+    InstantiatingGrpcChannelProvider.Builder builder = BigQueryWriteSettings.defaultGrpcTransportProviderBuilder();
+    if (isBigqueryDevEmulator) {
+      builder.setChannelConfigurator(ManagedChannelBuilder::usePlaintext);
+    }
+
+    if (!bigQueryCustomGRPCHost.orElse("").isEmpty()) {
+      builder.setEndpoint(bigQueryCustomGRPCHost.get());
+    }
+
+    builder
+        .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+        .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+        .setKeepAliveWithoutCalls(true)
+        .setChannelsPerCpu(2);
+    return builder.build();
+  }
+
   public static BigQueryWriteSettings bigQueryWriteSettings(Boolean isBigqueryDevEmulator, BigQuery bqClient, Optional<String> bigQueryCustomGRPCHost) throws IOException {
     BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
 
     if (isBigqueryDevEmulator) {
       // it is bigquery emulator
-      builder.setCredentialsProvider(NoCredentialsProvider.create())
-          .setTransportChannelProvider(
-              BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
-                  .setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
-                  .build()
-          );
+      builder.setCredentialsProvider(NoCredentialsProvider.create());
     } else {
       builder.setCredentialsProvider(FixedCredentialsProvider.create(bqClient.getOptions().getCredentials()));
     }
 
     if (!bigQueryCustomGRPCHost.orElse("").isEmpty()) {
       builder.setEndpoint(bigQueryCustomGRPCHost.get());
     }
 
+    builder.setTransportChannelProvider(bigQueryTransportChannelProvider(isBigqueryDevEmulator, bigQueryCustomGRPCHost));
     return builder.build();
   }
 }
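
Note on the change above: the new bigQueryTransportChannelProvider() centralizes the gRPC channel setup (plaintext for the emulator, optional custom endpoint, keep-alives, channels per CPU) that bigQueryWriteSettings() previously built inline, so the write client and the stream writer can share one provider. A minimal usage sketch against a local emulator; the "localhost:9060" endpoint is an assumption, not taken from this diff.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;

import java.util.Optional;

public class WriteSettingsSketch {
  public static void main(String[] args) throws Exception {
    BigQueryWriteSettings settings = ConsumerUtil.bigQueryWriteSettings(
        true,                           // isBigqueryDevEmulator: plaintext channel, no credentials
        null,                           // bqClient is not consulted on the emulator branch
        Optional.of("localhost:9060")); // assumed emulator gRPC endpoint
    try (BigQueryWriteClient client = BigQueryWriteClient.create(settings)) {
      // The client now talks to the emulator through the shared channel provider.
    }
  }
}
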
StreamBigqueryChangeConsumer.java
@@ -9,16 +9,14 @@
 package io.debezium.server.bigquery;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.*;
-import com.google.cloud.bigquery.storage.v1.*;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.TableName;
 import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import io.debezium.DebeziumException;
-import io.grpc.Status;
-import io.grpc.Status.Code;
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
 import jakarta.enterprise.context.Dependent;
@@ -33,7 +31,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;

/**
@@ -45,18 +42,8 @@
 @Dependent
 @Beta
 public class StreamBigqueryChangeConsumer extends AbstractChangeConsumer {
-  protected static final ConcurrentHashMap<String, DataWriter> jsonStreamWriters = new ConcurrentHashMap<>();
+  protected static final ConcurrentHashMap<String, StreamDataWriter> jsonStreamWriters = new ConcurrentHashMap<>();
   static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
-  private static final int MAX_RETRY_COUNT = 3;
-  private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
-      ImmutableList.of(
-          Code.INTERNAL,
-          Code.ABORTED,
-          Code.CANCELLED,
-          Code.FAILED_PRECONDITION,
-          Code.DEADLINE_EXCEEDED,
-          Code.UNAVAILABLE);
 
   public static BigQueryWriteClient bigQueryWriteClient;
   @ConfigProperty(name = "debezium.sink.batch.destination-regexp", defaultValue = "")
   protected Optional<String> destinationRegexp;
@@ -107,7 +94,7 @@ void connect() throws InterruptedException {
 
   @PreDestroy
   void closeStreams() {
-    for (Map.Entry<String, DataWriter> sw : jsonStreamWriters.entrySet()) {
+    for (Map.Entry<String, StreamDataWriter> sw : jsonStreamWriters.entrySet()) {
       try {
         sw.getValue().close(bigQueryWriteClient);
       } catch (Exception e) {
@@ -131,13 +118,16 @@ public void initizalize() throws InterruptedException {
     }
   }
 
-  private DataWriter getDataWriter(Table table) {
+  private StreamDataWriter getDataWriter(Table table) {
     try {
-      return new DataWriter(
+      StreamDataWriter writer = new StreamDataWriter(
           TableName.of(table.getTableId().getProject(), table.getTableId().getDataset(), table.getTableId().getTable()),
           bigQueryWriteClient,
-          ignoreUnknownFields
+          ignoreUnknownFields,
+          ConsumerUtil.bigQueryTransportChannelProvider(isBigqueryDevEmulator, bigQueryCustomGRPCHost)
       );
+      writer.initialize();
+      return writer;
     } catch (DescriptorValidationException | IOException | InterruptedException e) {
       throw new DebeziumException("Failed to initialize stream writer for table " + table.getTableId(), e);
     }
@@ -148,7 +138,7 @@ public long uploadDestination(String destination, List<RecordConverter> data) {
     long numRecords = data.size();
     Table table = getTable(destination, data.get(0));
     // get the stream writer, creating it if it does not exist yet!
-    DataWriter writer = jsonStreamWriters.computeIfAbsent(destination, k -> getDataWriter(table));
+    StreamDataWriter writer = jsonStreamWriters.computeIfAbsent(destination, k -> getDataWriter(table));
     try {
       // upsert mode deduplicates data for tables that have a primary key;
       // tables without a primary key run in append mode
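
Aside: the cdcOperations map above assigns each Debezium operation a precedence (c=1, r=2, u=3, d=4), which is the kind of ordering upsert-mode deduplication can rank events by. An illustrative comparison for two events on the same key; the comparator here is a sketch, not the consumer's actual code.

import com.google.common.collect.ImmutableMap;

import java.util.Comparator;

public class CdcPrecedenceSketch {
  static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

  public static void main(String[] args) {
    Comparator<String> byPrecedence = Comparator.comparing(cdcOperations::get);
    // A delete ("d", 4) outranks an update ("u", 3) for the same key:
    System.out.println(byPrecedence.compare("d", "u") > 0); // true
  }
}
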
@@ -305,61 +295,6 @@ private Table updateTableSchema(Table table, Schema updatedSchema, String destin
     return table;
   }
 
-  protected static class DataWriter {
-    private final JsonStreamWriter streamWriter;
-
-    public DataWriter(TableName parentTable, BigQueryWriteClient client,
-                      Boolean ignoreUnknownFields)
-        throws DescriptorValidationException, IOException, InterruptedException {
-
-      // Use the JSON stream writer to send records in JSON format. Specify the table name to write
-      // to the default stream.
-      // For more information about JsonStreamWriter, see:
-      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
-      streamWriter = JsonStreamWriter
-          .newBuilder(parentTable.toString(), client)
-          .setIgnoreUnknownFields(ignoreUnknownFields)
-          .build();
-    }
-
-    private void appendSync(JSONArray data, int retryCount) throws DescriptorValidationException,
-        IOException {
-      ApiFuture<AppendRowsResponse> future = streamWriter.append(data);
-      try {
-        AppendRowsResponse response = future.get();
-        if (response.hasError()) {
-          throw new DebeziumException("Failed to append data to stream. " + response.getError().getMessage());
-        }
-      } catch (InterruptedException | ExecutionException throwable) {
-        // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
-        // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
-        // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
-        Status status = Status.fromThrowable(throwable);
-        if (retryCount < MAX_RETRY_COUNT
-            && RETRIABLE_ERROR_CODES.contains(status.getCode())) {
-          // Since default stream appends are not ordered, we can simply retry the appends.
-          // Retrying with exclusive streams requires more careful consideration.
-          this.appendSync(data, ++retryCount);
-          // Mark the existing attempt as done since it's being retried.
-        } else {
-          throw new DebeziumException("Failed to append data to stream " + streamWriter.getStreamName() + "\n" + throwable.getMessage(),
-              throwable);
-        }
-      }
-
-    }
-
-    public void appendSync(JSONArray data) throws DescriptorValidationException, IOException {
-      this.appendSync(data, 0);
-    }
-
-    public void close(BigQueryWriteClient client) {
-      if (streamWriter != null) {
-        streamWriter.close();
-        client.finalizeWriteStream(streamWriter.getStreamName());
-      }
-    }
-  }
 
 }
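
With the inner DataWriter gone, the consumer drives the standalone StreamDataWriter (next file) through an explicit lifecycle: construct, initialize(), appendSync(), close(). A minimal sketch of that lifecycle, mirroring getDataWriter() and closeStreams(); the project, dataset, and table names are placeholders.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.TableName;
import org.json.JSONArray;

import java.util.Optional;

public class StreamWriterLifecycleSketch {
  // Assumes an already-created BigQueryWriteClient; table coordinates are placeholders.
  static void writeOneBatch(BigQueryWriteClient client) throws Exception {
    StreamDataWriter writer = new StreamDataWriter(
        TableName.of("my-project", "my_dataset", "my_table"),
        client,
        true, // ignoreUnknownFields
        ConsumerUtil.bigQueryTransportChannelProvider(false, Optional.empty()));
    writer.initialize();                                 // builds the JsonStreamWriter
    writer.appendSync(new JSONArray("[{\"c_id\": 1}]")); // blocking append to the default stream
    writer.close(client);                                // closes the writer, finalizes the stream
  }
}
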

StreamDataWriter.java (new file)
@@ -0,0 +1,100 @@
package io.debezium.server.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.debezium.DebeziumException;
import org.json.JSONArray;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


public class StreamDataWriter {
  private static final int MAX_RECREATE_COUNT = 3;
  private final BigQueryWriteClient client;
  private final Boolean ignoreUnknownFields;
  private final InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider;
  private final TableName parentTable;
  private final Object lock = new Object();
  JsonStreamWriter streamWriter;
  private final AtomicInteger recreateCount = new AtomicInteger(0);

  public StreamDataWriter(TableName parentTable, BigQueryWriteClient client,
                          Boolean ignoreUnknownFields, InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider)
      throws DescriptorValidationException, IOException, InterruptedException {
    this.client = client;
    this.ignoreUnknownFields = ignoreUnknownFields;
    this.instantiatingGrpcChannelProvider = instantiatingGrpcChannelProvider;
    this.parentTable = parentTable;
  }

  public void initialize()
      throws DescriptorValidationException, IOException, InterruptedException {
    streamWriter = createStreamWriter(this.parentTable.toString());
  }

  private JsonStreamWriter createStreamWriter(String parentTableName)
      throws DescriptorValidationException, IOException, InterruptedException {
    // https://cloud.google.com/bigquery/docs/write-api-streaming
    // Configure in-stream automatic retry settings.
    // Error codes that are immediately retried:
    // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
    // Error codes that are retried with exponential backoff:
    // * RESOURCE_EXHAUSTED
    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500))
            .setRetryDelayMultiplier(1.1)
            .setMaxAttempts(5)
            .setMaxRetryDelay(Duration.ofMinutes(1))
            .build();
    // Use the JSON stream writer to send records in JSON format. Specify the table name to write
    // to the default stream.
    // For more information about JsonStreamWriter, see:
    // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
    return JsonStreamWriter.newBuilder(parentTableName, client)
        .setIgnoreUnknownFields(ignoreUnknownFields)
        .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
        .setChannelProvider(instantiatingGrpcChannelProvider)
        .setEnableConnectionPool(true)
        // If a value is missing in the JSON and the BigQuery column has a default value configured,
        // apply that default value to the missing field.
        .setDefaultMissingValueInterpretation(AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
        .setRetrySettings(retrySettings)
        .build();
  }


  public void appendSync(JSONArray data) throws DescriptorValidationException, IOException {
    try {
      synchronized (this.lock) {
        // Recreate the writer if it was closed by the system (not by the user),
        // up to MAX_RECREATE_COUNT times.
        if (!streamWriter.isUserClosed() && streamWriter.isClosed() && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
          streamWriter = createStreamWriter(streamWriter.getStreamName());
        }
      }

      ApiFuture<AppendRowsResponse> future = streamWriter.append(data);
      AppendRowsResponse response = future.get();
      if (response.hasError()) {
        throw new DebeziumException("Failed to append data to stream. Error Code:" + response.getError().getCode()
            + " Error Message:" + response.getError().getMessage());
      }
    } catch (Exception throwable) {
      throw new DebeziumException("Failed to append data to stream " + streamWriter.getStreamName() + "\n" + throwable.getMessage(),
          throwable);
    }
  }

  public void close(BigQueryWriteClient client) {
    if (streamWriter != null) {
      streamWriter.close();
      client.finalizeWriteStream(streamWriter.getStreamName());
    }
  }
}
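
For a sense of what those RetrySettings mean in practice: a 500 ms initial delay with a 1.1 multiplier and 5 attempts yields nominal waits of roughly 500, 550, 605, and 666 ms, far below the one-minute cap (gax may additionally randomize the actual delays). A small sketch computing that schedule:

public class BackoffScheduleSketch {
  public static void main(String[] args) {
    double delayMs = 500;             // setInitialRetryDelay
    final double multiplier = 1.1;    // setRetryDelayMultiplier
    final double maxDelayMs = 60_000; // setMaxRetryDelay
    for (int retry = 1; retry < 5; retry++) { // setMaxAttempts(5) => up to 4 retries
      System.out.printf("retry %d waits ~%.0f ms%n", retry, delayMs);
      delayMs = Math.min(delayMs * multiplier, maxDelayMs);
    }
  }
}
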
BaseBigqueryTest.java
@@ -8,20 +8,15 @@
 
 package io.debezium.server.bigquery;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.bigquery.*;
-import io.debezium.DebeziumException;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Objects;
 
-import static io.debezium.server.bigquery.shared.BigQueryDB.*;
+import static io.debezium.server.bigquery.shared.BigQueryDB.BQ_DATASET;
 /**
  *
@@ -32,41 +27,6 @@ public class BaseBigqueryTest {
   public static final Logger LOGGER = LoggerFactory.getLogger(BaseBigqueryTest.class);
   public static BigQuery bqClient;
 
-  static {
-    GoogleCredentials credentials;
-    try {
-      if (BQ_CRED_FILE != null && !BQ_CRED_FILE.isEmpty()) {
-        credentials = GoogleCredentials.fromStream(new FileInputStream(BQ_CRED_FILE));
-      } else {
-        credentials = GoogleCredentials.getApplicationDefault();
-      }
-    } catch (IOException e) {
-      throw new DebeziumException("Failed to initialize google credentials", e);
-    }
-
-    bqClient = BigQueryOptions.newBuilder()
-        .setCredentials(credentials)
-        //.setProjectId(BQ_PROJECT) // use project from cred file!?
-        .setLocation(BQ_LOCATION)
-        .setRetrySettings(
-            RetrySettings.newBuilder()
-                // Set the max number of attempts
-                .setMaxAttempts(5)
-                // InitialRetryDelay controls the delay before the first retry.
-                // Subsequent retries will use this value adjusted according to the RetryDelayMultiplier.
-                .setInitialRetryDelay(org.threeten.bp.Duration.ofSeconds(5))
-                .setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(60))
-                // Set the backoff multiplier
-                .setRetryDelayMultiplier(2.0)
-                // Set the max duration of all attempts
-                .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
-                .build()
-        )
-        .build()
-        .getService();
-    LOGGER.warn("Using BQ project {}", bqClient.getOptions().getProjectId());
-  }
 
   public static Schema getTableSchema(String destination) throws InterruptedException {
     TableId tableId = getTableId(destination);
     return bqClient.getTable(tableId).getDefinition().getSchema();
@@ -149,28 +109,6 @@ && getTableField(dest, "__deleted").getType() == LegacySQLTypeName.BOOLEAN
     });
   }
 
-  public void loadVariousDataTypeConversion() {
-    //SourcePostgresqlDB.runSQL("DELETE FROM inventory.test_data_types WHERE c_id>0;");
-    String dest = "testc.inventory.test_data_types";
-    Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
-      try {
-        //getTableData(dest).iterateAll().forEach(System.out::println);
-        return getTableData(dest).getTotalRows() >= 3
-            // '2019-07-09 02:28:10.123456+01' --> hour is UTC in BQ
-            && getTableData(dest, "c_timestamptz = TIMESTAMP('2019-07-09T01:28:10.123456Z')").getTotalRows() == 1
-            // '2019-07-09 02:28:20.666666+01' --> hour is UTC in BQ
-            && getTableData(dest, "c_timestamptz = TIMESTAMP('2019-07-09T01:28:20.666666Z')").getTotalRows() == 1
-            && getTableData(dest, "DATE(c_timestamptz) = DATE('2019-07-09')").getTotalRows() >= 2
-            && getTableField(dest, "c_timestamptz").getType() == LegacySQLTypeName.TIMESTAMP
-            && getTableData(dest, "c_date = DATE('2017-09-15')").getTotalRows() == 1
-            && getTableData(dest, "c_date = DATE('2017-02-10')").getTotalRows() == 1;
-      } catch (Exception e) {
-        return false;
-      }
-    });
-  }
 
   public static TableResult getTableData(String destination) throws InterruptedException {
     return getTableData(destination, "1=1");
   }
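
The static client bootstrap removed above (credentials file plus retry settings) presumably moves into the shared test infrastructure, which is not shown in this diff; a client aimed at an emulator needs no real credentials at all. A sketch of that pattern; the REST endpoint and project id are assumptions, not taken from this diff.

import com.google.cloud.NoCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;

public class EmulatorClientSketch {
  public static void main(String[] args) {
    BigQuery bq = BigQueryOptions.newBuilder()
        .setHost("http://localhost:9050")            // assumed emulator REST endpoint
        .setProjectId("test-project")                // assumed project id
        .setCredentials(NoCredentials.getInstance()) // no real credentials needed
        .build()
        .getService();
    System.out.println("Connected to " + bq.getOptions().getHost());
  }
}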
