diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 9784b61c368..fbe39834e5a 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,18 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/PreparedStatement + java.lang.String getOperationType() + New method to get the type of operation performed by this PreparedStatement + + + 7012 + com/datastax/driver/core/Session + com.datastax.driver.core.tracing.TracingInfoFactory getTracingInfoFactory() + New method to get the TracingInfo data factory associated with this Session + 7004 com/datastax/driver/core/Metadata diff --git a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java index 9317bd0a58b..cc5117be68c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java @@ -74,6 +74,8 @@ public class BoundStatement extends Statement private ByteBuffer routingKey; + private final String operationType; + /** * Creates a new {@code BoundStatement} from the provided prepared statement. * @@ -92,6 +94,7 @@ public BoundStatement(PreparedStatement statement) { this.setSerialConsistencyLevel(statement.getSerialConsistencyLevel()); if (statement.isTracing()) this.enableTracing(); if (statement.getRetryPolicy() != null) this.setRetryPolicy(statement.getRetryPolicy()); + this.operationType = statement.getOperationType(); if (statement.getOutgoingPayload() != null) this.setOutgoingPayload(statement.getOutgoingPayload()); else @@ -104,6 +107,10 @@ public BoundStatement(PreparedStatement statement) { } } + public String getOperationType() { + return operationType; + } + @Override public boolean isLWT() { return statement.isLWT(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java index cdac5cff4d0..2a7ceeafca1 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java @@ -39,6 +39,8 @@ import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; @@ -146,6 +148,8 @@ public class Cluster implements Closeable { final Manager manager; + private TracingInfoFactory tracingInfoFactory = new NoopTracingInfoFactory(); + /** * Constructs a new Cluster instance. * @@ -197,6 +201,25 @@ private Cluster( this.manager = new Manager(name, contactPoints, configuration, listeners); } + /** + * The tracingInfo factory class used by this Cluster. + * + * @return the factory used currently by this Cluster. + */ + public TracingInfoFactory getTracingInfoFactory() { + return tracingInfoFactory; + } + + /** + * Sets desired factory for tracing information for this Cluster. By default it is {@link + * com.datastax.driver.core.tracing.NoopTracingInfoFactory} + * + * @param tracingInfoFactory the factory to be set for this Cluster. + */ + public void setTracingInfoFactory(TracingInfoFactory tracingInfoFactory) { + this.tracingInfoFactory = tracingInfoFactory; + } + /** * Initialize this Cluster instance. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 346b63c32ab..eade71f952c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -180,11 +180,11 @@ protected Connection(String name, EndPoint endPoint, Factory factory, Owner owne this(name, endPoint, factory, null); } - ListenableFuture initAsync() { - return initAsync(-1, 0); + ListenableFuture initAsync(boolean tracingRequested) { + return initAsync(-1, 0, tracingRequested); } - ListenableFuture initAsync(final int shardId, int serverPort) { + ListenableFuture initAsync(final int shardId, int serverPort, boolean tracingRequested) { if (factory.isShutdown) return Futures.immediateFailedFuture( new ConnectionException(endPoint, "Connection factory is shut down")); @@ -350,7 +350,9 @@ public void operationComplete(ChannelFuture future) throws Exception { ListenableFuture initializeTransportFuture = GuavaCompatibility.INSTANCE.transformAsync( - queryOptionsFuture, onOptionsReady(protocolVersion, initExecutor), initExecutor); + queryOptionsFuture, + onOptionsReady(protocolVersion, initExecutor, tracingRequested), + initExecutor); // Fallback on initializeTransportFuture so we can properly propagate specific exceptions. ListenableFuture initFuture = @@ -497,12 +499,17 @@ public ListenableFuture apply(Message.Response response) throws Exception } private AsyncFunction onOptionsReady( - final ProtocolVersion protocolVersion, final Executor initExecutor) { + final ProtocolVersion protocolVersion, + final Executor initExecutor, + final boolean tracingRequested) { return new AsyncFunction() { @Override public ListenableFuture apply(Void input) throws Exception { ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions(); Map extraOptions = new HashMap(); + if (tracingRequested) { + extraOptions.put("SCYLLA_OPENTELEMETRY_TRACING", "true"); + } LwtInfo lwtInfo = getHost().getLwtInfo(); if (lwtInfo != null) { lwtInfo.addOption(extraOptions); @@ -1288,7 +1295,7 @@ Connection open(Host host) Connection connection = new Connection(buildConnectionName(host), endPoint, this); // This method opens the connection synchronously, so wait until it's initialized try { - connection.initAsync().get(); + connection.initAsync(false).get(); return connection; } catch (ExecutionException e) { throw launderAsyncInitException(e); @@ -1306,10 +1313,11 @@ Connection open(HostConnectionPool pool, int shardId, int serverPort) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException { pool.host.convictionPolicy.signalConnectionsOpening(1); + boolean tracingRequested = pool.isTracingRequested(); Connection connection = new Connection(buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool); try { - connection.initAsync(shardId, serverPort).get(); + connection.initAsync(shardId, serverPort, tracingRequested).get(); return connection; } catch (ExecutionException e) { throw launderAsyncInitException(e); diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 287a55cbdb3..0202fc938b0 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ public class DefaultPreparedStatement implements PreparedStatement { final Cluster cluster; final boolean isLWT; final Token.Factory partitioner; + final String operationType; volatile ByteBuffer routingKey; @@ -62,7 +64,17 @@ private DefaultPreparedStatement( this.preparedId = id; this.query = query; this.queryKeyspace = queryKeyspace; - this.incomingPayload = incomingPayload; + if (incomingPayload != null && incomingPayload.containsKey("opentelemetry")) { + Map incomingPayloadCopy = + new HashMap(incomingPayload); + ByteBuffer buf = incomingPayloadCopy.remove("opentelemetry"); + this.operationType = new String(buf.array(), buf.position(), buf.limit() - buf.position()); + if (incomingPayloadCopy.isEmpty()) this.incomingPayload = null; + else this.incomingPayload = ImmutableMap.copyOf(incomingPayloadCopy); + } else { + this.operationType = null; + this.incomingPayload = incomingPayload; + } this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; @@ -315,4 +327,10 @@ public Boolean isIdempotent() { public boolean isLWT() { return isLWT; } + + /** {@inheritDoc} */ + @Override + public String getOperationType() { + return operationType; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java index d967bc937d2..d6fb79c874b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java +++ b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java @@ -30,6 +30,7 @@ import com.datastax.driver.core.exceptions.BusyPoolException; import com.datastax.driver.core.exceptions.ConnectionException; import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -267,6 +268,10 @@ public void run() { this.timeoutsExecutor = manager.getCluster().manager.connectionFactory.eventLoopGroup.next(); } + protected boolean isTracingRequested() { + return !(manager.getTracingInfoFactory() == NoopTracingInfoFactory.INSTANCE); + } + /** * @param reusedConnection an existing connection (from a reconnection attempt) that we want to * reuse as part of this pool. Might be null or already used by another pool. @@ -344,14 +349,15 @@ ListenableFuture initAsyncWithConnection(Connection reusedConnection) { } } - ListenableFuture connectionFuture = connection.initAsync(shardId, serverPort); + ListenableFuture connectionFuture = + connection.initAsync(shardId, serverPort, isTracingRequested()); connectionFutures.add(handleErrors(connectionFuture, initExecutor)); shardConnectionIndex++; } } else { for (Connection connection : newConnections) { - ListenableFuture connectionFuture = connection.initAsync(); + ListenableFuture connectionFuture = connection.initAsync(isTracingRequested()); connectionFutures.add(handleErrors(connectionFuture, initExecutor)); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java index e87cbc0341b..779c35a1672 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java @@ -361,4 +361,7 @@ public interface PreparedStatement { /** Whether a prepared statement is LWT statement */ public boolean isLWT(); + + /** Type of prepared operation (e.g. SELECT) */ + public String getOperationType(); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index a8a05f093f5..fd581914d41 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -40,6 +40,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.RetryPolicy.RetryDecision.Type; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy.SpeculativeExecutionPlan; +import com.datastax.driver.core.tracing.TracingInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -47,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; @@ -73,6 +75,10 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; + private static final int STATEMENT_MAX_LENGTH = 1000; + private static final int PARTITION_KEY_MAX_LENGTH = 1000; + private static final int REPLICAS_MAX_LENGTH = 1000; + final String id; private final SessionManager manager; @@ -95,6 +101,8 @@ class RequestHandler { private final AtomicBoolean isDone = new AtomicBoolean(); private final AtomicInteger executionIndex = new AtomicInteger(); + private final TracingInfo tracingInfo; + private Iterator getReplicas( String loggedKeyspace, Statement statement, Iterator fallback) { ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion(); @@ -120,7 +128,8 @@ private Iterator getReplicas( return replicas.iterator(); } - public RequestHandler(SessionManager manager, Callback callback, Statement statement) { + public RequestHandler( + SessionManager manager, Callback callback, Statement statement, TracingInfo tracingInfo) { this.id = Long.toString(System.identityHashCode(this)); if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); this.manager = manager; @@ -156,6 +165,71 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency == null) consistency = Statement.DEFAULT.getConsistencyLevel(); + + String statementType = null; + String statementText = null; + int batchSize = 1; + + String keyspace = null; + String partitionKey = null; + String table = null; + String operationType = null; + + if (statement instanceof BatchStatement) { + statementType = "batch"; + batchSize = ((BatchStatement) statement).size(); + StringBuilder statementTextBuilder = new StringBuilder(STATEMENT_MAX_LENGTH); + for (Statement subStatement : ((BatchStatement) statement).getStatements()) { + if (subStatement instanceof BoundStatement) + statementTextBuilder.append(((BoundStatement) subStatement).statement.getQueryString()); + else statementTextBuilder.append(subStatement.toString()); + } + statementText = statementTextBuilder.toString(); + } else if (statement instanceof BoundStatement) { + statementType = "prepared"; + statementText = ((BoundStatement) statement).statement.getQueryString(); + keyspace = ((BoundStatement) statement).getKeyspace(); + operationType = ((BoundStatement) statement).getOperationType(); + + ColumnDefinitions boundColumns = + ((BoundStatement) statement).statement.getPreparedId().boundValuesMetadata.variables; + + StringBuilder partitionKeyBuilder = new StringBuilder(PARTITION_KEY_MAX_LENGTH); + int[] rkIndexes = ((BoundStatement) statement).statement.getPreparedId().routingKeyIndexes; + if (rkIndexes != null) { + for (int i : rkIndexes) { + Object value = ((BoundStatement) statement).getObject(i); + String valueString = (value == null) ? "NULL" : value.toString(); + if (partitionKeyBuilder.length() > 0) partitionKeyBuilder.append(", "); + String columnName = boundColumns.getName(i); + partitionKeyBuilder.append(columnName); + partitionKeyBuilder.append('='); + partitionKeyBuilder.append(valueString); + } + } + partitionKey = partitionKeyBuilder.toString(); + + if (boundColumns.size() > 0) table = boundColumns.getTable(0); + } else if (statement instanceof RegularStatement) { + statementType = "regular"; + statementText = ((RegularStatement) statement).toString(); + } + + this.tracingInfo = tracingInfo; + this.tracingInfo.setNameAndStartTime("request"); + this.tracingInfo.setConsistencyLevel(consistency); + this.tracingInfo.setRetryPolicy(retryPolicy()); + this.tracingInfo.setBatchSize(batchSize); + this.tracingInfo.setLoadBalancingPolicy(manager.loadBalancingPolicy()); + if (statementType != null) this.tracingInfo.setStatementType(statementType); + if (statementText != null) this.tracingInfo.setStatement(statementText, STATEMENT_MAX_LENGTH); + if (keyspace != null) this.tracingInfo.setKeyspace(keyspace); + if (partitionKey != null) this.tracingInfo.setPartitionKey(partitionKey); + if (table != null) this.tracingInfo.setTable(table); + if (operationType != null) this.tracingInfo.setOperationType(operationType); } void sendRequest() { @@ -274,6 +348,17 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + + if (response.type == Message.Response.Type.RESULT) { + Responses.Result rm = (Responses.Result) response; + if (rm.kind == Responses.Result.Kind.ROWS) { + Responses.Result.Rows r = (Responses.Result.Rows) rm; + tracingInfo.setRowsCount(r.data.size()); + } + } + tracingInfo.setQueryPaged(info.getPagingState() != null); + tracingInfo.setStatus(TracingInfo.StatusCode.OK); + tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( connection, @@ -281,6 +366,10 @@ private void setFinalResult( "Unexpected exception while setting final result from " + response, e), System.nanoTime() - startTime, /*unused*/ 0); + + tracingInfo.recordException(e); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); + tracingInfo.tracingFinished(); } } @@ -305,6 +394,10 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.recordException(exception); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, exception.toString()); + tracingInfo.tracingFinished(); + try { if (timerContext != null) timerContext.stop(); } finally { @@ -315,6 +408,8 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.setRetryCount(execution.retryCount()); + execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) setFinalException( @@ -383,11 +478,17 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; + private final TracingInfo parentTracingInfo; + private TracingInfo currentChildTracingInfo; + SpeculativeExecution(Message.Request request, int position) { this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); + this.parentTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(RequestHandler.this.tracingInfo); + this.parentTracingInfo.setNameAndStartTime("speculative_execution." + position); if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); } @@ -429,6 +530,13 @@ private boolean query(final Host host) { if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + currentChildTracingInfo.setNameAndStartTime("query"); + InetSocketAddress hostAddress = host.getEndPoint().resolve(); + currentChildTracingInfo.setPeerName(hostAddress.getHostName()); + currentChildTracingInfo.setPeerIP(hostAddress.getAddress()); + currentChildTracingInfo.setPeerPort(hostAddress.getPort()); + if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -647,6 +755,8 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setRetryCount(retryCount()); + parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { @@ -659,6 +769,8 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setRetryCount(retryCount()); + parentTracingInfo.tracingFinished(); return; } } @@ -674,6 +786,9 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -832,7 +947,10 @@ public void onSet( toPrepare.getQueryKeyspace(), connection.endPoint); - write(connection, prepareAndRetry(toPrepare.getQueryString())); + TracingInfo prepareTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + prepareTracingInfo.setNameAndStartTime("prepare"); + write(connection, prepareAndRetry(toPrepare.getQueryString(), prepareTracingInfo)); // we're done for now, the prepareAndRetry callback will handle the rest return; case READ_FAILURE: @@ -878,7 +996,8 @@ public void onSet( } } - private Connection.ResponseCallback prepareAndRetry(final String toPrepare) { + private Connection.ResponseCallback prepareAndRetry( + final String toPrepare, final TracingInfo prepareTracingInfo) { // do not bother inspecting retry policy at this step, no other decision // makes sense than retry on the same host if the query was prepared, // or on another host, if an error/timeout occurred. @@ -902,6 +1021,8 @@ public int retryCount() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -944,11 +1065,14 @@ public void onSet( @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); SpeculativeExecution.this.onException(connection, exception, latency, retryCount); } @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -973,6 +1097,9 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1010,6 +1137,9 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1051,10 +1181,31 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.setRetryCount(retryCount()); + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.setRetryCount(retryCount()); + + if (response.getCustomPayload() != null + && response.getCustomPayload().containsKey("opentelemetry")) { + ByteBuffer buf = response.getCustomPayload().get("opentelemetry"); + int rep = buf.getInt(); + StringBuilder replicasBuilder = new StringBuilder(REPLICAS_MAX_LENGTH); + for (int i = 0; i < rep; i++) { + int addrSize = (buf.get() & 0xFF); // convert to unsigned int + if (i > 0) replicasBuilder.append(", "); + for (int j = 0; j < addrSize; j++) { + if (j > 0) replicasBuilder.append('.'); + replicasBuilder.append(buf.get() & 0xFF); // convert to unsigned int + } + } + parentTracingInfo.setReplicas(replicasBuilder.toString()); + } + + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Session.java b/driver-core/src/main/java/com/datastax/driver/core/Session.java index a8e4a59b2a8..0e23caa8e78 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Session.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Session.java @@ -13,6 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ package com.datastax.driver.core; import com.datastax.driver.core.exceptions.AuthenticationException; @@ -20,6 +26,7 @@ import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.google.common.util.concurrent.ListenableFuture; import java.io.Closeable; import java.util.Collection; @@ -41,6 +48,13 @@ */ public interface Session extends Closeable { + /** + * The tracingInfo factory class used by this Session. + * + * @return the factory used currently by this Session. + */ + TracingInfoFactory getTracingInfoFactory(); + /** * The keyspace to which this Session is currently logged in, if any. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index ada9cc1699c..4a4dcb2c159 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,8 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; @@ -76,6 +78,11 @@ class SessionManager extends AbstractSession { this.poolsState = new HostConnectionPool.PoolState(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return cluster.getTracingInfoFactory(); + } + @Override public Session init() { try { @@ -155,6 +162,7 @@ public ResultSetFuture executeAsync(final Statement statement) { // Because of the way the future is built, we need another 'proxy' future that we can return // now. final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture(); + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); this.initAsync() .addListener( new Runnable() { @@ -165,7 +173,7 @@ public void run() { SessionManager.this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null)); - execute(actualFuture, statement); + execute(actualFuture, statement, tracingInfo); chainedFuture.setSource(actualFuture); } }, @@ -706,25 +714,34 @@ else if (fetchSize != Integer.MAX_VALUE) *

