-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Some refactoring ideas for Storage client library #1943
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ | |
import com.google.api.gax.batching.FlowController; | ||
import com.google.auto.value.AutoValue; | ||
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; | ||
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Stopwatch; | ||
import com.google.common.collect.ImmutableList; | ||
|
@@ -29,6 +28,7 @@ | |
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -67,22 +67,15 @@ public class ConnectionWorkerPool { | |
*/ | ||
private final FlowController.LimitExceededBehavior limitExceededBehavior; | ||
|
||
/** Map from write stream to corresponding connection. */ | ||
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection = | ||
new ConcurrentHashMap<>(); | ||
|
||
/** Map from a connection to a set of write stream that have sent requests onto it. */ | ||
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream = | ||
new ConcurrentHashMap<>(); | ||
|
||
/** Collection of all the created connections. */ | ||
private final Set<ConnectionWorker> connectionWorkerPool = | ||
Collections.synchronizedSet(new HashSet<>()); | ||
|
||
/* | ||
* Contains the mapping from stream name to updated schema. | ||
*/ | ||
private Map<String, TableSchemaAndTimestamp> tableNameToUpdatedSchema = new ConcurrentHashMap<>(); | ||
private Map<String, Set<StreamWriter>> streamNameToStreamWriter = new ConcurrentHashMap(); | ||
private Map<StreamWriter, TableSchema> streamWriterToUpdatedSchema = new ConcurrentHashMap<>(); | ||
|
||
/** Enable test related logic. */ | ||
private static boolean enableTesting = false; | ||
|
@@ -225,6 +218,12 @@ public static void setOptions(Settings settings) { | |
ConnectionWorkerPool.settings = settings; | ||
} | ||
|
||
public void registerStreamWriter(StreamWriter streamWriter) { | ||
streamNameToStreamWriter | ||
.computeIfAbsent(streamWriter.getStreamName(), k -> new HashSet<>()) | ||
.add(streamWriter); | ||
} | ||
|
||
/** Distributes the writing of a message to an underlying connection. */ | ||
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) { | ||
return append(streamWriter, rows, -1); | ||
|
@@ -234,47 +233,52 @@ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows | |
public ApiFuture<AppendRowsResponse> append( | ||
StreamWriter streamWriter, ProtoRows rows, long offset) { | ||
// We are in multiplexing mode after entering the following logic. | ||
ConnectionWorker connectionWorker = | ||
streamWriterToConnection.compute( | ||
streamWriter, | ||
(key, existingStream) -> { | ||
// Though compute on concurrent map is atomic, we still do explicit locking as we | ||
// may have concurrent close(...) triggered. | ||
lock.lock(); | ||
try { | ||
// Stick to the existing stream if it's not overwhelmed. | ||
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) { | ||
return existingStream; | ||
} | ||
// Try to create or find another existing stream to reuse. | ||
ConnectionWorker createdOrExistingConnection = null; | ||
try { | ||
createdOrExistingConnection = | ||
createOrReuseConnectionWorker(streamWriter, existingStream); | ||
} catch (IOException e) { | ||
throw new IllegalStateException(e); | ||
} | ||
// Update connection to write stream relationship. | ||
connectionToWriteStream.computeIfAbsent( | ||
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>()); | ||
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter); | ||
return createdOrExistingConnection; | ||
} finally { | ||
lock.unlock(); | ||
} | ||
}); | ||
ConnectionWorker currentConnection; | ||
// TODO: Do we need a global lock here? Or is it enough to just lock the StreamWriter? | ||
lock.lock(); | ||
try { | ||
currentConnection = streamWriter.getCurrentConnectionPoolConnection(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we keep multiple (at least 2) connections in order to scale up, and avoid look into the global pool? |
||
// TODO: Experiment with only checking isOverwhelmed less often per StreamWriter (once per | ||
// second?) instead of on every append call. | ||
if (currentConnection == null || currentConnection.getLoad().isOverwhelmed()) { | ||
// Try to create or find another existing stream to reuse. | ||
ConnectionWorker createdOrExistingConnection = null; | ||
try { | ||
createdOrExistingConnection = | ||
createOrReuseConnectionWorker(streamWriter, currentConnection); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we still need global lock here. |
||
} catch (IOException e) { | ||
throw new IllegalStateException(e); | ||
} | ||
currentConnection = createdOrExistingConnection; | ||
streamWriter.setCurrentConnectionPoolConnection(currentConnection); | ||
// Update connection to write stream relationship. | ||
// TODO: What if we simply kept an atomic refcount in ConnectionWorker? We could also | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refcount would be error prone, as the streamwriter could be switching back and forth between connection workers, meaning one worker could end up recording a single stream writer multiple times if using refcount There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that would be fine. The refcount removal would happen in the done callback (below in ApiFutures.transform), so it would know exactly which connection worker to decrement even if the stream writer has moved to a different stream. |
||
// manage the refcount in the callback below to precisely track which connections are being | ||
// used. | ||
currentConnection.getCurrentStreamWriters().add(streamWriter); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this lock good enough to protect currentConnection? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're using it to protect currentConnection.getCurrentStreamWriters(). In theory we could put a lock inside of currentConnection, which would create more granular locking. However, this would also cause a lot more lock/unlock activity (e.g. every call to append would have to lock at least two locks), so this change would need measurement to see if it is actually better. |
||
} | ||
} finally { | ||
lock.unlock(); | ||
} | ||
|
||
Stopwatch stopwatch = Stopwatch.createStarted(); | ||
ApiFuture<AppendRowsResponse> responseFuture = | ||
connectionWorker.append( | ||
currentConnection.append( | ||
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); | ||
return ApiFutures.transform( | ||
responseFuture, | ||
// Add callback for update schema | ||
(response) -> { | ||
if (response.getWriteStream() != "" && response.hasUpdatedSchema()) { | ||
tableNameToUpdatedSchema.put( | ||
response.getWriteStream(), | ||
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema())); | ||
Set<StreamWriter> streamWritersToUpdate = | ||
streamNameToStreamWriter.get(response.getWriteStream()); | ||
if (streamWritersToUpdate != null) { | ||
for (StreamWriter updateStream : streamWritersToUpdate) { | ||
// Alternatively, just call a setter on each of these StreamWriters to tell it about | ||
// the new schema. That would eliminate another static map. | ||
streamWriterToUpdatedSchema.put(updateStream, response.getUpdatedSchema()); | ||
} | ||
} | ||
} | ||
return response; | ||
}, | ||
|
@@ -385,26 +389,30 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w | |
public void close(StreamWriter streamWriter) { | ||
lock.lock(); | ||
try { | ||
streamWriterToConnection.remove(streamWriter); | ||
Set<StreamWriter> streamWriters = streamNameToStreamWriter.get(streamWriter.getStreamName()); | ||
if (streamWriters != null) { | ||
streamWriters.remove(streamWriter); | ||
} | ||
|
||
streamWriter.setCurrentConnectionPoolConnection(null); | ||
// Since it's possible some other connections may have served this writeStream, we | ||
// iterate and see whether it's also fine to close other connections. | ||
Set<ConnectionWorker> connectionToRemove = new HashSet<>(); | ||
for (ConnectionWorker connectionWorker : connectionToWriteStream.keySet()) { | ||
if (connectionToWriteStream.containsKey(connectionWorker)) { | ||
connectionToWriteStream.get(connectionWorker).remove(streamWriter); | ||
if (connectionToWriteStream.get(connectionWorker).isEmpty()) { | ||
connectionWorker.close(); | ||
connectionWorkerPool.remove(connectionWorker); | ||
connectionToRemove.add(connectionWorker); | ||
} | ||
int numClosed = 0; | ||
for (Iterator<ConnectionWorker> it = connectionWorkerPool.iterator(); it.hasNext(); ) { | ||
ConnectionWorker connectionWorker = it.next(); | ||
connectionWorker.getCurrentStreamWriters().remove(streamWriter); | ||
if (connectionWorker.getCurrentStreamWriters().isEmpty()) { | ||
connectionWorker.close(); | ||
it.remove(); | ||
++numClosed; | ||
} | ||
} | ||
|
||
log.info( | ||
String.format( | ||
"During closing of writeStream for %s with writer id %s, we decided to close %s " | ||
+ "connections", | ||
streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size())); | ||
connectionToWriteStream.keySet().removeAll(connectionToRemove); | ||
streamWriter.getStreamName(), streamWriter.getWriterId(), numClosed)); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
|
@@ -414,7 +422,7 @@ public void close(StreamWriter streamWriter) { | |
public long getInflightWaitSeconds(StreamWriter streamWriter) { | ||
lock.lock(); | ||
try { | ||
ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter); | ||
ConnectionWorker connectionWorker = streamWriter.getCurrentConnectionPoolConnection(); | ||
if (connectionWorker == null) { | ||
return 0; | ||
} else { | ||
|
@@ -425,8 +433,8 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) { | |
} | ||
} | ||
|
||
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { | ||
return tableNameToUpdatedSchema.getOrDefault(streamWriter.getStreamName(), null); | ||
TableSchema getUpdatedSchema(StreamWriter streamWriter) { | ||
return streamWriterToUpdatedSchema.getOrDefault(streamWriter, null); | ||
} | ||
|
||
/** Enable Test related logic. */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
import com.google.api.gax.rpc.TransportChannelProvider; | ||
import com.google.auto.value.AutoOneOf; | ||
import com.google.auto.value.AutoValue; | ||
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
import io.grpc.Status; | ||
|
@@ -112,6 +111,21 @@ public static ConnectionPoolKey create(String location) { | |
} | ||
} | ||
|
||
private ConnectionWorker currentConnectionPoolConnection; | ||
|
||
/** | ||
* If using a single connection, this returns null. Always accessed under the ConnectionWorkerPool | ||
* lock. | ||
*/ | ||
ConnectionWorker getCurrentConnectionPoolConnection() { | ||
return currentConnectionPoolConnection; | ||
} | ||
|
||
/** Always accessed under the ConnectionWorkerPool lock. */ | ||
void setCurrentConnectionPoolConnection(ConnectionWorker currentConnectionPoolConnection) { | ||
this.currentConnectionPoolConnection = currentConnectionPoolConnection; | ||
} | ||
|
||
/** | ||
* When in single table mode, append directly to connectionWorker. Otherwise append to connection | ||
* pool in multiplexing mode. | ||
|
@@ -155,12 +169,12 @@ long getInflightWaitSeconds(StreamWriter streamWriter) { | |
return connectionWorker().getInflightWaitSeconds(); | ||
} | ||
|
||
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { | ||
if (getKind() == Kind.CONNECTION_WORKER_POOL) { | ||
TableSchema getUpdatedSchema(StreamWriter streamWriter) { | ||
if (getKind() == Kind.CONNECTION_WORKER) { | ||
return connectionWorker().getUpdatedSchema(); | ||
} else { | ||
return connectionWorkerPool().getUpdatedSchema(streamWriter); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this break the promise that StreamWriter only sees updates when there is a schema update? I think we should be fine to use nanoTime since it is monotonic on this machine? https://screenshot.googleplex.com/3Qeo9ouZEnehgMR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think nanoTime completely works. It is monotonic, but not strictly increasing — i.e. the current code is broken if the first update has the same nanoTime as the creation time, which is completely possible. |
||
} | ||
// Always populate MIN timestamp to w | ||
return connectionWorker().getUpdatedSchema(); | ||
} | ||
|
||
String getWriterId(String streamWriterId) { | ||
|
@@ -170,6 +184,11 @@ String getWriterId(String streamWriterId) { | |
return connectionWorker().getWriterId(); | ||
} | ||
|
||
public void register(StreamWriter streamWriter) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand the comment? |
||
Preconditions.checkState(getKind() == Kind.CONNECTION_WORKER_POOL); | ||
connectionWorkerPool().registerStreamWriter(streamWriter); | ||
} | ||
|
||
public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) { | ||
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection); | ||
} | ||
|
@@ -259,6 +278,7 @@ private StreamWriter(Builder builder) throws IOException { | |
client, | ||
ownsBigQueryWriteClient); | ||
})); | ||
this.singleConnectionOrConnectionPool.register(this); | ||
validateFetchedConnectonPool(builder); | ||
// Shut down the passed in client. Internally we will create another client inside connection | ||
// pool for every new connection worker. | ||
|
@@ -429,14 +449,7 @@ public static StreamWriter.Builder newBuilder(String streamName) { | |
* than the updated schema. | ||
*/ | ||
public synchronized TableSchema getUpdatedSchema() { | ||
TableSchemaAndTimestamp tableSchemaAndTimestamp = | ||
singleConnectionOrConnectionPool.getUpdatedSchema(this); | ||
if (tableSchemaAndTimestamp == null) { | ||
return null; | ||
} | ||
return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp() | ||
? tableSchemaAndTimestamp.updatedSchema() | ||
: null; | ||
return singleConnectionOrConnectionPool.getUpdatedSchema(this); | ||
} | ||
|
||
long getCreationTimestamp() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I like this idea. We can reuse the connection more for the same StreamWriter.