diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 9784b61c368..fbe39834e5a 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,18 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/PreparedStatement + java.lang.String getOperationType() + New method to get the type of operation performed by this PreparedStatement + + + 7012 + com/datastax/driver/core/Session + com.datastax.driver.core.tracing.TracingInfoFactory getTracingInfoFactory() + New method to get the TracingInfo data factory associated with this Session + 7004 com/datastax/driver/core/Metadata diff --git a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java index 9317bd0a58b..cc5117be68c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java @@ -74,6 +74,8 @@ public class BoundStatement extends Statement private ByteBuffer routingKey; + private final String operationType; + /** * Creates a new {@code BoundStatement} from the provided prepared statement. * @@ -92,6 +94,7 @@ public BoundStatement(PreparedStatement statement) { this.setSerialConsistencyLevel(statement.getSerialConsistencyLevel()); if (statement.isTracing()) this.enableTracing(); if (statement.getRetryPolicy() != null) this.setRetryPolicy(statement.getRetryPolicy()); + this.operationType = statement.getOperationType(); if (statement.getOutgoingPayload() != null) this.setOutgoingPayload(statement.getOutgoingPayload()); else @@ -104,6 +107,10 @@ public BoundStatement(PreparedStatement statement) { } } + public String getOperationType() { + return operationType; + } + @Override public boolean isLWT() { return statement.isLWT(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java index cdac5cff4d0..2a7ceeafca1 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java @@ -39,6 +39,8 @@ import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; @@ -146,6 +148,8 @@ public class Cluster implements Closeable { final Manager manager; + private TracingInfoFactory tracingInfoFactory = new NoopTracingInfoFactory(); + /** * Constructs a new Cluster instance. * @@ -197,6 +201,25 @@ private Cluster( this.manager = new Manager(name, contactPoints, configuration, listeners); } + /** + * The tracingInfo factory class used by this Cluster. + * + * @return the factory used currently by this Cluster. + */ + public TracingInfoFactory getTracingInfoFactory() { + return tracingInfoFactory; + } + + /** + * Sets desired factory for tracing information for this Cluster. By default it is {@link + * com.datastax.driver.core.tracing.NoopTracingInfoFactory} + * + * @param tracingInfoFactory the factory to be set for this Cluster. + */ + public void setTracingInfoFactory(TracingInfoFactory tracingInfoFactory) { + this.tracingInfoFactory = tracingInfoFactory; + } + /** * Initialize this Cluster instance. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java b/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java index afaa7176bca..da342d95578 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java +++ b/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java @@ -24,6 +24,8 @@ public final class CodecUtils { private static final long EPOCH_AS_CQL_LONG = (1L << 31); + private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + private CodecUtils() {} /** @@ -212,6 +214,19 @@ public static long fromDaysSinceEpochToCqlDate(int days) { return ((long) days + EPOCH_AS_CQL_LONG); } + public static String bytesToHex(byte[] bytes) { + final int INITIAL_CHARS = 2; + char[] hexChars = new char[INITIAL_CHARS + bytes.length * 2]; + hexChars[0] = '0'; + hexChars[1] = 'x'; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[INITIAL_CHARS + j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[INITIAL_CHARS + j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } + private static int sizeOfCollectionSize(ProtocolVersion version) { switch (version) { case V1: diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 287a55cbdb3..6297660e5a2 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -41,6 +41,7 @@ public class DefaultPreparedStatement implements PreparedStatement { final Cluster cluster; final boolean isLWT; final Token.Factory partitioner; + final String operationType; volatile ByteBuffer routingKey; @@ -66,6 +67,7 @@ private DefaultPreparedStatement( this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; + this.operationType = null; } static DefaultPreparedStatement fromMessage( @@ -315,4 +317,10 @@ public Boolean isIdempotent() { public boolean isLWT() { return isLWT; } + + /** {@inheritDoc} */ + @Override + public String getOperationType() { + return operationType; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java index e87cbc0341b..779c35a1672 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java @@ -361,4 +361,7 @@ public interface PreparedStatement { /** Whether a prepared statement is LWT statement */ public boolean isLWT(); + + /** Type of prepared operation (e.g. SELECT) */ + public String getOperationType(); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index a8a05f093f5..7c04d67282e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -40,6 +40,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.RetryPolicy.RetryDecision.Type; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy.SpeculativeExecutionPlan; +import com.datastax.driver.core.tracing.TracingInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -47,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; @@ -73,6 +75,10 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; + private static final int STATEMENT_MAX_LENGTH = 1000; + private static final int PARTITION_KEY_MAX_LENGTH = 1000; + private static final int BOUND_VALUES_MAX_LENGTH = 1000; + final String id; private final SessionManager manager; @@ -95,6 +101,8 @@ class RequestHandler { private final AtomicBoolean isDone = new AtomicBoolean(); private final AtomicInteger executionIndex = new AtomicInteger(); + private final TracingInfo tracingInfo; + private Iterator getReplicas( String loggedKeyspace, Statement statement, Iterator fallback) { ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion(); @@ -120,7 +128,8 @@ private Iterator getReplicas( return replicas.iterator(); } - public RequestHandler(SessionManager manager, Callback callback, Statement statement) { + public RequestHandler( + SessionManager manager, Callback callback, Statement statement, TracingInfo tracingInfo) { this.id = Long.toString(System.identityHashCode(this)); if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); this.manager = manager; @@ -156,6 +165,93 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency == null) consistency = Statement.DEFAULT.getConsistencyLevel(); + + String statementType = null; + String statementText = null; + Integer batchSize = null; + + String keyspace = null; + String partitionKey = null; + String boundValues = null; + String table = null; + String operationType = null; + + if (statement instanceof BatchStatement) { + statementType = "batch"; + batchSize = ((BatchStatement) statement).size(); + StringBuilder statementTextBuilder = new StringBuilder(STATEMENT_MAX_LENGTH); + for (Statement subStatement : ((BatchStatement) statement).getStatements()) { + if (subStatement instanceof BoundStatement) + statementTextBuilder.append(((BoundStatement) subStatement).statement.getQueryString()); + else statementTextBuilder.append(subStatement.toString()); + } + statementText = statementTextBuilder.toString(); + } else if (statement instanceof BoundStatement) { + statementType = "prepared"; + statementText = ((BoundStatement) statement).statement.getQueryString(); + keyspace = ((BoundStatement) statement).getKeyspace(); + operationType = ((BoundStatement) statement).getOperationType(); + + ColumnDefinitions boundColumns = + ((BoundStatement) statement).statement.getPreparedId().boundValuesMetadata.variables; + + StringBuilder boundValuesBuilder = new StringBuilder(BOUND_VALUES_MAX_LENGTH); + StringBuilder partitionKeyBuilder = new StringBuilder(PARTITION_KEY_MAX_LENGTH); + int[] rkIndexes = ((BoundStatement) statement).statement.getPreparedId().routingKeyIndexes; + + for (int i = 0; i < boundColumns.size(); ++i) { + Object value = ((BoundStatement) statement).getObject(i); + String valueString = + (value == null) + ? "NULL" + : value instanceof ByteBuffer + ? CodecUtils.bytesToHex(((ByteBuffer) value).array()) + : value.toString(); + String columnName = boundColumns.getName(i); + if (boundValuesBuilder.length() > 0) boundValuesBuilder.append(", "); + boundValuesBuilder.append(columnName); + boundValuesBuilder.append('='); + boundValuesBuilder.append(valueString); + + if (rkIndexes != null) { + for (int j : rkIndexes) { + if (i == j) { + if (partitionKeyBuilder.length() > 0) partitionKeyBuilder.append(", "); + partitionKeyBuilder.append(columnName); + partitionKeyBuilder.append('='); + partitionKeyBuilder.append(valueString); + break; + } + } + } + } + boundValues = boundValuesBuilder.toString(); + partitionKey = partitionKeyBuilder.toString(); + + if (boundColumns.size() > 0) table = boundColumns.getTable(0); + } else if (statement instanceof RegularStatement) { + statementType = "regular"; + statementText = ((RegularStatement) statement).toString(); + } + + this.tracingInfo = tracingInfo; + this.tracingInfo.setNameAndStartTime("request"); + this.tracingInfo.setConsistencyLevel(consistency); + this.tracingInfo.setRetryPolicy(retryPolicy()); + this.tracingInfo.setLoadBalancingPolicy(manager.loadBalancingPolicy()); + this.tracingInfo.setSpeculativeExecutionPolicy(manager.speculativeExecutionPolicy()); + if (statement.getFetchSize() > 0) this.tracingInfo.setFetchSize(statement.getFetchSize()); + if (statementType != null) this.tracingInfo.setStatementType(statementType); + if (statementText != null) this.tracingInfo.setStatement(statementText, STATEMENT_MAX_LENGTH); + if (batchSize != null) this.tracingInfo.setBatchSize(batchSize); + if (keyspace != null) this.tracingInfo.setKeyspace(keyspace); + if (boundValues != null) this.tracingInfo.setBoundValues(boundValues); + if (partitionKey != null) this.tracingInfo.setPartitionKey(partitionKey); + if (table != null) this.tracingInfo.setTable(table); + if (operationType != null) this.tracingInfo.setOperationType(operationType); } void sendRequest() { @@ -273,7 +369,22 @@ private void setFinalResult( && logger.isWarnEnabled()) { logServerWarnings(response.warnings); } + + if (response.type == Message.Response.Type.RESULT) { + Responses.Result rm = (Responses.Result) response; + if (rm.kind == Responses.Result.Kind.ROWS) { + Responses.Result.Rows r = (Responses.Result.Rows) rm; + tracingInfo.setRowsCount(r.data.size()); + tracingInfo.setHasMorePages(r.metadata.pagingState != null); + } + } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + + tracingInfo.setStatus( + response.type == Message.Response.Type.ERROR + ? TracingInfo.StatusCode.ERROR + : TracingInfo.StatusCode.OK); + tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( connection, @@ -281,6 +392,10 @@ private void setFinalResult( "Unexpected exception while setting final result from " + response, e), System.nanoTime() - startTime, /*unused*/ 0); + + tracingInfo.recordException(e); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); + tracingInfo.tracingFinished(); } } @@ -305,6 +420,10 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.recordException(exception); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, exception.toString()); + tracingInfo.tracingFinished(); + try { if (timerContext != null) timerContext.stop(); } finally { @@ -315,6 +434,9 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.setAttemptCount(execution.retryCount() + 1); + execution.parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); + execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) setFinalException( @@ -383,11 +505,17 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; + private final TracingInfo parentTracingInfo; + private TracingInfo currentChildTracingInfo; + SpeculativeExecution(Message.Request request, int position) { this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); + this.parentTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(RequestHandler.this.tracingInfo); + this.parentTracingInfo.setNameAndStartTime("speculative_execution"); if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); } @@ -429,6 +557,13 @@ private boolean query(final Host host) { if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + currentChildTracingInfo.setNameAndStartTime("attempt"); + InetSocketAddress hostAddress = host.getEndPoint().resolve(); + currentChildTracingInfo.setPeerName(hostAddress.getHostName()); + currentChildTracingInfo.setPeerIP(hostAddress.getAddress()); + currentChildTracingInfo.setPeerPort(hostAddress.getPort()); + if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -647,6 +782,9 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setAttemptCount(previous.retryCount + 1); + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); + parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { @@ -659,6 +797,9 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setAttemptCount(previous.retryCount + 1); + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); + parentTracingInfo.tracingFinished(); return; } } @@ -674,6 +815,10 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.OK); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -832,7 +977,10 @@ public void onSet( toPrepare.getQueryKeyspace(), connection.endPoint); - write(connection, prepareAndRetry(toPrepare.getQueryString())); + TracingInfo prepareTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + prepareTracingInfo.setNameAndStartTime("prepare"); + write(connection, prepareAndRetry(toPrepare.getQueryString(), prepareTracingInfo)); // we're done for now, the prepareAndRetry callback will handle the rest return; case READ_FAILURE: @@ -878,7 +1026,8 @@ public void onSet( } } - private Connection.ResponseCallback prepareAndRetry(final String toPrepare) { + private Connection.ResponseCallback prepareAndRetry( + final String toPrepare, final TracingInfo prepareTracingInfo) { // do not bother inspecting retry policy at this step, no other decision // makes sense than retry on the same host if the query was prepared, // or on another host, if an error/timeout occurred. @@ -902,6 +1051,8 @@ public int retryCount() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -944,11 +1095,14 @@ public void onSet( @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); SpeculativeExecution.this.onException(connection, exception, latency, retryCount); } @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -973,6 +1127,11 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.recordException(exception); + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1010,6 +1169,10 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR, "timeout"); + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1051,10 +1214,16 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.setAttemptCount(retryCount() + 1); + parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.setAttemptCount(retryCount() + 1); + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Session.java b/driver-core/src/main/java/com/datastax/driver/core/Session.java index a8e4a59b2a8..0e23caa8e78 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Session.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Session.java @@ -13,6 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ package com.datastax.driver.core; import com.datastax.driver.core.exceptions.AuthenticationException; @@ -20,6 +26,7 @@ import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.google.common.util.concurrent.ListenableFuture; import java.io.Closeable; import java.util.Collection; @@ -41,6 +48,13 @@ */ public interface Session extends Closeable { + /** + * The tracingInfo factory class used by this Session. + * + * @return the factory used currently by this Session. + */ + TracingInfoFactory getTracingInfoFactory(); + /** * The keyspace to which this Session is currently logged in, if any. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index ada9cc1699c..4a4dcb2c159 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,8 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; @@ -76,6 +78,11 @@ class SessionManager extends AbstractSession { this.poolsState = new HostConnectionPool.PoolState(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return cluster.getTracingInfoFactory(); + } + @Override public Session init() { try { @@ -155,6 +162,7 @@ public ResultSetFuture executeAsync(final Statement statement) { // Because of the way the future is built, we need another 'proxy' future that we can return // now. final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture(); + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); this.initAsync() .addListener( new Runnable() { @@ -165,7 +173,7 @@ public void run() { SessionManager.this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null)); - execute(actualFuture, statement); + execute(actualFuture, statement, tracingInfo); chainedFuture.setSource(actualFuture); } }, @@ -706,25 +714,34 @@ else if (fetchSize != Integer.MAX_VALUE) *