This method will find a suitable node to connect to using the {@link LoadBalancingPolicy} * and handle host failover. */ - void execute(final RequestHandler.Callback callback, final Statement statement) { + void execute( + final RequestHandler.Callback callback, + final Statement statement, + final TracingInfo tracingInfo) { if (this.isClosed()) { callback.onException( null, new IllegalStateException("Could not send request, session is closed"), 0, 0); return; } - if (isInit) new RequestHandler(this, callback, statement).sendRequest(); + if (isInit) new RequestHandler(this, callback, statement, tracingInfo).sendRequest(); else this.initAsync() .addListener( new Runnable() { @Override public void run() { - new RequestHandler(SessionManager.this, callback, statement).sendRequest(); + new RequestHandler(SessionManager.this, callback, statement, tracingInfo) + .sendRequest(); } }, executor()); } + void execute(final RequestHandler.Callback callback, final Statement statement) { + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); + execute(callback, statement, tracingInfo); + } + private ListenableFuture prepare( final PreparedStatement statement, EndPoint toExclude) { final String query = statement.getQueryString(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java new file mode 100644 index 00000000000..c499c0a496e --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; + +public class NoopTracingInfoFactory implements TracingInfoFactory { + + private static class NoopTracingInfo implements TracingInfo { + @Override + public void setNameAndStartTime(String name) {} + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) {} + + @Override + public void setStatementType(String statementType) {} + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) {} + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {}; + + @Override + public void setBatchSize(int batchSize) {} + + @Override + public void setRetryCount(int retryCount) {} + + @Override + public void setShardID(int shardID) {} + + @Override + public void setPeerName(String peerName) {} + + @Override + public void setPeerIP(InetAddress peerIP) {} + + @Override + public void setPeerPort(int peerPort) {} + + @Override + public void setQueryPaged(Boolean queryPaged) {} + + @Override + public void setRowsCount(int rowsCount) {} + + @Override + public void setStatement(String statement, int limit) {} + + @Override + public void setKeyspace(String keyspace) {} + + @Override + public void setPartitionKey(String partitionKey) {} + + @Override + public void setTable(String table) {} + + @Override + public void setOperationType(String operationType) {} + + @Override + public void setReplicas(String replicas) {} + + @Override + public void recordException(Exception exception) {} + + @Override + public void setStatus(StatusCode code, String description) {} + + @Override + public void setStatus(StatusCode code) {} + + @Override + public void tracingFinished() {} + } + + public static final NoopTracingInfo INSTANCE = new NoopTracingInfo(); + + @Override + public TracingInfo buildTracingInfo() { + return INSTANCE; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + return new NoopTracingInfo(); + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java new file mode 100644 index 00000000000..e2e13f541d8 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** The precision level of tracing data that is to be collected. May be extended in the future. */ +public enum PrecisionLevel { + // Enum elements must be listed in ascending order of precision level (i.e. each next element + // listed adds some new information). + NORMAL, + FULL; +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java new file mode 100644 index 00000000000..cb398b993ee --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; + +/** + * An abstraction layer over instrumentation library API, corresponding to a logical span in the + * trace. + */ +public interface TracingInfo { + + /** Final status of the traced execution. */ + enum StatusCode { + OK, + ERROR, + } + + /** + * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, + * before any other method, at the beginning of the traced execution. + * + * @param name the name given to the span being created. + */ + void setNameAndStartTime(String name); + + /** + * Adds provided consistency level to the trace. + * + * @param consistency the consistency level to be set. + */ + void setConsistencyLevel(ConsistencyLevel consistency); + + /** + * Adds provided statement type to the trace. + * + * @param statementType the statement type to be set. + */ + void setStatementType(String statementType); + + /** + * Adds provided retry policy to the trace. + * + * @param retryPolicy the retry policy to be set. + */ + void setRetryPolicy(RetryPolicy retryPolicy); + + /** + * Adds provided load balancing policy to the trace. + * + * @param loadBalancingPolicy the load balancing policy to be set. + */ + void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy); + + /** + * Adds provided batch size to the trace. + * + * @param batchSize the batch size to be set. + */ + void setBatchSize(int batchSize); + + /** + * Adds provided retry count to the trace. + * + * @param retryCount the retry count to be set. + */ + void setRetryCount(int retryCount); + + /** + * Adds provided shard ID to the trace. + * + * @param shardID the shard ID to be set. + */ + void setShardID(int shardID); + + /** + * Adds provided peer name to the trace. + * + * @param peerName the peer name to be set. + */ + void setPeerName(String peerName); + + /** + * Adds provided peer IP to the trace. + * + * @param peerIP the peer IP to be set. + */ + void setPeerIP(InetAddress peerIP); + + /** + * Adds provided peer port to the trace. + * + * @param peerPort the peer port to be set. + */ + void setPeerPort(int peerPort); + + /** + * Adds information whether the query was paged to the trace. + * + * @param queryPaged information whether the query was paged. + */ + void setQueryPaged(Boolean queryPaged); + + /** + * Adds provided number of returned rows to the trace. + * + * @param rowsCount the number of returned rows to be set. + */ + void setRowsCount(int rowsCount); + + /** + * Adds provided statement text to the trace. If the statement length is greater than given limit, + * the statement is trimmed to the first {@param limit} signs. + * + * @param statement the statement text to be set. + * @param limit the statement length limit. + */ + void setStatement(String statement, int limit); + + /** + * Adds provided keyspace to the trace. + * + * @param keyspace the keyspace to be set. + */ + void setKeyspace(String keyspace); + + /** + * Adds provided partition key string to the trace. + * + * @param partitionKey the partitionKey to be set. + */ + void setPartitionKey(String partitionKey); + + /** + * Adds provided table name to the trace. + * + * @param table the table name to be set. + */ + void setTable(String table); + + /** + * Adds provided operation type (e.g. SELECT) to the trace. + * + * @param operationType the operation type to be set. + */ + void setOperationType(String operationType); + + /** + * Adds provided list of contacted replicas to the trace. + * + * @param replicas the list of contacted replicas to be set. + */ + void setReplicas(String replicas); + + /** + * Records in the trace that the provided exception occured. + * + * @param exception the exception to be recorded. + */ + void recordException(Exception exception); + + /** + * Sets the final status of the traced execution. + * + * @param code the status code to be set. + */ + void setStatus(StatusCode code); + + /** + * Sets the final status of the traced execution, with additional description. + * + * @param code the status code to be set. + * @param description the additional description of the status. + */ + void setStatus(StatusCode code, String description); + + /** Must be always called exactly once at the logical end of traced execution. */ + void tracingFinished(); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java new file mode 100644 index 00000000000..25c20f3d9ec --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** Factory of trace info objects. */ +public interface TracingInfoFactory { + + /** Creates new trace info object, inheriting global context. */ + TracingInfo buildTracingInfo(); + + /** + * Creates new trace info object, inheriting context from provided another trace info object. + * + * @param parent the trace info object to be set as the parent of newly created trace info object. + */ + TracingInfo buildTracingInfo(TracingInfo parent); +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java index 098164caa88..b4b73356186 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.CassandraVersion; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -286,6 +287,11 @@ public ResultSet execute(Statement statement) { return executeAsync(statement).getUninterruptibly(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return session.getTracingInfoFactory(); + } + @Override public String getLoggedKeyspace() { return session.getLoggedKeyspace(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java index 0bee5ec9a8d..03d01e18cb4 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java @@ -29,7 +29,8 @@ import org.testng.annotations.Test; public class DelegatingClusterTest { - private static final Set NON_DELEGATED_METHODS = ImmutableSet.of("getClusterName"); + private static final Set NON_DELEGATED_METHODS = + ImmutableSet.of("getClusterName", "setTracingInfoFactory", "getTracingInfoFactory"); /** * Checks that all methods of {@link DelegatingCluster} invoke their counterpart in {@link diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java new file mode 100644 index 00000000000..06c6bd85557 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java @@ -0,0 +1,204 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.PagingOptimizingLoadBalancingPolicy; +import java.util.ArrayList; +import java.util.Collection; +import org.testng.annotations.Test; + +public class BasicTracingTest extends CCMTestsSupport { + private static TestTracingInfoFactory testTracingInfoFactory; + private Session session; + + @Override + public void onTestContextInitialized() { + initializeTestTracing(); + session.execute("USE " + keyspace); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + + Collection spans = testTracingInfoFactory.getSpans(); + spans.clear(); + } + + @Test(groups = "short") + public void simpleTracingTest() { + session.execute("INSERT INTO t(k, v) VALUES (1, 7)"); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + spans.clear(); + } + + @Test(groups = "short") + public void tagsTest() { + PreparedStatement prepared = session.prepare("INSERT INTO t(k, v) VALUES (?, ?)"); + + Collection prepareSpans = testTracingInfoFactory.getSpans(); + assertNotEquals(prepareSpans.size(), 0); + assertTrue(getRoot(prepareSpans) instanceof TestTracingInfo); + prepareSpans.clear(); + + BoundStatement bound = prepared.bind(1, 7); + session.execute(bound); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "prepared"); + assertEquals(root.getBatchSize(), new Integer(1)); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.ONE); + assertNull(root.getRowsCount()); // no rows are returned in insert + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getRetryPolicy() instanceof DefaultRetryPolicy); + assertFalse(root.getQueryPaged()); + assertNull(root.getStatement()); // because of precision level NORMAL + // these are tags specific to bound statement + assertEquals(root.getKeyspace(), keyspace); + assertEquals(root.getPartitionKey(), "k=1"); + assertEquals(root.getTable(), "t"); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getRetryCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getQueryPaged()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getRetryCount() >= 0); + } + + ArrayList queries = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + queries.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(queries.size() > 0); + + for (TracingInfo querySpan : queries) { + assertTrue(querySpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) querySpan; + + // these tags should not be set for query span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getQueryPaged()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getRetryCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for query span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + + private void initializeTestTracing() { + testTracingInfoFactory = new TestTracingInfoFactory(PrecisionLevel.NORMAL); + cluster().setTracingInfoFactory(testTracingInfoFactory); + session = cluster().connect(); + } + + private TracingInfo getRoot(Collection spans) { + TracingInfo root = null; + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == null) { + assertNull(root); // There should be only one root. + root = tracingInfo; + } + } + + return root; + } + + private ArrayList getChildren(Collection spans, TracingInfo parent) { + ArrayList children = new ArrayList(); + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == parent) { + children.add(tracingInfo); + } + } + return children; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java new file mode 100644 index 00000000000..903c8415f7e --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java @@ -0,0 +1,291 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; + +public class TestTracingInfo implements TracingInfo { + + private final PrecisionLevel precision; + private TracingInfo parent = null; + + private Boolean spanStarted = false; + private Boolean spanFinished = false; + private String spanName; + private ConsistencyLevel consistencyLevel; + private String statement; + private String statementType; + private Collection exceptions; + private StatusCode statusCode; + private String description; + private InetAddress peerIP; + private RetryPolicy retryPolicy; + private LoadBalancingPolicy loadBalancingPolicy; + private Integer batchSize; + private Integer retryCount; + private Integer shardID; + private String peerName; + private Integer peerPort; + private Boolean queryPaged; + private Integer rowsCount; + private String keyspace; + private String partitionKey; + private String table; + private String operationType; + private String replicas; + + public TestTracingInfo(PrecisionLevel precision) { + this.precision = precision; + } + + public TestTracingInfo(PrecisionLevel precision, TracingInfo parent) { + this(precision); + this.parent = parent; + } + + public PrecisionLevel getPrecision() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + this.spanStarted = true; + this.spanName = name; + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + this.consistencyLevel = consistency; + } + + @Override + public void setStatementType(String statementType) { + this.statementType = statementType; + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + this.loadBalancingPolicy = loadBalancingPolicy; + } + + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + @Override + public void setShardID(int shardID) { + this.shardID = shardID; + } + + @Override + public void setPeerName(String peerName) { + this.peerName = peerName; + } + + @Override + public void setPeerIP(InetAddress peerIP) { + this.peerIP = peerIP; + } + + @Override + public void setPeerPort(int peerPort) { + this.peerPort = peerPort; + } + + @Override + public void setQueryPaged(Boolean queryPaged) { + this.queryPaged = queryPaged; + } + + @Override + public void setRowsCount(int rowsCount) { + this.rowsCount = rowsCount; + } + + @Override + public void setStatement(String statement, int limit) { + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + this.statement = statement; + } + } + + @Override + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + @Override + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + + @Override + public void setTable(String table) { + this.table = table; + } + + @Override + public void setOperationType(String operationType) { + this.operationType = operationType; + } + + @Override + public void setReplicas(String replicas) { + this.replicas = replicas; + } + + @Override + public void recordException(Exception exception) { + if (this.exceptions == null) { + this.exceptions = new ArrayList(); + } + this.exceptions.add(exception); + } + + @Override + public void setStatus(StatusCode code) { + this.statusCode = code; + } + + @Override + public void setStatus(StatusCode code, String description) { + this.statusCode = code; + this.description = description; + } + + @Override + public void tracingFinished() { + this.spanFinished = true; + } + + private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } + + public boolean isSpanStarted() { + return spanStarted; + } + + public boolean isSpanFinished() { + return spanFinished; + } + + public String getSpanName() { + return spanName; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public String getStatementType() { + return statementType; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public LoadBalancingPolicy getLoadBalancingPolicy() { + return loadBalancingPolicy; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getRetryCount() { + return retryCount; + } + + public Integer getShardID() { + return shardID; + } + + public String getPeerName() { + return peerName; + } + + public InetAddress getPeerIP() { + return peerIP; + } + + public Integer getPeerPort() { + return peerPort; + } + + public Boolean getQueryPaged() { + return queryPaged; + } + + public Integer getRowsCount() { + return rowsCount; + } + + public String getStatement() { + return statement; + } + + public String getKeyspace() { + return keyspace; + } + + public String getPartitionKey() { + return partitionKey; + } + + public String getTable() { + return table; + } + + public String getOperationType() { + return operationType; + } + + public String getReplicas() { + return replicas; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + public String getDescription() { + return description; + } + + public TracingInfo getParent() { + return parent; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java new file mode 100644 index 00000000000..236d0da4c7c --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class TestTracingInfoFactory implements TracingInfoFactory { + private final PrecisionLevel precision; + private Collection spans = + Collections.synchronizedList(new ArrayList()); + + public TestTracingInfoFactory() { + this.precision = PrecisionLevel.NORMAL; + } + + public TestTracingInfoFactory(final PrecisionLevel precision) { + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + TracingInfo tracingInfo = new TestTracingInfo(precision); + spans.add(tracingInfo); + return tracingInfo; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + TracingInfo tracingInfo; + + if (parent instanceof TestTracingInfo) { + final TestTracingInfo castedParent = (TestTracingInfo) parent; + tracingInfo = new TestTracingInfo(castedParent.getPrecision(), parent); + spans.add(tracingInfo); + return tracingInfo; + } + + tracingInfo = new NoopTracingInfoFactory().buildTracingInfo(); + spans.add(tracingInfo); + return tracingInfo; + } + + public Collection getSpans() { + return spans; + } +} diff --git a/driver-examples/README.md b/driver-examples/README.md index 553cdbc5117..e696e948716 100644 --- a/driver-examples/README.md +++ b/driver-examples/README.md @@ -5,6 +5,17 @@ Apache Cassandra. ## Usage -Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster +Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster listening on localhost:9042. +Before running examples, make sure you installed repo artifacts (in root driver directory): +``` +mvn install -q -Dmaven.test.skip=true +``` + +To conveniently run the example showing OpenTelemetry integration (ZipkinConfiguration), +you can use the provided ```docker-compose.yaml``` file (which runs both Scylla cluster and Zipkin instance): +``` +docker-compose up +./runOpenTelemetryExample.sh +``` diff --git a/driver-examples/docker-compose.yaml b/driver-examples/docker-compose.yaml new file mode 100644 index 00000000000..1daf0c46e65 --- /dev/null +++ b/driver-examples/docker-compose.yaml @@ -0,0 +1,12 @@ +version: '3.4' + +services: + zipkin: + image: openzipkin/zipkin + ports: + - "9411:9411" + scylla_node: + image: scylladb/scylla + ports: + - "9042:9042" + command: "--smp 1 --skip-wait-for-gossip-to-settle 0" diff --git a/driver-examples/pom.xml b/driver-examples/pom.xml index 5f28572501b..b47107fbcc7 100644 --- a/driver-examples/pom.xml +++ b/driver-examples/pom.xml @@ -48,6 +48,11 @@ true + + com.scylladb + scylla-driver-opentelemetry + + @@ -134,6 +139,26 @@ logback-classic + + + + io.opentelemetry + opentelemetry-sdk + 1.9.1 + + + + io.opentelemetry + opentelemetry-semconv + 1.9.0-alpha + + + + io.opentelemetry + opentelemetry-exporter-zipkin + 1.9.1 + + @@ -184,6 +209,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + diff --git a/driver-examples/runOpenTelemetryExample.sh b/driver-examples/runOpenTelemetryExample.sh new file mode 100755 index 00000000000..7603ca64d18 --- /dev/null +++ b/driver-examples/runOpenTelemetryExample.sh @@ -0,0 +1 @@ +mvn -q exec:java -Dexec.mainClass="com.datastax.driver.examples.opentelemetry.ZipkinUsage" diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java new file mode 100644 index 00000000000..f68bca6bafd --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.examples.opentelemetry; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +/** Example showing how to configure OpenTelemetry for tracing with Scylla Java Driver. */ +class OpenTelemetryConfiguration { + private static final String SERVICE_NAME = "Scylla Java driver"; + + public static OpenTelemetry initialize(SpanExporter spanExporter) { + Resource serviceNameResource = + Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, SERVICE_NAME)); + + // Set to process the spans by the spanExporter. + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + // Add a shutdown hook to shut down the SDK. + Runtime.getRuntime() + .addShutdownHook( + new Thread( + new Runnable() { + @Override + public void run() { + tracerProvider.close(); + } + })); + + // Return the configured instance so it can be used for instrumentation. + return openTelemetry; + } + + public static OpenTelemetry initializeForZipkin(String ip, int port) { + String endpointPath = "/api/v2/spans"; + String httpUrl = String.format("http://%s:%s", ip, port); + + SpanExporter exporter = + ZipkinSpanExporter.builder().setEndpoint(httpUrl + endpointPath).build(); + + return initialize(exporter); + } +} diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java new file mode 100644 index 00000000000..9dcc37f5ba2 --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java @@ -0,0 +1,231 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.examples.opentelemetry; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.opentelemetry.OpenTelemetryTracingInfoFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +/** + * Creates a keyspace and tables, and loads some data into them. Sends OpenTelemetry tracing data to + * Zipkin tracing backend + * + *

Preconditions: - a Scylla cluster is running and accessible through the contacts points + * identified by CONTACT_POINTS and PORT and Zipkin backend is running and accessible through the + * contacts points identified by ZIPKIN_CONTACT_POINT and ZIPKIN_PORT. + * + *

Side effects: - creates a new keyspace "simplex" in the cluster. If a keyspace with this name + * already exists, it will be reused; - creates two tables "simplex.songs" and "simplex.playlists". + * If they exist already, they will be reused; - inserts a row in each table. + */ +public class ZipkinUsage { + private static final String CONTACT_POINT = "127.0.0.1"; + private static final int PORT = 9042; + + private static final String ZIPKIN_CONTACT_POINT = "127.0.0.1"; + private static final int ZIPKIN_PORT = 9411; + + private Cluster cluster; + private Session session; + + private Tracer tracer; + + public static void main(String[] args) { + // Workaround for setting ContextStorage to ThreadLocalContextStorage. + System.setProperty("io.opentelemetry.context.contextStorageProvider", "default"); + + ZipkinUsage client = new ZipkinUsage(); + + try { + client.connect(); + client.createSchema(); + client.loadData(); + client.querySchema(); + System.out.println( + "All requests have been completed. Now you can visit Zipkin at " + + "http://" + + ZIPKIN_CONTACT_POINT + + ":" + + ZIPKIN_PORT + + " and examine the produced trace."); + } finally { + client.close(); + } + } + + /** Initiates a connection to the cluster. */ + public void connect() { + cluster = Cluster.builder().addContactPoints(CONTACT_POINT).withPort(PORT).build(); + + System.out.printf("Connected to cluster: %s%n", cluster.getMetadata().getClusterName()); + + OpenTelemetry openTelemetry = + OpenTelemetryConfiguration.initializeForZipkin(ZIPKIN_CONTACT_POINT, ZIPKIN_PORT); + tracer = openTelemetry.getTracerProvider().get("this"); + TracingInfoFactory tracingInfoFactory = new OpenTelemetryTracingInfoFactory(tracer); + cluster.setTracingInfoFactory(tracingInfoFactory); + + session = cluster.connect(); + } + + /** Creates the schema (keyspace) and tables for this example. */ + public void createSchema() { + session.execute("DROP KEYSPACE IF EXISTS simplex;"); + Span parentSpan = tracer.spanBuilder("create schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + { + Span span = tracer.spanBuilder("create simplex").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE KEYSPACE IF NOT EXISTS simplex WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':1};"); + + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.executeAsync( + "CREATE TABLE IF NOT EXISTS simplex.songs (" + + "id uuid," + + "title text," + + "album text," + + "artist text," + + "tags set," + + "data blob," + + "PRIMARY KEY ((title, artist), album)" + + ");"); + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.playlists").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE TABLE IF NOT EXISTS simplex.playlists (" + + "id uuid," + + "title text," + + "album text, " + + "artist text," + + "song_id uuid," + + "PRIMARY KEY (id, title, album, artist)" + + ");"); + + } finally { + span.end(); + } + } + } finally { + parentSpan.end(); + } + } + + /** Inserts data into the tables. */ + public void loadData() { + Span parentSpan = tracer.spanBuilder("load data").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = + session.prepare( + "INSERT INTO simplex.songs (id, title, album, artist, tags) " + + "VALUES (" + + "756716f7-2e54-4715-9f00-91dcbea6cf50," + + "?," + + "?," + + "?," + + "{'jazz', '2013'})" + + ";"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("La Petite Tonkinoise", "Bye Bye Blackbird", "Joséphine Baker"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("insert simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + public void querySchema() { + Span parentSpan = tracer.spanBuilder("query schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = session.prepare("SELECT * FROM simplex.songs WHERE artist = ? AND title = ?;"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("Joséphine Baker", "La Petite Tonkinoise"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("query simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + /** Closes the session and the cluster. */ + public void close() { + session.close(); + cluster.close(); + } +} diff --git a/driver-opentelemetry/pom.xml b/driver-opentelemetry/pom.xml new file mode 100644 index 00000000000..6c59be7b9ad --- /dev/null +++ b/driver-opentelemetry/pom.xml @@ -0,0 +1,149 @@ + + + + + 4.0.0 + + + com.scylladb + scylla-driver-parent + 3.11.2.1-SNAPSHOT + + + scylla-driver-opentelemetry + Java Driver for Scylla and Apache Cassandra - OpenTelemetry integration + An extension of Java Driver for Scylla and Apache Cassandra by adding + functionality of creating traces and spans in OpenTelemetry format. + + + + + + + + io.opentelemetry + opentelemetry-bom + 1.9.1 + pom + import + + + + + + + + + + + + com.scylladb + scylla-driver-core + + + + + + io.opentelemetry + opentelemetry-api + 1.9.1 + + + + + + com.scylladb + scylla-driver-core + test-jar + test + + + + io.opentelemetry + opentelemetry-sdk + test + + + + org.apache.commons + commons-exec + test + + + + org.testng + testng + test + + + + org.slf4j + slf4j-log4j12 + test + + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + 1.15 + + + check-jdk6 + process-classes + + check + + + true + + + + check-jdk8 + + check + + + true + + + + + + + + + + diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java new file mode 100644 index 00000000000..3db5724c623 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import java.net.InetAddress; + +public class OpenTelemetryTracingInfo implements TracingInfo { + private Span span; + private final Tracer tracer; + private final Context context; + private boolean tracingStarted; + private final PrecisionLevel precision; + + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, PrecisionLevel precision) { + this.tracer = tracer; + this.context = context; + this.precision = precision; + this.tracingStarted = false; + } + + public Tracer getTracer() { + return tracer; + } + + public Context getContext() { + return context.with(span); + } + + private void assertStarted() { + assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; + } + + public PrecisionLevel getPrecision() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + assert !tracingStarted : "TracingInfo.setStartTime may only be called once."; + tracingStarted = true; + span = tracer.spanBuilder(name).setParent(context).startSpan(); + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + assertStarted(); + span.setAttribute("db.scylla.consistency_level", consistency.toString()); + } + + public void setStatement(String statement) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("db.scylla.statement", statement); + } + } + + public void setHostname(String hostname) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("net.peer.name", hostname); + } + } + + @Override + public void setStatementType(String statementType) { + assertStarted(); + span.setAttribute("db.scylla.statement_type", statementType); + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + assertStarted(); + span.setAttribute("db.scylla.retry_policy", retryPolicy.getClass().getSimpleName()); + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + assertStarted(); + span.setAttribute( + "db.scylla.load_balancing_policy", loadBalancingPolicy.getClass().getSimpleName()); + } + + @Override + public void setBatchSize(int batchSize) { + assertStarted(); + span.setAttribute("db.scylla.batch_size", String.valueOf(batchSize)); + } + + @Override + public void setRetryCount(int retryCount) { + assertStarted(); + span.setAttribute("db.scylla.retry_count", String.valueOf(retryCount)); + } + + @Override + public void setShardID(int shardID) { + assertStarted(); + span.setAttribute("db.scylla.shard_id", String.valueOf(shardID)); + } + + @Override + public void setPeerName(String peerName) { + assertStarted(); + span.setAttribute("net.peer.name", peerName); + } + + @Override + public void setPeerIP(InetAddress peerIP) { + assertStarted(); + span.setAttribute("net.peer.ip", peerIP.getHostAddress()); + } + + @Override + public void setPeerPort(int peerPort) { + assertStarted(); + span.setAttribute("net.peer.port", String.valueOf(peerPort)); + } + + @Override + public void setQueryPaged(Boolean queryPaged) { + assertStarted(); + if (queryPaged) span.setAttribute("db.scylla.query_paged", "true"); + else span.setAttribute("db.scylla.query_paged", "false"); + } + + @Override + public void setRowsCount(int rowsCount) { + assertStarted(); + span.setAttribute("db.scylla.rows_count", rowsCount); + } + + @Override + public void setStatement(String statement, int limit) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + span.setAttribute("db.scylla.statement", statement); + } + } + + @Override + public void setKeyspace(String keyspace) { + assertStarted(); + span.setAttribute("db.scylla.keyspace", keyspace); + } + + @Override + public void setPartitionKey(String partitionKey) { + assertStarted(); + span.setAttribute("db.scylla.partition_key", partitionKey); + } + + @Override + public void setTable(String table) { + assertStarted(); + span.setAttribute("db.scylla.table", table); + } + + @Override + public void setOperationType(String operationType) { + assertStarted(); + span.setAttribute("db.operation", operationType); + } + + @Override + public void setReplicas(String replicas) { + assertStarted(); + span.setAttribute("db.scylla.replicas", replicas); + } + + private io.opentelemetry.api.trace.StatusCode mapStatusCode(StatusCode code) { + switch (code) { + case OK: + return io.opentelemetry.api.trace.StatusCode.OK; + case ERROR: + return io.opentelemetry.api.trace.StatusCode.ERROR; + } + return null; + } + + @Override + public void recordException(Exception exception) { + assertStarted(); + span.recordException(exception); + } + + @Override + public void setStatus(StatusCode code, String description) { + assertStarted(); + span.setStatus(mapStatusCode(code), description); + } + + @Override + public void setStatus(StatusCode code) { + assertStarted(); + span.setStatus(mapStatusCode(code)); + } + + @Override + public void tracingFinished() { + assertStarted(); + span.end(); + } + + private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } +} diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java new file mode 100644 index 00000000000..64ff29f82a9 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; + +public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { + private final Tracer tracer; + private final PrecisionLevel precision; + + public OpenTelemetryTracingInfoFactory(final Tracer tracer) { + this(tracer, PrecisionLevel.NORMAL); + } + + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final PrecisionLevel precision) { + this.tracer = tracer; + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + final Context current = Context.current(); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + if (parent instanceof OpenTelemetryTracingInfo) { + final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; + return new OpenTelemetryTracingInfo( + castedParent.getTracer(), castedParent.getContext(), castedParent.getPrecision()); + } + + return new NoopTracingInfoFactory().buildTracingInfo(); + } + + public TracingInfo buildTracingInfo(Span parent) { + final Context current = Context.current().with(parent); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } +} diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..f77c4941c90 --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class SpansCollector implements SpanProcessor { + final Collection startedSpans = + Collections.synchronizedList(new ArrayList()); + final Collection spans = + Collections.synchronizedList(new ArrayList()); + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + startedSpans.add(span); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + spans.add(span); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans(BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final SpansCollector collector = new SpansCollector(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer); + cluster().setTracingInfoFactory(tracingInfoFactory); + session = cluster().connect(); + + session.execute("USE " + keyspace); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + /** Basic test for creating spans. */ + @Test(groups = "short") + public void simpleTracingTest() { + final Collection spans = + collectSpans( + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection rootSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(rootSpans.size(), 2); + + rootSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + }); + } +} diff --git a/pom.xml b/pom.xml index 307e2df8864..e925b5a3c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ driver-examples driver-tests driver-dist + driver-opentelemetry @@ -133,6 +134,12 @@ ${project.parent.version} + + com.scylladb + scylla-driver-opentelemetry + ${project.parent.version} + + com.google.guava guava @@ -727,6 +734,9 @@ java18 1.0 + + io.opentelemetry:* +