Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: send header with connected client #1980

Draft
wants to merge 1 commit into
base: postgresql-dialect
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.cloud.spanner.connection;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.auth.Credentials;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
Expand All @@ -33,4 +34,10 @@ public static Builder setCredentials(Builder connectionOptionsBuilder, Credentia
public static Spanner getSpanner(Connection connection) {
return ((ConnectionImpl) connection).getSpanner();
}

public static Builder setInterceptorProvider(
Builder connectionOptionsBuilder, GrpcInterceptorProvider provider) {
connectionOptionsBuilder.setConfigurator(options -> options.setInterceptorProvider(provider));
return connectionOptionsBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.spanner.admin.database.v1.InstanceName;
import com.google.spanner.v1.DatabaseName;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -144,6 +145,8 @@ public class ConnectionHandler implements Runnable {
*/
private final LinkedList<ParseMessage> skippedAutoDetectParseMessages = new LinkedList<>();

private Map<String, List<String>> extraHeaders = ImmutableMap.of();

private ExtendedQueryProtocolHandler extendedQueryProtocolHandler;
private CopyStatement activeCopyStatement;

Expand Down Expand Up @@ -788,13 +791,24 @@ public synchronized void setStatus(ConnectionStatus status) {
this.status = status;
}

boolean hasExtraHeaders() {
return !this.extraHeaders.isEmpty();
}

Map<String, List<String>> getExtraHeaders() {
return this.extraHeaders;
}

public WellKnownClient getWellKnownClient() {
return wellKnownClient;
}

public void setWellKnownClient(WellKnownClient wellKnownClient) {
this.wellKnownClient = wellKnownClient;
if (this.wellKnownClient != WellKnownClient.UNSPECIFIED) {
// Include the well-known client in a header that we send to Spanner.
this.extraHeaders =
ImmutableMap.of("pgadapter-well-known-client", ImmutableList.of(wellKnownClient.name()));
logger.log(
Level.INFO,
Logging.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import com.google.api.core.AbstractApiService;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.ThreadFactoryUtil;
import com.google.cloud.spanner.connection.SpannerPool;
import com.google.cloud.spanner.pgadapter.ConnectionHandler.QueryMode;
Expand All @@ -28,6 +30,10 @@
import com.google.cloud.spanner.pgadapter.utils.Metrics;
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import java.io.Closeable;
Expand Down Expand Up @@ -59,6 +65,11 @@
public class ProxyServer extends AbstractApiService {

private static final Logger logger = Logger.getLogger(ProxyServer.class.getName());
private static final CallOptions.Key<Map<Metadata.Key<String>, String>>
DYNAMIC_HEADERS_CALL_OPTION_KEY =
CallOptions.Key.createWithDefault(
"gax_dynamic_headers", Collections.<Metadata.Key<String>, String>emptyMap());

private final OptionsMetadata options;
private final OpenTelemetry openTelemetry;
private final Metrics metrics;
Expand Down Expand Up @@ -343,7 +354,24 @@ void createConnectionHandler(Socket socket) throws SocketException {
socket.setTcpNoDelay(true);
ConnectionHandler handler = new ConnectionHandler(this, socket);
register(handler);
Thread thread = threadFactory.newThread(handler);
// Create a gRPC context for the connection, so we can assign custom headers specifically for
// that connection. This allows us to for example include the name of the client that is
// connected in the user-agent header.
Context context =
Context.current()
.withValue(
SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
new SpannerOptions.CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
if (handler.hasExtraHeaders()) {
return context.withExtraHeaders(handler.getExtraHeaders());
}
return context;
}
});
Thread thread = threadFactory.newThread(context.wrap(handler));
handler.setThread(thread);
handler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Logger;
Expand Down Expand Up @@ -1040,6 +1043,8 @@ protected static ResultSetMetadata createAllArrayTypesResultSetMetadata(String c
protected static MockInstanceAdminImpl mockInstanceAdmin;
protected static Server spannerServer;
protected static ProxyServer pgServer;
protected static Set<String> WELL_KNOWN_CLIENT_HEADERS =
Collections.synchronizedSet(new HashSet<>());

protected List<WireMessage> getWireMessages() {
return new ArrayList<>(pgServer.getDebugMessages());
Expand Down Expand Up @@ -1280,6 +1285,14 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
assertTrue(userAgent.contains("pg-adapter"));
assertTrue(
userAgent.contains(ServiceOptions.getGoogApiClientLibName() + "/"));

String pgAdapterWellKnownClient =
metadata.get(
Metadata.Key.of(
"pgadapter-well-known-client", Metadata.ASCII_STRING_MARSHALLER));
if (pgAdapterWellKnownClient != null) {
WELL_KNOWN_CLIENT_HEADERS.add(pgAdapterWellKnownClient);
}
}
return Contexts.interceptCall(
Context.current(), serverCall, metadata, serverCallHandler);
Expand Down Expand Up @@ -1374,6 +1387,7 @@ public void clearRequests() {
if (pgServer != null) {
pgServer.clearDebugMessages();
}
WELL_KNOWN_CLIENT_HEADERS.clear();
}

protected void addDdlResponseToSpannerAdmin() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.cloud.spanner.pgadapter.statements.PgCatalog.PgConstraint;
import com.google.cloud.spanner.pgadapter.statements.PgCatalog.PgExtension;
import com.google.cloud.spanner.pgadapter.statements.PgCatalog.PgIndex;
import com.google.cloud.spanner.pgadapter.utils.ClientAutoDetector.WellKnownClient;
import com.google.cloud.spanner.pgadapter.wireprotocol.ControlMessage.PreparedType;
import com.google.cloud.spanner.pgadapter.wireprotocol.DescribeMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.ExecuteMessage;
Expand Down Expand Up @@ -420,6 +421,11 @@ public void testQuery() throws SQLException {
assertTrue(request.getTransaction().hasSingleUse());
assertTrue(request.getTransaction().getSingleUse().hasReadOnly());
}

// Verify that a header was sent to Spanner to indicate which client was connected to PGAdapter.
assertTrue(
WELL_KNOWN_CLIENT_HEADERS.toString(),
WELL_KNOWN_CLIENT_HEADERS.contains(WellKnownClient.JDBC.name()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.spanner.pgadapter.AbstractMockServerTest;
import com.google.cloud.spanner.pgadapter.CopyInMockServerTest;
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
import com.google.cloud.spanner.pgadapter.utils.ClientAutoDetector.WellKnownClient;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -158,6 +159,11 @@ public void testSelect1() {
ExecuteSqlRequest executeRequest = requests.get(1);
assertEquals(sql, executeRequest.getSql());
assertEquals(QueryMode.NORMAL, executeRequest.getQueryMode());

// Verify that a header was sent to Spanner to indicate which client was connected to PGAdapter.
assertTrue(
WELL_KNOWN_CLIENT_HEADERS.toString(),
WELL_KNOWN_CLIENT_HEADERS.contains(WellKnownClient.PGX.name()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public void testSelect1() throws Exception {
ExecuteSqlRequest request = executeSqlRequests.get(0);
assertTrue(request.getTransaction().hasSingleUse());
assertTrue(request.getTransaction().getSingleUse().hasReadOnly());

// Verify that no header was sent to Spanner to indicate which client was connected to
// PGAdapter, as this client is not auto-detected.
assertTrue(WELL_KNOWN_CLIENT_HEADERS.isEmpty());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public void testSelect1() throws Exception {
assertEquals(sql, request.getSql());
assertTrue(request.getTransaction().hasSingleUse());
assertTrue(request.getTransaction().getSingleUse().hasReadOnly());

// Verify that no header was sent to Spanner to indicate which client was connected to
// PGAdapter, as this client is not auto-detected.
assertTrue(WELL_KNOWN_CLIENT_HEADERS.isEmpty());
}

@Test
Expand Down
Loading