This method will find a suitable node to connect to using the {@link LoadBalancingPolicy} * and handle host failover. */ - void execute(final RequestHandler.Callback callback, final Statement statement) { + void execute( + final RequestHandler.Callback callback, + final Statement statement, + final TracingInfo tracingInfo) { if (this.isClosed()) { callback.onException( null, new IllegalStateException("Could not send request, session is closed"), 0, 0); return; } - if (isInit) new RequestHandler(this, callback, statement).sendRequest(); + if (isInit) new RequestHandler(this, callback, statement, tracingInfo).sendRequest(); else this.initAsync() .addListener( new Runnable() { @Override public void run() { - new RequestHandler(SessionManager.this, callback, statement).sendRequest(); + new RequestHandler(SessionManager.this, callback, statement, tracingInfo) + .sendRequest(); } }, executor()); } + void execute(final RequestHandler.Callback callback, final Statement statement) { + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); + execute(callback, statement, tracingInfo); + } + private ListenableFuture prepare( final PreparedStatement statement, EndPoint toExclude) { final String query = statement.getQueryString(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java new file mode 100644 index 00000000000..eeef3e30bcb --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; + +public class NoopTracingInfoFactory implements TracingInfoFactory { + + private static class NoopTracingInfo implements TracingInfo { + @Override + public void setNameAndStartTime(String name) {} + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) {} + + @Override + public void setStatementType(String statementType) {} + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) {} + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {}; + + @Override + public void setSpeculativeExecutionPolicy( + SpeculativeExecutionPolicy speculativeExecutionPolicy) {} + + @Override + public void setBatchSize(int batchSize) {} + + @Override + public void setAttemptCount(int attemptCount) {} + + @Override + public void setShardID(int shardID) {} + + @Override + public void setPeerName(String peerName) {} + + @Override + public void setPeerIP(InetAddress peerIP) {} + + @Override + public void setPeerPort(int peerPort) {} + + @Override + public void setFetchSize(int fetchSize) {} + + @Override + public void setHasMorePages(boolean hasMorePages) {} + + @Override + public void setRowsCount(int rowsCount) {} + + @Override + public void setStatement(String statement, int limit) {} + + @Override + public void setKeyspace(String keyspace) {} + + @Override + public void setBoundValues(String boundValues) {} + + @Override + public void setPartitionKey(String partitionKey) {} + + @Override + public void setTable(String table) {} + + @Override + public void setOperationType(String operationType) {} + + @Override + public void setReplicas(String replicas) {} + + @Override + public void recordException(Exception exception) {} + + @Override + public void setStatus(StatusCode code, String description) {} + + @Override + public void setStatus(StatusCode code) {} + + @Override + public void tracingFinished() {} + } + + public static final NoopTracingInfo INSTANCE = new NoopTracingInfo(); + + @Override + public TracingInfo buildTracingInfo() { + return INSTANCE; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + return new NoopTracingInfo(); + } +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java new file mode 100644 index 00000000000..a4c7bf0a26d --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; + +/** + * An abstraction layer over instrumentation library API, corresponding to a logical span in the + * trace. + */ +public interface TracingInfo { + + /** Final status of the traced execution. */ + enum StatusCode { + OK, + ERROR, + } + + /** + * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, + * before any other method, at the beginning of the traced execution. + * + * @param name the name given to the span being created. + */ + void setNameAndStartTime(String name); + + /** + * Adds provided consistency level to the trace. + * + * @param consistency the consistency level to be set. + */ + void setConsistencyLevel(ConsistencyLevel consistency); + + /** + * Adds provided statement type to the trace. + * + * @param statementType the statement type to be set. + */ + void setStatementType(String statementType); + + /** + * Adds provided retry policy to the trace. + * + * @param retryPolicy the retry policy to be set. + */ + void setRetryPolicy(RetryPolicy retryPolicy); + + /** + * Adds provided load balancing policy to the trace. + * + * @param loadBalancingPolicy the load balancing policy to be set. + */ + void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy); + + /** + * Adds provided speculative execution policy to the trace. + * + * @param speculativeExecutionPolicy the speculative execution policy to be set. + */ + void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy); + + /** + * Adds provided batch size to the trace. + * + * @param batchSize the batch size to be set. + */ + void setBatchSize(int batchSize); + + /** + * Adds provided attempt count to the trace. + * + * @param attemptCount the attempt count to be set. + */ + void setAttemptCount(int attemptCount); + + /** + * Adds provided shard ID to the trace. + * + * @param shardID the shard ID to be set. + */ + void setShardID(int shardID); + + /** + * Adds provided peer name to the trace. + * + * @param peerName the peer name to be set. + */ + void setPeerName(String peerName); + + /** + * Adds provided peer IP to the trace. + * + * @param peerIP the peer IP to be set. + */ + void setPeerIP(InetAddress peerIP); + + /** + * Adds provided peer port to the trace. + * + * @param peerPort the peer port to be set. + */ + void setPeerPort(int peerPort); + + /** + * Adds to the trace maximum number of rows fetched in one query. + * + * @param fetchSize maximum number of rows fetched in one query. + */ + void setFetchSize(int fetchSize); + + /** + * Adds to the trace information whether the query is paged and has more pages pending to be + * fetched. + * + * @param hasMorePages whether the query has more pages to be fetched. + */ + void setHasMorePages(boolean hasMorePages); + + /** + * Adds provided number of returned rows to the trace. + * + * @param rowsCount the number of returned rows to be set. + */ + void setRowsCount(int rowsCount); + + /** + * Adds provided statement text to the trace. If the statement length is greater than the given + * limit, the statement is truncated to the first {@param limit} characters. + * + * @param statement the statement text to be set. + * @param limit the statement length limit. + */ + void setStatement(String statement, int limit); + + /** + * Adds provided keyspace to the trace. + * + * @param keyspace the keyspace to be set. + */ + void setKeyspace(String keyspace); + + /** + * Adds provided bound values string to the trace. + * + * @param boundValues boundValues string to be set. + */ + void setBoundValues(String boundValues); + + /** + * Adds provided partition key string to the trace. + * + * @param partitionKey the partitionKey to be set. + */ + void setPartitionKey(String partitionKey); + + /** + * Adds provided table name to the trace. + * + * @param table the table name to be set. + */ + void setTable(String table); + + /** + * Adds provided operation type (e.g. SELECT) to the trace. + * + * @param operationType the operation type to be set. + */ + void setOperationType(String operationType); + + /** + * Adds provided list of contacted replicas to the trace. + * + * @param replicas the list of contacted replicas to be set. + */ + void setReplicas(String replicas); + + /** + * Records in the trace that the provided exception occured. + * + * @param exception the exception to be recorded. + */ + void recordException(Exception exception); + + /** + * Sets the final status of the traced execution. + * + * @param code the status code to be set. + */ + void setStatus(StatusCode code); + + /** + * Sets the final status of the traced execution, with additional description. + * + * @param code the status code to be set. + * @param description the additional description of the status. + */ + void setStatus(StatusCode code, String description); + + /** Must be always called exactly once at the logical end of traced execution. */ + void tracingFinished(); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java new file mode 100644 index 00000000000..25c20f3d9ec --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** Factory of trace info objects. */ +public interface TracingInfoFactory { + + /** Creates new trace info object, inheriting global context. */ + TracingInfo buildTracingInfo(); + + /** + * Creates new trace info object, inheriting context from provided another trace info object. + * + * @param parent the trace info object to be set as the parent of newly created trace info object. + */ + TracingInfo buildTracingInfo(TracingInfo parent); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java new file mode 100644 index 00000000000..b37cef5a59b --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** The verbosity level of tracing data that is to be collected. May be extended in the future. */ +public enum VerbosityLevel { + // Enum elements must be listed in ascending order of verbosity level (i.e. each next element + // listed adds some new information). + NORMAL, + FULL; +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java index 098164caa88..b4b73356186 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.CassandraVersion; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -286,6 +287,11 @@ public ResultSet execute(Statement statement) { return executeAsync(statement).getUninterruptibly(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return session.getTracingInfoFactory(); + } + @Override public String getLoggedKeyspace() { return session.getLoggedKeyspace(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java index 0bee5ec9a8d..03d01e18cb4 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java @@ -29,7 +29,8 @@ import org.testng.annotations.Test; public class DelegatingClusterTest { - private static final Set NON_DELEGATED_METHODS = ImmutableSet.of("getClusterName"); + private static final Set NON_DELEGATED_METHODS = + ImmutableSet.of("getClusterName", "setTracingInfoFactory", "getTracingInfoFactory"); /** * Checks that all methods of {@link DelegatingCluster} invoke their counterpart in {@link diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java new file mode 100644 index 00000000000..57cebceccd2 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java @@ -0,0 +1,360 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import static org.junit.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.NoSpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.PagingOptimizingLoadBalancingPolicy; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import org.testng.annotations.Test; + +public class BasicTracingTest extends CCMTestsSupport { + private static TestTracingInfoFactory testTracingInfoFactory; + private Session session; + + @Override + public void onTestContextInitialized() { + initializeTestTracing(); + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + session.execute("CREATE TABLE blobs (k int PRIMARY KEY, v blob)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 3)"); + session.execute("INSERT INTO t(k, v) VALUES (1, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (5, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (6, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (7, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (8, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (9, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (10, 7)"); + + Collection spans = testTracingInfoFactory.getSpans(); + spans.clear(); + } + + @Test(groups = "short") + public void simpleTracingTest() { + session.execute("INSERT INTO t(k, v) VALUES (4, 5)"); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + spans.clear(); + } + + @Test(groups = "short") + public void tagsInsertTest() { + PreparedStatement prepared = session.prepare("INSERT INTO blobs(k, v) VALUES (?, ?)"); + + Collection prepareSpans = testTracingInfoFactory.getSpans(); + assertNotEquals(prepareSpans.size(), 0); + assertTrue(getRoot(prepareSpans) instanceof TestTracingInfo); + prepareSpans.clear(); + + BoundStatement bound = prepared.bind(1, ByteBuffer.wrap("\n\0\n".getBytes())); + session.execute(bound); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "prepared"); + assertNull(root.getBatchSize()); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.ONE); + assertNull(root.getRowsCount()); // no rows are returned in INSERT + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getSpeculativeExecutionPolicy() instanceof NoSpeculativeExecutionPolicy); + assertTrue(root.getRetryPolicy() instanceof DefaultRetryPolicy); + assertNull(root.getFetchSize()); // fetch size was not explicitly set for this statement + assertNull(root.getHasMorePages()); // no paging are returned in INSERT + assertNull(root.getStatement()); // because of precision level NORMAL + // these are tags specific to bound statement + assertEquals(root.getKeyspace(), keyspace); + assertEquals(root.getBoundValues(), "k=1, v=0x0A000A"); // "\n\0\n" + assertEquals(root.getPartitionKey(), "k=1"); + assertEquals(root.getTable(), "blobs"); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getAttemptCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getAttemptCount() >= 1); + } + + ArrayList attempts = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + attempts.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(attempts.size() > 0); + + for (TracingInfo attemptSpan : attempts) { + assertTrue(attemptSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) attemptSpan; + + // these tags should not be set for attempt span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getAttemptCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for attempt span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + + @Test(groups = "short") + public void tagsSelectTest() { + SimpleStatement s = new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.QUORUM); + + final Collection spans = testTracingInfoFactory.getSpans(); + class SpanChecks { + int totalRows = 0; + + void checkTotalCount() { + assertEquals(totalRows, 7); + } + + void checkAssertions(boolean hasMorePages) { + assertEquals(spans.size(), 3); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "regular"); + assertNull(root.getBatchSize()); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.QUORUM); + assertNotNull(root.getRowsCount()); + totalRows += root.getRowsCount(); + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getSpeculativeExecutionPolicy() instanceof NoSpeculativeExecutionPolicy); + assertTrue(root.getRetryPolicy() == FallthroughRetryPolicy.INSTANCE); + assertEquals(root.getFetchSize(), new Integer(2)); + assertEquals(root.getHasMorePages(), new Boolean(hasMorePages)); + assertNull(root.getStatement()); // because of precision level NORMAL + + // these are tags specific to bound statement + assertNull(root.getKeyspace()); + assertNull(root.getPartitionKey()); + assertNull(root.getTable()); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getAttemptCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getAttemptCount() >= 1); + } + + ArrayList attempts = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + attempts.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(attempts.size() > 0); + + for (TracingInfo attemptSpan : attempts) { + assertTrue(attemptSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) attemptSpan; + + // these tags should not be set for attempt span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getAttemptCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for attempt span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + } + + SpanChecks spanChecks = new SpanChecks(); + + try { + ResultSet rs = session.execute(s); + + while (!rs.isFullyFetched()) { + spanChecks.checkAssertions(true); + rs.fetchMoreResults().get(); + } + spanChecks.checkAssertions(false); + + } catch (InterruptedException e) { + assert false : "InterruptedException"; + } catch (ExecutionException e) { + assert false : "ExecutionException"; + } + spanChecks.checkTotalCount(); + } + + private void initializeTestTracing() { + testTracingInfoFactory = new TestTracingInfoFactory(VerbosityLevel.NORMAL); + cluster().setTracingInfoFactory(testTracingInfoFactory); + session = cluster().connect(); + } + + private TracingInfo getRoot(Collection spans) { + TracingInfo root = null; + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == null) { + assertNull(root); // There should be only one root. + root = tracingInfo; + } + } + + return root; + } + + private ArrayList getChildren(Collection spans, TracingInfo parent) { + ArrayList children = new ArrayList(); + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == parent) { + children.add(tracingInfo); + } + } + return children; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java new file mode 100644 index 00000000000..785bd4f5321 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java @@ -0,0 +1,322 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; + +public class TestTracingInfo implements TracingInfo { + + private final VerbosityLevel precision; + private TracingInfo parent = null; + + private boolean spanStarted = false; + private boolean spanFinished = false; + private String spanName; + private ConsistencyLevel consistencyLevel; + private String statement; + private String statementType; + private Collection exceptions; + private StatusCode statusCode; + private String description; + private InetAddress peerIP; + private RetryPolicy retryPolicy; + private LoadBalancingPolicy loadBalancingPolicy; + private SpeculativeExecutionPolicy speculativeExecutionPolicy; + private Integer batchSize; + private Integer attemptCount; + private Integer shardID; + private String peerName; + private Integer peerPort; + private Integer fetchSize; + private Boolean hasMorePages; + private Integer rowsCount; + private String keyspace; + private String boundValues; + private String partitionKey; + private String table; + private String operationType; + private String replicas; + + public TestTracingInfo(VerbosityLevel precision) { + this.precision = precision; + } + + public TestTracingInfo(VerbosityLevel precision, TracingInfo parent) { + this(precision); + this.parent = parent; + } + + public VerbosityLevel getVerbosity() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + this.spanStarted = true; + this.spanName = name; + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + this.consistencyLevel = consistency; + } + + @Override + public void setStatementType(String statementType) { + this.statementType = statementType; + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + this.loadBalancingPolicy = loadBalancingPolicy; + } + + @Override + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy) { + this.speculativeExecutionPolicy = speculativeExecutionPolicy; + } + + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void setAttemptCount(int attemptCount) { + this.attemptCount = attemptCount; + } + + @Override + public void setShardID(int shardID) { + this.shardID = shardID; + } + + @Override + public void setPeerName(String peerName) { + this.peerName = peerName; + } + + @Override + public void setPeerIP(InetAddress peerIP) { + this.peerIP = peerIP; + } + + @Override + public void setPeerPort(int peerPort) { + this.peerPort = peerPort; + } + + @Override + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + @Override + public void setHasMorePages(boolean hasMorePages) { + this.hasMorePages = hasMorePages; + } + + @Override + public void setRowsCount(int rowsCount) { + this.rowsCount = rowsCount; + } + + @Override + public void setStatement(String statement, int limit) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + this.statement = statement; + } + } + + @Override + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + @Override + public void setBoundValues(String boundValues) { + this.boundValues = boundValues; + } + + @Override + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + + @Override + public void setTable(String table) { + this.table = table; + } + + @Override + public void setOperationType(String operationType) { + this.operationType = operationType; + } + + @Override + public void setReplicas(String replicas) { + this.replicas = replicas; + } + + @Override + public void recordException(Exception exception) { + if (this.exceptions == null) { + this.exceptions = new ArrayList(); + } + this.exceptions.add(exception); + } + + @Override + public void setStatus(StatusCode code) { + this.statusCode = code; + } + + @Override + public void setStatus(StatusCode code, String description) { + this.statusCode = code; + this.description = description; + } + + @Override + public void tracingFinished() { + this.spanFinished = true; + } + + private boolean currentVerbosityLevelIsAtLeast(VerbosityLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } + + public boolean isSpanStarted() { + return spanStarted; + } + + public boolean isSpanFinished() { + return spanFinished; + } + + public String getSpanName() { + return spanName; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public String getStatementType() { + return statementType; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public LoadBalancingPolicy getLoadBalancingPolicy() { + return loadBalancingPolicy; + } + + public SpeculativeExecutionPolicy getSpeculativeExecutionPolicy() { + return speculativeExecutionPolicy; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getAttemptCount() { + return attemptCount; + } + + public Integer getShardID() { + return shardID; + } + + public String getPeerName() { + return peerName; + } + + public InetAddress getPeerIP() { + return peerIP; + } + + public Integer getPeerPort() { + return peerPort; + } + + public Integer getFetchSize() { + return fetchSize; + } + + public Boolean getHasMorePages() { + return hasMorePages; + } + + public Integer getRowsCount() { + return rowsCount; + } + + public String getStatement() { + return statement; + } + + public String getKeyspace() { + return keyspace; + } + + public String getBoundValues() { + return boundValues; + } + + public String getPartitionKey() { + return partitionKey; + } + + public String getTable() { + return table; + } + + public String getOperationType() { + return operationType; + } + + public String getReplicas() { + return replicas; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + public String getDescription() { + return description; + } + + public TracingInfo getParent() { + return parent; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java new file mode 100644 index 00000000000..f87e380acbb --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class TestTracingInfoFactory implements TracingInfoFactory { + private final VerbosityLevel precision; + private Collection spans = + Collections.synchronizedList(new ArrayList()); + + public TestTracingInfoFactory() { + this.precision = VerbosityLevel.NORMAL; + } + + public TestTracingInfoFactory(final VerbosityLevel precision) { + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + TracingInfo tracingInfo = new TestTracingInfo(precision); + spans.add(tracingInfo); + return tracingInfo; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + TracingInfo tracingInfo; + + if (parent instanceof TestTracingInfo) { + final TestTracingInfo castedParent = (TestTracingInfo) parent; + tracingInfo = new TestTracingInfo(castedParent.getVerbosity(), parent); + spans.add(tracingInfo); + return tracingInfo; + } + + tracingInfo = new NoopTracingInfoFactory().buildTracingInfo(); + spans.add(tracingInfo); + return tracingInfo; + } + + public Collection getSpans() { + return spans; + } +} diff --git a/driver-examples/README.md b/driver-examples/README.md index 553cdbc5117..e696e948716 100644 --- a/driver-examples/README.md +++ b/driver-examples/README.md @@ -5,6 +5,17 @@ Apache Cassandra. ## Usage -Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster +Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster listening on localhost:9042. +Before running examples, make sure you installed repo artifacts (in root driver directory): +``` +mvn install -q -Dmaven.test.skip=true +``` + +To conveniently run the example showing OpenTelemetry integration (ZipkinConfiguration), +you can use the provided ```docker-compose.yaml``` file (which runs both Scylla cluster and Zipkin instance): +``` +docker-compose up +./runOpenTelemetryExample.sh +``` diff --git a/driver-examples/docker-compose.yaml b/driver-examples/docker-compose.yaml new file mode 100644 index 00000000000..1daf0c46e65 --- /dev/null +++ b/driver-examples/docker-compose.yaml @@ -0,0 +1,12 @@ +version: '3.4' + +services: + zipkin: + image: openzipkin/zipkin + ports: + - "9411:9411" + scylla_node: + image: scylladb/scylla + ports: + - "9042:9042" + command: "--smp 1 --skip-wait-for-gossip-to-settle 0" diff --git a/driver-examples/pom.xml b/driver-examples/pom.xml index 5f28572501b..b47107fbcc7 100644 --- a/driver-examples/pom.xml +++ b/driver-examples/pom.xml @@ -48,6 +48,11 @@ true + + com.scylladb + scylla-driver-opentelemetry + + @@ -134,6 +139,26 @@ logback-classic + + + + io.opentelemetry + opentelemetry-sdk + 1.9.1 + + + + io.opentelemetry + opentelemetry-semconv + 1.9.0-alpha + + + + io.opentelemetry + opentelemetry-exporter-zipkin + 1.9.1 + + @@ -184,6 +209,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + diff --git a/driver-examples/runOpenTelemetryExample.sh b/driver-examples/runOpenTelemetryExample.sh new file mode 100755 index 00000000000..7603ca64d18 --- /dev/null +++ b/driver-examples/runOpenTelemetryExample.sh @@ -0,0 +1 @@ +mvn -q exec:java -Dexec.mainClass="com.datastax.driver.examples.opentelemetry.ZipkinUsage" diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java new file mode 100644 index 00000000000..d959b71f77a --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.examples.opentelemetry; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +/** Example showing how to configure OpenTelemetry for tracing with Scylla Java Driver. */ +class OpenTelemetryConfiguration { + private static final String SERVICE_NAME = "Scylla Java driver"; + + public static OpenTelemetry initialize(SpanExporter spanExporter) { + Resource serviceNameResource = + Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, SERVICE_NAME)); + + // Set to process the spans by the spanExporter. + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + // Add a shutdown hook to shut down the SDK. + Runtime.getRuntime() + .addShutdownHook( + new Thread( + new Runnable() { + @Override + public void run() { + tracerProvider.close(); + } + })); + + // Return the configured instance so it can be used for instrumentation. + return openTelemetry; + } + + public static OpenTelemetry initializeForZipkin(String ip, int port) { + String endpointPath = "/api/v2/spans"; + String httpUrl = String.format("http://%s:%s", ip, port); + + SpanExporter exporter = + ZipkinSpanExporter.builder().setEndpoint(httpUrl + endpointPath).build(); + + return initialize(exporter); + } +} diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java new file mode 100644 index 00000000000..43490c58b3b --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java @@ -0,0 +1,234 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.examples.opentelemetry; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.opentelemetry.OpenTelemetryTracingInfoFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +/** + * Creates a keyspace and tables, and loads some data into them. Sends OpenTelemetry tracing data to + * Zipkin tracing backend + * + *

Preconditions: - a Scylla cluster is running and accessible through the contacts points + * identified by CONTACT_POINTS and PORT and Zipkin backend is running and accessible through the + * contacts points identified by ZIPKIN_CONTACT_POINT and ZIPKIN_PORT. + * + *

Side effects: - creates a new keyspace "simplex" in the cluster. If a keyspace with this name + * already exists, it will be reused; - creates two tables "simplex.songs" and "simplex.playlists". + * If they exist already, they will be reused; - inserts a row in each table. + */ +public class ZipkinUsage { + private static final String CONTACT_POINT = "127.0.0.1"; + private static final int PORT = 9042; + + private static final String ZIPKIN_CONTACT_POINT = "127.0.0.1"; + private static final int ZIPKIN_PORT = 9411; + + private Cluster cluster; + private Session session; + + private Tracer tracer; + + public static void main(String[] args) { + // Workaround for setting ContextStorage to ThreadLocalContextStorage. + System.setProperty("io.opentelemetry.context.contextStorageProvider", "default"); + + ZipkinUsage client = new ZipkinUsage(); + + try { + client.connect(); + client.createSchema(); + client.loadData(); + client.querySchema(); + System.out.println( + "All requests have been completed. Now you can visit Zipkin at " + + "http://" + + ZIPKIN_CONTACT_POINT + + ":" + + ZIPKIN_PORT + + " and examine the produced trace."); + } finally { + client.close(); + } + } + + /** Initiates a connection to the cluster. */ + public void connect() { + cluster = Cluster.builder().addContactPoints(CONTACT_POINT).withPort(PORT).build(); + + System.out.printf("Connected to cluster: %s%n", cluster.getMetadata().getClusterName()); + + OpenTelemetry openTelemetry = + OpenTelemetryConfiguration.initializeForZipkin(ZIPKIN_CONTACT_POINT, ZIPKIN_PORT); + tracer = openTelemetry.getTracerProvider().get("this"); + TracingInfoFactory tracingInfoFactory = new OpenTelemetryTracingInfoFactory(tracer); + cluster.setTracingInfoFactory(tracingInfoFactory); + + session = cluster.connect(); + } + + /** Creates the schema (keyspace) and tables for this example. */ + public void createSchema() { + session.execute("DROP KEYSPACE IF EXISTS simplex;"); + Span parentSpan = tracer.spanBuilder("create schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + { + Span span = tracer.spanBuilder("create simplex").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE KEYSPACE IF NOT EXISTS simplex WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':1};"); + + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + ResultSetFuture f = + session.executeAsync( + "CREATE TABLE IF NOT EXISTS simplex.songs (" + + "id uuid," + + "title text," + + "album text," + + "artist text," + + "tags set," + + "data blob," + + "PRIMARY KEY ((title, artist), album)" + + ");"); + f.getUninterruptibly(); + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.playlists").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE TABLE IF NOT EXISTS simplex.playlists (" + + "id uuid," + + "title text," + + "album text, " + + "artist text," + + "song_id uuid," + + "PRIMARY KEY (id, title, album, artist)" + + ");"); + + } finally { + span.end(); + } + } + } finally { + parentSpan.end(); + } + } + + /** Inserts data into the tables. */ + public void loadData() { + Span parentSpan = tracer.spanBuilder("load data").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = + session.prepare( + "INSERT INTO simplex.songs (id, title, album, artist, tags) " + + "VALUES (" + + "756716f7-2e54-4715-9f00-91dcbea6cf50," + + "?," + + "?," + + "?," + + "{'jazz', '2013'})" + + ";"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("La Petite Tonkinoise", "Bye Bye Blackbird", "Joséphine Baker"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("insert simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + public void querySchema() { + Span parentSpan = tracer.spanBuilder("query schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = session.prepare("SELECT * FROM simplex.songs WHERE artist = ? AND title = ?;"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("Joséphine Baker", "La Petite Tonkinoise"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("query simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + /** Closes the session and the cluster. */ + public void close() { + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } +} diff --git a/driver-opentelemetry/pom.xml b/driver-opentelemetry/pom.xml new file mode 100644 index 00000000000..6c59be7b9ad --- /dev/null +++ b/driver-opentelemetry/pom.xml @@ -0,0 +1,149 @@ + + + + + 4.0.0 + + + com.scylladb + scylla-driver-parent + 3.11.2.1-SNAPSHOT + + + scylla-driver-opentelemetry + Java Driver for Scylla and Apache Cassandra - OpenTelemetry integration + An extension of Java Driver for Scylla and Apache Cassandra by adding + functionality of creating traces and spans in OpenTelemetry format. + + + + + + + + io.opentelemetry + opentelemetry-bom + 1.9.1 + pom + import + + + + + + + + + + + + com.scylladb + scylla-driver-core + + + + + + io.opentelemetry + opentelemetry-api + 1.9.1 + + + + + + com.scylladb + scylla-driver-core + test-jar + test + + + + io.opentelemetry + opentelemetry-sdk + test + + + + org.apache.commons + commons-exec + test + + + + org.testng + testng + test + + + + org.slf4j + slf4j-log4j12 + test + + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + 1.15 + + + check-jdk6 + process-classes + + check + + + true + + + + check-jdk8 + + check + + + true + + + + + + + + + + diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java new file mode 100644 index 00000000000..de1463ce293 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.VerbosityLevel; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import java.net.InetAddress; + +public class OpenTelemetryTracingInfo implements TracingInfo { + private Span span; + private final Tracer tracer; + private final Context context; + private boolean tracingStarted; + private final VerbosityLevel precision; + + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, VerbosityLevel precision) { + this.tracer = tracer; + this.context = context; + this.precision = precision; + this.tracingStarted = false; + } + + public Tracer getTracer() { + return tracer; + } + + public Context getContext() { + return context.with(span); + } + + private void assertStarted() { + assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; + } + + public VerbosityLevel getVerbosity() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + assert !tracingStarted : "TracingInfo.setStartTime may only be called once."; + tracingStarted = true; + span = tracer.spanBuilder(name).setParent(context).startSpan(); + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + assertStarted(); + span.setAttribute("db.scylladb.consistency_level", consistency.toString()); + } + + public void setStatement(String statement) { + assertStarted(); + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { + span.setAttribute("db.scylladb.statement", statement); + } + } + + public void setHostname(String hostname) { + assertStarted(); + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { + span.setAttribute("net.peer.name", hostname); + } + } + + @Override + public void setStatementType(String statementType) { + assertStarted(); + span.setAttribute("db.scylladb.statement_type", statementType); + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + assertStarted(); + span.setAttribute("db.scylladb.retry_policy", retryPolicy.getClass().getSimpleName()); + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + assertStarted(); + span.setAttribute( + "db.scylladb.load_balancing_policy", loadBalancingPolicy.getClass().getSimpleName()); + } + + @Override + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy) { + assertStarted(); + span.setAttribute( + "db.scylladb.speculative_execution_policy", + speculativeExecutionPolicy.getClass().getSimpleName()); + } + + @Override + public void setBatchSize(int batchSize) { + assertStarted(); + span.setAttribute("db.scylladb.batch_size", String.valueOf(batchSize)); + } + + @Override + public void setAttemptCount(int attemptCount) { + assertStarted(); + span.setAttribute("db.scylladb.attempt_count", String.valueOf(attemptCount)); + } + + @Override + public void setShardID(int shardID) { + assertStarted(); + span.setAttribute("db.scylladb.shard_id", String.valueOf(shardID)); + } + + @Override + public void setPeerName(String peerName) { + assertStarted(); + span.setAttribute("net.peer.name", peerName); + } + + @Override + public void setPeerIP(InetAddress peerIP) { + assertStarted(); + span.setAttribute("net.peer.ip", peerIP.getHostAddress()); + } + + @Override + public void setPeerPort(int peerPort) { + assertStarted(); + span.setAttribute("net.peer.port", String.valueOf(peerPort)); + } + + @Override + public void setFetchSize(int fetchSize) { + assertStarted(); + span.setAttribute("db.scylladb.fetch_size", Integer.toString(fetchSize)); + } + + @Override + public void setHasMorePages(boolean hasMorePages) { + assertStarted(); + span.setAttribute("db.scylladb.has_more_pages", Boolean.toString(hasMorePages)); + } + + @Override + public void setRowsCount(int rowsCount) { + assertStarted(); + span.setAttribute("db.scylladb.rows_count", Integer.toString(rowsCount)); + } + + @Override + public void setStatement(String statement, int limit) { + assertStarted(); + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + span.setAttribute("db.scylladb.statement", statement); + } + } + + @Override + public void setKeyspace(String keyspace) { + assertStarted(); + span.setAttribute("db.scylladb.keyspace", keyspace); + } + + @Override + public void setBoundValues(String boundValues) { + assertStarted(); + span.setAttribute("db.scylladb.bound_values", boundValues); + } + + @Override + public void setPartitionKey(String partitionKey) { + assertStarted(); + span.setAttribute("db.scylladb.partition_key", partitionKey); + } + + @Override + public void setTable(String table) { + assertStarted(); + span.setAttribute("db.scylladb.table", table); + } + + @Override + public void setOperationType(String operationType) { + assertStarted(); + span.setAttribute("db.operation", operationType); + } + + @Override + public void setReplicas(String replicas) { + assertStarted(); + span.setAttribute("db.scylladb.replicas", replicas); + } + + private io.opentelemetry.api.trace.StatusCode mapStatusCode(StatusCode code) { + switch (code) { + case OK: + return io.opentelemetry.api.trace.StatusCode.OK; + case ERROR: + return io.opentelemetry.api.trace.StatusCode.ERROR; + } + return null; + } + + @Override + public void recordException(Exception exception) { + assertStarted(); + span.recordException(exception); + } + + @Override + public void setStatus(StatusCode code, String description) { + assertStarted(); + span.setStatus(mapStatusCode(code), description); + } + + @Override + public void setStatus(StatusCode code) { + assertStarted(); + span.setStatus(mapStatusCode(code)); + } + + @Override + public void tracingFinished() { + assertStarted(); + span.end(); + } + + private boolean currentVerbosityLevelIsAtLeast(VerbosityLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } +} diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java new file mode 100644 index 00000000000..91fab8e9c8a --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; + +public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { + private final Tracer tracer; + private final VerbosityLevel precision; + + public OpenTelemetryTracingInfoFactory(final Tracer tracer) { + this(tracer, VerbosityLevel.NORMAL); + } + + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final VerbosityLevel precision) { + this.tracer = tracer; + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + final Context current = Context.current(); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + if (parent instanceof OpenTelemetryTracingInfo) { + final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; + return new OpenTelemetryTracingInfo( + castedParent.getTracer(), castedParent.getContext(), castedParent.getVerbosity()); + } + + return new NoopTracingInfoFactory().buildTracingInfo(); + } + + public TracingInfo buildTracingInfo(Span parent) { + final Context current = Context.current().with(parent); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } +} diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..4026b360078 --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,889 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.SyntaxError; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class BookkeepingSpanProcessor implements SpanProcessor { + final Lock lock = new ReentrantLock(); + final Condition allEnded = lock.newCondition(); + + final Collection startedSpans = new ArrayList<>(); + final Collection spans = new ArrayList<>(); + + int activeSpans = 0; + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + lock.lock(); + + startedSpans.add(span); + ++activeSpans; + + lock.unlock(); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + lock.lock(); + + spans.add(span); + --activeSpans; + + if (activeSpans == 0) allEnded.signal(); + + lock.unlock(); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + lock.lock(); + + try { + while (activeSpans > 0) allEnded.await(); + + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + } catch (InterruptedException e) { + assert false; + } finally { + lock.unlock(); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param precisionLevel precision level of tracing for the tests. + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans( + VerbosityLevel precisionLevel, BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer, precisionLevel); + cluster().setTracingInfoFactory(tracingInfoFactory); + + session = cluster().connect(); + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + + BatchStatement bs = new BatchStatement(); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (12, 3)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (1, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (5, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (6, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (7, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (8, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (9, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (10, 7)")); + bs.setConsistencyLevel(ConsistencyLevel.ALL); + session.execute(bs); + + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + session.close(); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + private Collection getChildrenOfSpans( + Collection allSpans, Collection parentSpans) { + return allSpans.stream() + .filter( + span -> + parentSpans.stream() + .filter( + parentSpan -> + parentSpan.getSpanContext().equals(span.getParentSpanContext())) + .findAny() + .isPresent()) + .collect(Collectors.toList()); + } + + /** Basic test for creating spans with INSERT statement. */ + @Test(groups = "short") + public void simpleTracingInsertTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation INSERT: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with UPDATE statement. */ + @Test(groups = "short") + public void simpleTracingUpdateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final BatchStatement batch = new BatchStatement(); + batch.addAll( + new ArrayList() { + { + add("UPDATE t SET v=0 WHERE k=1"); + add("UPDATE t SET v=0 WHERE k=2"); + add("UPDATE t SET v=0 WHERE k=3"); + add("UPDATE t SET v=0 WHERE k=4"); + } + }.stream().map(SimpleStatement::new).collect(Collectors.toList())); + + session.execute(batch); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "batch"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to BatchStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.batch_size")), "4"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation UPDATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with DELETE statement. */ + @Test(groups = "short") + public void simpleTracingDeleteTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final PreparedStatement ps = session.prepare("DELETE FROM t WHERE k=?"); + + session.execute(ps.bind(7)); + session.execute(ps.bind(8)); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter( + span -> + span.getParentSpanContext().equals(userSpan.getSpanContext()) + && span.toSpanData() + .getAttributes() + .get(AttributeKey.stringKey("db.scylladb.statement_type")) + != null) // to exclude preparation spans + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "prepared"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to PreparedStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.keyspace")), keyspace); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.table")), "t"); + String partitionKey = tags.get(AttributeKey.stringKey("db.scylladb.partition_key")); + assertTrue(partitionKey.equals("k=7") || partitionKey.equals("k=8")); + String boundValues = tags.get(AttributeKey.stringKey("db.scylladb.bound_values")); + assertTrue(boundValues.equals("k=7") || boundValues.equals("k=8")); + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.db.operation"))); // not supported so far + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation DELETE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with TRUNCATE statement. */ + @Test(groups = "short") + public void simpleTracingTruncateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("TRUNCATE t"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // present with VerbosityLevel.FULL: + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement")), "TRUNCATE t"); + // no such information with operation TRUNCATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with SELECT statement. */ + @Test(groups = "short") + public void simpleTracingSelectTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + SimpleStatement s = + new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.ALL); + + assertEquals(session.execute(s).all().size(), 7); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assert requestSpans.size() >= 4; + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assert speculativeExecutionsSpans.size() >= 4; + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assert attemptSpans.size() >= 4; + + boolean wasNoMorePages = false; + int totalRows = 0; + + for (ReadableSpan requestSpan : requestSpans) { + SpanData spanData = requestSpan.toSpanData(); + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ALL"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.fetch_size")), "2"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.retry_policy")), "FallthroughRetryPolicy"); + + // tags specific for SELECT statement + final String hasMorePages = tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages")); + assertNotNull(hasMorePages); + if (hasMorePages.equals("false")) wasNoMorePages = true; + assertTrue(!(wasNoMorePages && hasMorePages.equals("true"))); + final String rowsCount = tags.get(AttributeKey.stringKey("db.scylladb.rows_count")); + assertNotNull(rowsCount); + totalRows += Integer.parseInt(rowsCount); + + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + } + + assertTrue(wasNoMorePages); + assertEquals(totalRows, 7); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with an erroneous statement. */ + @Test(groups = "short") + public void simpleRequestErrorTracingTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + try { + session.execute("INSERT ONTO t(k, v) VALUES (4, 2)"); + // ^ syntax error here + assert false; // exception should be thrown before this line is executed + } catch (SyntaxError error) { + // pass + } + + try { + session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)"); + // ^ too many values + assert false; // exception should be thrown before this line is executed + } catch (InvalidQueryException error) { + // pass + } + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR); + final String collectedStatement = + spanData.getAttributes().get(AttributeKey.stringKey("db.scylladb.statement")); + assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)") + || collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)") + : "Bad statement gathered"; + }); + } +} diff --git a/pom.xml b/pom.xml index 307e2df8864..e925b5a3c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ driver-examples driver-tests driver-dist + driver-opentelemetry @@ -133,6 +134,12 @@ ${project.parent.version} + + com.scylladb + scylla-driver-opentelemetry + ${project.parent.version} + + com.google.guava guava @@ -727,6 +734,9 @@ java18 1.0 + + io.opentelemetry:* +