RFC: Some refactoring ideas for Storage client library #1943

Draft · wants to merge 1 commit into base: main
ConnectionWorker.java

@@ -35,6 +35,7 @@
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
@@ -164,7 +165,7 @@ public class ConnectionWorker implements AutoCloseable {
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchemaAndTimestamp updatedSchema;
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
@@ -196,6 +197,12 @@ public class ConnectionWorker implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/**
* When using connection pooling, this is the set of StreamWriters that are currently using this
* connection. This object is only accessed by ConnectionWorkerPool when the pool's lock is held.
*/
private final Set<StreamWriter> streamWriters;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -241,6 +248,7 @@ public void run() {
}
});
this.appendThread.start();
this.streamWriters = new HashSet<>();
}

private void resetConnection() {
@@ -392,6 +400,10 @@ public void close() {
}
}

Set<StreamWriter> getCurrentStreamWriters() {
return streamWriters;
}

/*
* This loop is executed in a separate thread.
*
@@ -616,8 +628,7 @@ private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema =
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema());
this.updatedSchema = response.getUpdatedSchema();
}
try {
// Had a successful connection with at least one result, reset retries.
@@ -742,7 +753,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() {
}

/** Thread-safe getter of updated TableSchema */
synchronized TableSchemaAndTimestamp getUpdatedSchema() {
synchronized TableSchema getUpdatedSchema() {
return this.updatedSchema;
}

@@ -840,17 +851,4 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
overwhelmedInflightCount = newThreshold;
}
}

@AutoValue
abstract static class TableSchemaAndTimestamp {
// Shows the timestamp updated schema is reported from response
abstract long updateTimeStamp();

// The updated schema returned from server.
abstract TableSchema updatedSchema();

static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) {
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
}
}
}
ConnectionWorkerPool.java

@@ -20,7 +20,6 @@
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
@@ -29,6 +28,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,22 +67,15 @@ public class ConnectionWorkerPool {
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/** Map from write stream to corresponding connection. */
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
new ConcurrentHashMap<>();

/** Map from a connection to a set of write stream that have sent requests onto it. */
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
new ConcurrentHashMap<>();

/** Collection of all the created connections. */
private final Set<ConnectionWorker> connectionWorkerPool =
Collections.synchronizedSet(new HashSet<>());

/*
* Contains the mapping from stream name to updated schema.
*/
private Map<String, TableSchemaAndTimestamp> tableNameToUpdatedSchema = new ConcurrentHashMap<>();
private Map<String, Set<StreamWriter>> streamNameToStreamWriter = new ConcurrentHashMap<>();
private Map<StreamWriter, TableSchema> streamWriterToUpdatedSchema = new ConcurrentHashMap<>();

/** Enable test related logic. */
private static boolean enableTesting = false;
@@ -225,6 +218,12 @@ public static void setOptions(Settings settings) {
ConnectionWorkerPool.settings = settings;
}

public void registerStreamWriter(StreamWriter streamWriter) {
streamNameToStreamWriter
.computeIfAbsent(streamWriter.getStreamName(), k -> new HashSet<>())
.add(streamWriter);
}
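As an aside on the registration pattern above: a minimal, self-contained sketch of the computeIfAbsent idiom used by registerStreamWriter. The stand-in types are hypothetical; the PR itself uses a plain HashSet, whereas a concurrent key set, as below, would keep the add safe even without an external lock.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class StreamRegistrySketch {
  // Mirrors streamNameToStreamWriter; String stands in for StreamWriter.
  private final Map<String, Set<String>> streamNameToWriters = new ConcurrentHashMap<>();

  void register(String streamName, String writer) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent first-time
    // registrations for one stream name all observe the same set.
    streamNameToWriters
        .computeIfAbsent(streamName, k -> ConcurrentHashMap.newKeySet())
        .add(writer);
  }

  void unregister(String streamName, String writer) {
    Set<String> writers = streamNameToWriters.get(streamName);
    if (writers != null) {
      writers.remove(writer); // the set may remain empty; cleanup omitted here
    }
  }
}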

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
return append(streamWriter, rows, -1);
@@ -234,47 +233,52 @@ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset) {
// We are in multiplexing mode after entering the following logic.
ConnectionWorker connectionWorker =
streamWriterToConnection.compute(
streamWriter,
(key, existingStream) -> {
// Though compute on concurrent map is atomic, we still do explicit locking as we
// may have concurrent close(...) triggered.
lock.lock();
try {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
return existingStream;
}
// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
createdOrExistingConnection =
createOrReuseConnectionWorker(streamWriter, existingStream);
} catch (IOException e) {
throw new IllegalStateException(e);
}
// Update connection to write stream relationship.
connectionToWriteStream.computeIfAbsent(
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
return createdOrExistingConnection;
} finally {
lock.unlock();
}
});
ConnectionWorker currentConnection;
Contributor comment: In general, I like this idea. We can reuse the connection more for the same StreamWriter.

// TODO: Do we need a global lock here? Or is it enough to just lock the StreamWriter?
lock.lock();
try {
currentConnection = streamWriter.getCurrentConnectionPoolConnection();
Contributor comment: Should we keep multiple (at least 2) connections in order to scale up, and avoid looking into the global pool?

// TODO: Experiment with only checking isOverwhelmed less often per StreamWriter (once per
// second?) instead of on every append call.
if (currentConnection == null || currentConnection.getLoad().isOverwhelmed()) {
// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
createdOrExistingConnection =
createOrReuseConnectionWorker(streamWriter, currentConnection);
Contributor comment: I think we still need the global lock here.
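A minimal sketch of the throttling idea from the TODO above (checking isOverwhelmed roughly once per second per StreamWriter instead of on every append); the class and field names are hypothetical:

import java.util.function.BooleanSupplier;

class ThrottledOverwhelmCheck {
  private static final long CHECK_INTERVAL_NANOS = 1_000_000_000L; // ~1 second

  private long lastCheckNanos = System.nanoTime() - CHECK_INTERVAL_NANOS; // force first refresh
  private boolean lastResult = false;

  // Returns a cached answer, refreshing it from the real check at most about
  // once per interval. Intended to be called under the same lock as append().
  boolean isOverwhelmed(BooleanSupplier actualCheck) {
    long now = System.nanoTime();
    if (now - lastCheckNanos >= CHECK_INTERVAL_NANOS) {
      lastResult = actualCheck.getAsBoolean();
      lastCheckNanos = now;
    }
    return lastResult;
  }
}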

} catch (IOException e) {
throw new IllegalStateException(e);
}
currentConnection = createdOrExistingConnection;
streamWriter.setCurrentConnectionPoolConnection(currentConnection);
// Update connection to write stream relationship.
// TODO: What if we simply kept an atomic refcount in ConnectionWorker? We could also
Contributor comment: Refcounting would be error-prone: a StreamWriter could switch back and forth between connection workers, so one worker could end up recording a single stream writer multiple times if we used a refcount.

Author reply: I think that would be fine. The refcount removal would happen in the done callback (below, in ApiFutures.transform), so we would know exactly which connection worker to decrement even if the stream writer has since moved to a different connection.
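A minimal sketch of the refcount variant discussed in this thread (hypothetical names, not part of this PR): the count is bumped when a request is routed to a worker and released in that request's done callback, which keeps a reference to the exact worker that served it.

import java.util.concurrent.atomic.AtomicInteger;

class RefcountedWorkerSketch {
  // Number of in-flight appends routed to this worker.
  private final AtomicInteger inFlight = new AtomicInteger();

  void onAppendRouted() {
    inFlight.incrementAndGet();
  }

  // Called from the append's done callback. Even if the StreamWriter has since
  // moved to another connection, the callback still holds this worker.
  void onAppendDone() {
    if (inFlight.decrementAndGet() == 0) {
      // Worker is idle: a candidate for closing / removal from the pool.
    }
  }
}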

// manage the refcount in the callback below to precisely track which connections are being
// used.
currentConnection.getCurrentStreamWriters().add(streamWriter);
Contributor comment: Is this lock good enough to protect currentConnection?

Author reply: We're using it to protect currentConnection.getCurrentStreamWriters(). In theory we could put a lock inside currentConnection, which would give more granular locking. However, that would also cause a lot more lock/unlock activity (e.g., every call to append would have to take at least two locks), so the change would need measurement to see whether it is actually better.
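For illustration, a minimal sketch of the more granular locking mentioned in the reply above: a lock inside each connection guarding its writer set, at the cost of extra lock traffic on every append (hypothetical names):

import java.util.HashSet;
import java.util.Set;

class PerConnectionLockSketch {
  private final Object writersLock = new Object(); // per-connection, not pool-wide
  private final Set<Object> streamWriters = new HashSet<>();

  void addWriter(Object writer) {
    synchronized (writersLock) {
      streamWriters.add(writer);
    }
  }

  // Returns true if this connection no longer serves any writer and can close.
  boolean removeWriterAndCheckIdle(Object writer) {
    synchronized (writersLock) {
      streamWriters.remove(writer);
      return streamWriters.isEmpty();
    }
  }
}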

}
} finally {
lock.unlock();
}

Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
currentConnection.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
(response) -> {
if (!response.getWriteStream().isEmpty() && response.hasUpdatedSchema()) {
tableNameToUpdatedSchema.put(
response.getWriteStream(),
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()));
Set<StreamWriter> streamWritersToUpdate =
streamNameToStreamWriter.get(response.getWriteStream());
if (streamWritersToUpdate != null) {
for (StreamWriter updateStream : streamWritersToUpdate) {
// Alternatively, just call a setter on each of these StreamWriters to tell it about
// the new schema. That would eliminate another static map.
streamWriterToUpdatedSchema.put(updateStream, response.getUpdatedSchema());
}
}
}
return response;
},
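A self-contained sketch of the setter alternative mentioned in the inline comment above (push the updated schema into each registered writer and drop the pool-level map); setUpdatedSchema and the stand-in types are hypothetical:

import java.util.HashSet;
import java.util.Set;

public class SchemaPushSketch {
  // Hypothetical stand-ins: Writer for StreamWriter, String for TableSchema.
  static class Writer {
    private volatile String updatedSchema; // visible to reader threads

    void setUpdatedSchema(String schema) { // hypothetical setter from the comment
      this.updatedSchema = schema;
    }

    String getUpdatedSchema() {
      return updatedSchema;
    }
  }

  // Push the new schema into each registered writer; each writer then answers
  // getUpdatedSchema() locally, so no pool-level map is needed.
  static void propagate(Set<Writer> writersOnStream, String newSchema) {
    for (Writer w : writersOnStream) {
      w.setUpdatedSchema(newSchema);
    }
  }

  public static void main(String[] args) {
    Set<Writer> writers = new HashSet<>();
    Writer w = new Writer();
    writers.add(w);
    propagate(writers, "schema-v2");
    System.out.println(w.getUpdatedSchema()); // schema-v2
  }
}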
@@ -385,26 +389,30 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
public void close(StreamWriter streamWriter) {
lock.lock();
try {
streamWriterToConnection.remove(streamWriter);
Set<StreamWriter> streamWriters = streamNameToStreamWriter.get(streamWriter.getStreamName());
if (streamWriters != null) {
streamWriters.remove(streamWriter);
}

streamWriter.setCurrentConnectionPoolConnection(null);
// Since it's possible some other connections may have served this writeStream, we
// iterate and see whether it's also fine to close other connections.
Set<ConnectionWorker> connectionToRemove = new HashSet<>();
for (ConnectionWorker connectionWorker : connectionToWriteStream.keySet()) {
if (connectionToWriteStream.containsKey(connectionWorker)) {
connectionToWriteStream.get(connectionWorker).remove(streamWriter);
if (connectionToWriteStream.get(connectionWorker).isEmpty()) {
connectionWorker.close();
connectionWorkerPool.remove(connectionWorker);
connectionToRemove.add(connectionWorker);
}
int numClosed = 0;
for (Iterator<ConnectionWorker> it = connectionWorkerPool.iterator(); it.hasNext(); ) {
ConnectionWorker connectionWorker = it.next();
connectionWorker.getCurrentStreamWriters().remove(streamWriter);
if (connectionWorker.getCurrentStreamWriters().isEmpty()) {
connectionWorker.close();
it.remove();
++numClosed;
}
}

log.info(
String.format(
"During closing of writeStream for %s with writer id %s, we decided to close %s "
+ "connections",
streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size()));
connectionToWriteStream.keySet().removeAll(connectionToRemove);
streamWriter.getStreamName(), streamWriter.getWriterId(), numClosed));
} finally {
lock.unlock();
}
@@ -414,7 +422,7 @@ public void close(StreamWriter streamWriter) {
public long getInflightWaitSeconds(StreamWriter streamWriter) {
lock.lock();
try {
ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter);
ConnectionWorker connectionWorker = streamWriter.getCurrentConnectionPoolConnection();
if (connectionWorker == null) {
return 0;
} else {
@@ -425,8 +433,8 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) {
}
}

TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
return tableNameToUpdatedSchema.getOrDefault(streamWriter.getStreamName(), null);
TableSchema getUpdatedSchema(StreamWriter streamWriter) {
return streamWriterToUpdatedSchema.getOrDefault(streamWriter, null);
}

/** Enable Test related logic. */
StreamWriter.java

@@ -22,7 +22,6 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
@@ -112,6 +111,21 @@ public static ConnectionPoolKey create(String location) {
}
}

private ConnectionWorker currentConnectionPoolConnection;

/**
* If using a single connection, this returns null. Always accessed under the ConnectionWorkerPool
* lock.
*/
ConnectionWorker getCurrentConnectionPoolConnection() {
return currentConnectionPoolConnection;
}

/** Always accessed under the ConnectionWorkerPool lock. */
void setCurrentConnectionPoolConnection(ConnectionWorker currentConnectionPoolConnection) {
this.currentConnectionPoolConnection = currentConnectionPoolConnection;
}

/**
* When in single table mode, append directly to connectionWorker. Otherwise append to connection
* pool in multiplexing mode.
@@ -155,12 +169,12 @@ long getInflightWaitSeconds(StreamWriter streamWriter) {
return connectionWorker().getInflightWaitSeconds();
}

TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
TableSchema getUpdatedSchema(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker().getUpdatedSchema();
} else {
return connectionWorkerPool().getUpdatedSchema(streamWriter);
Contributor comment: Does this break the promise that StreamWriter only sees updates when there is a schema update? I think we should be fine to use nanoTime since it is monotonic on this machine? https://screenshot.googleplex.com/3Qeo9ouZEnehgMR

Author reply: I don't think nanoTime completely works. It is monotonic, but not strictly increasing; i.e., the current code is broken if the first update has the same nanoTime as the creation time, which is entirely possible.

}
// Always populate MIN timestamp to w
return connectionWorker().getUpdatedSchema();
}
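A tiny demo of the point in the reply above: System.nanoTime() is monotonic but not strictly increasing, so two consecutive readings (and hence a creation time and a first update) can be identical, depending on clock resolution.

public class NanoTimeDemo {
  public static void main(String[] args) {
    long equal = 0;
    for (int i = 0; i < 1_000_000; i++) {
      long a = System.nanoTime();
      long b = System.nanoTime();
      if (a == b) {
        equal++; // two "different" moments, same timestamp
      }
    }
    // On clocks with coarse resolution this prints a nonzero count.
    System.out.println("equal consecutive readings: " + equal);
  }
}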

String getWriterId(String streamWriterId) {
Expand All @@ -170,6 +184,11 @@ String getWriterId(String streamWriterId) {
return connectionWorker().getWriterId();
}

public void register(StreamWriter streamWriter) {
Contributor comment: Can be removed?

Author reply: I don't understand the comment?

Preconditions.checkState(getKind() == Kind.CONNECTION_WORKER_POOL);
connectionWorkerPool().registerStreamWriter(streamWriter);
}

public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) {
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection);
}
@@ -259,6 +278,7 @@ private StreamWriter(Builder builder) throws IOException {
client,
ownsBigQueryWriteClient);
}));
this.singleConnectionOrConnectionPool.register(this);
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
@@ -429,14 +449,7 @@ public static StreamWriter.Builder newBuilder(String streamName) {
* than the updated schema.
*/
public synchronized TableSchema getUpdatedSchema() {
TableSchemaAndTimestamp tableSchemaAndTimestamp =
singleConnectionOrConnectionPool.getUpdatedSchema(this);
if (tableSchemaAndTimestamp == null) {
return null;
}
return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp()
? tableSchemaAndTimestamp.updatedSchema()
: null;
return singleConnectionOrConnectionPool.getUpdatedSchema(this);
}

long getCreationTimestamp() {
@@ -310,10 +310,10 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing)
.setWriteStream(TEST_STREAM_1)
.build());

assertEquals(writer.getUpdatedSchema(), null);
assertEquals(null, writer.getUpdatedSchema());
AppendRowsResponse response =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get();
assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA);
assertEquals(UPDATED_TABLE_SCHEMA, writer.getUpdatedSchema());

// Create another writer; although it has the same stream name, its creation timestamp is
// newer, so the old updated schema won't get returned.
@@ -325,7 +325,7 @@
.setEnableConnectionPool(enableMultiplexing)
.setLocation("us")
.build();
assertEquals(writer2.getUpdatedSchema(), null);
assertEquals(null, writer2.getUpdatedSchema());
}

@Test