From 567fbd8eaf5774f7801a5b7636cc50c5372bf511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 11:03:12 +0200 Subject: [PATCH 01/10] Tracing: introduced abstract tracing classes In order to be generic and to still stick to Java 6 requirement for driver-core, abstract classes: TracingInfo and TracingInfoFactory are introduced. Further implementations of tracing are expected to be placed apart from driver-core and hence keep Java 8 (or higher) requirements away. --- .../driver/core/tracing/TracingInfo.java | 35 +++++++++++++++++++ .../core/tracing/TracingInfoFactory.java | 31 ++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java new file mode 100644 index 00000000000..f5914a5bf19 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** + * An abstraction layer over instrumentation library API, corresponding to a logical span in the + * trace. + */ +public interface TracingInfo { + + /** + * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, + * before any other method, at the beginning of the traced execution. + * + * @param name the name given to the span being created. + */ + void setNameAndStartTime(String name); + + /** Must be always called exactly once at the logical end of traced execution. */ + void tracingFinished(); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java new file mode 100644 index 00000000000..25c20f3d9ec --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** Factory of trace info objects. */ +public interface TracingInfoFactory { + + /** Creates new trace info object, inheriting global context. */ + TracingInfo buildTracingInfo(); + + /** + * Creates new trace info object, inheriting context from provided another trace info object. + * + * @param parent the trace info object to be set as the parent of newly created trace info object. + */ + TracingInfo buildTracingInfo(TracingInfo parent); +} From f3d82159a303b108cbac155d69d01eff4dd2133e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 11:20:30 +0200 Subject: [PATCH 02/10] Tracing: added customizable verbosity level As some gathered data can be heavy or sensitive or we might want to not have them in the trace for some reason, verbosity level layer has been added. Currently, statement is both possibly long and containing some sensitive data, so it is included only on level FULL (and not NORMAL). --- .../driver/core/tracing/VerbosityLevel.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java new file mode 100644 index 00000000000..b37cef5a59b --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/VerbosityLevel.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** The verbosity level of tracing data that is to be collected. May be extended in the future. */ +public enum VerbosityLevel { + // Enum elements must be listed in ascending order of verbosity level (i.e. each next element + // listed adds some new information). + NORMAL, + FULL; +} From 84fc4572bd72afbc9dec59099070296b8ad67a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 13:06:52 +0200 Subject: [PATCH 03/10] Tracing: added abstract tracing classes to driver While building a Cluster object, one can pass a TracingInfo class implementation and it will be used for tracing in all sessions with this cluster from now on. --- clirr-ignores.xml | 6 +++ .../com/datastax/driver/core/Cluster.java | 23 +++++++++++ .../com/datastax/driver/core/Session.java | 14 +++++++ .../datastax/driver/core/SessionManager.java | 6 +++ .../core/tracing/NoopTracingInfoFactory.java | 40 +++++++++++++++++++ .../datastax/driver/core/AsyncQueryTest.java | 6 +++ .../driver/core/DelegatingClusterTest.java | 3 +- 7 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 9784b61c368..5cf67ffc00c 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,12 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/Session + com.datastax.driver.core.tracing.TracingInfoFactory getTracingInfoFactory() + New method to get the TracingInfo data factory associated with this Session + 7004 com/datastax/driver/core/Metadata diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java index cdac5cff4d0..2a7ceeafca1 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java @@ -39,6 +39,8 @@ import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; @@ -146,6 +148,8 @@ public class Cluster implements Closeable { final Manager manager; + private TracingInfoFactory tracingInfoFactory = new NoopTracingInfoFactory(); + /** * Constructs a new Cluster instance. * @@ -197,6 +201,25 @@ private Cluster( this.manager = new Manager(name, contactPoints, configuration, listeners); } + /** + * The tracingInfo factory class used by this Cluster. + * + * @return the factory used currently by this Cluster. + */ + public TracingInfoFactory getTracingInfoFactory() { + return tracingInfoFactory; + } + + /** + * Sets desired factory for tracing information for this Cluster. By default it is {@link + * com.datastax.driver.core.tracing.NoopTracingInfoFactory} + * + * @param tracingInfoFactory the factory to be set for this Cluster. + */ + public void setTracingInfoFactory(TracingInfoFactory tracingInfoFactory) { + this.tracingInfoFactory = tracingInfoFactory; + } + /** * Initialize this Cluster instance. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/Session.java b/driver-core/src/main/java/com/datastax/driver/core/Session.java index a8e4a59b2a8..0e23caa8e78 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Session.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Session.java @@ -13,6 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ package com.datastax.driver.core; import com.datastax.driver.core.exceptions.AuthenticationException; @@ -20,6 +26,7 @@ import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.google.common.util.concurrent.ListenableFuture; import java.io.Closeable; import java.util.Collection; @@ -41,6 +48,13 @@ */ public interface Session extends Closeable { + /** + * The tracingInfo factory class used by this Session. + * + * @return the factory used currently by this Session. + */ + TracingInfoFactory getTracingInfoFactory(); + /** * The keyspace to which this Session is currently logged in, if any. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index ada9cc1699c..99dbe520cd3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,7 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; @@ -76,6 +77,11 @@ class SessionManager extends AbstractSession { this.poolsState = new HostConnectionPool.PoolState(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return cluster.getTracingInfoFactory(); + } + @Override public Session init() { try { diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java new file mode 100644 index 00000000000..3706c0239aa --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +public class NoopTracingInfoFactory implements TracingInfoFactory { + + private static class NoopTracingInfo implements TracingInfo { + @Override + public void setNameAndStartTime(String name) {} + + @Override + public void tracingFinished() {} + } + + public static final NoopTracingInfo INSTANCE = new NoopTracingInfo(); + + @Override + public TracingInfo buildTracingInfo() { + return INSTANCE; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + return new NoopTracingInfo(); + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java index 098164caa88..b4b73356186 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.CassandraVersion; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -286,6 +287,11 @@ public ResultSet execute(Statement statement) { return executeAsync(statement).getUninterruptibly(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return session.getTracingInfoFactory(); + } + @Override public String getLoggedKeyspace() { return session.getLoggedKeyspace(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java index 0bee5ec9a8d..03d01e18cb4 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java @@ -29,7 +29,8 @@ import org.testng.annotations.Test; public class DelegatingClusterTest { - private static final Set NON_DELEGATED_METHODS = ImmutableSet.of("getClusterName"); + private static final Set NON_DELEGATED_METHODS = + ImmutableSet.of("getClusterName", "setTracingInfoFactory", "getTracingInfoFactory"); /** * Checks that all methods of {@link DelegatingCluster} invoke their counterpart in {@link From 57aa86fecb429486e6da559aee1b83cebee55ecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 13:33:49 +0200 Subject: [PATCH 04/10] Tracing: added empty spans covering request Every request (in RequestHandler) is covered by one "request" span, and every (speculative) execution has its own "speculative_execution" span. Each retry attempt is covered by "attempt" span, effectively yielding a tree of form: "request" -1--many-> "speculative_execution" -1--many-> "attempt" --- .../datastax/driver/core/RequestHandler.java | 47 +++++++++++++++++-- .../datastax/driver/core/SessionManager.java | 19 ++++++-- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index a8a05f093f5..aa7ba659032 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -40,6 +40,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.RetryPolicy.RetryDecision.Type; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy.SpeculativeExecutionPlan; +import com.datastax.driver.core.tracing.TracingInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -95,6 +96,8 @@ class RequestHandler { private final AtomicBoolean isDone = new AtomicBoolean(); private final AtomicInteger executionIndex = new AtomicInteger(); + private final TracingInfo tracingInfo; + private Iterator getReplicas( String loggedKeyspace, Statement statement, Iterator fallback) { ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion(); @@ -120,7 +123,8 @@ private Iterator getReplicas( return replicas.iterator(); } - public RequestHandler(SessionManager manager, Callback callback, Statement statement) { + public RequestHandler( + SessionManager manager, Callback callback, Statement statement, TracingInfo tracingInfo) { this.id = Long.toString(System.identityHashCode(this)); if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); this.manager = manager; @@ -156,6 +160,9 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + + this.tracingInfo = tracingInfo; + this.tracingInfo.setNameAndStartTime("request"); } void sendRequest() { @@ -274,6 +281,7 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( connection, @@ -281,6 +289,8 @@ private void setFinalResult( "Unexpected exception while setting final result from " + response, e), System.nanoTime() - startTime, /*unused*/ 0); + + tracingInfo.tracingFinished(); } } @@ -305,6 +315,8 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.tracingFinished(); + try { if (timerContext != null) timerContext.stop(); } finally { @@ -315,6 +327,7 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) setFinalException( @@ -383,11 +396,17 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; + private final TracingInfo parentTracingInfo; + private TracingInfo currentChildTracingInfo; + SpeculativeExecution(Message.Request request, int position) { this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); + this.parentTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(RequestHandler.this.tracingInfo); + this.parentTracingInfo.setNameAndStartTime("speculative_execution"); if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); } @@ -429,6 +448,9 @@ private boolean query(final Host host) { if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + currentChildTracingInfo.setNameAndStartTime("attempt"); + if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -647,6 +669,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { @@ -659,6 +682,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.tracingFinished(); return; } } @@ -674,6 +698,8 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -832,7 +858,10 @@ public void onSet( toPrepare.getQueryKeyspace(), connection.endPoint); - write(connection, prepareAndRetry(toPrepare.getQueryString())); + TracingInfo prepareTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + prepareTracingInfo.setNameAndStartTime("prepare"); + write(connection, prepareAndRetry(toPrepare.getQueryString(), prepareTracingInfo)); // we're done for now, the prepareAndRetry callback will handle the rest return; case READ_FAILURE: @@ -878,7 +907,8 @@ public void onSet( } } - private Connection.ResponseCallback prepareAndRetry(final String toPrepare) { + private Connection.ResponseCallback prepareAndRetry( + final String toPrepare, final TracingInfo prepareTracingInfo) { // do not bother inspecting retry policy at this step, no other decision // makes sense than retry on the same host if the query was prepared, // or on another host, if an error/timeout occurred. @@ -902,6 +932,8 @@ public int retryCount() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -944,11 +976,14 @@ public void onSet( @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); SpeculativeExecution.this.onException(connection, exception, latency, retryCount); } @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -973,6 +1008,8 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1010,6 +1047,8 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1051,10 +1090,12 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index 99dbe520cd3..4a4dcb2c159 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,7 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfo; import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; @@ -161,6 +162,7 @@ public ResultSetFuture executeAsync(final Statement statement) { // Because of the way the future is built, we need another 'proxy' future that we can return // now. final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture(); + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); this.initAsync() .addListener( new Runnable() { @@ -171,7 +173,7 @@ public void run() { SessionManager.this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null)); - execute(actualFuture, statement); + execute(actualFuture, statement, tracingInfo); chainedFuture.setSource(actualFuture); } }, @@ -712,25 +714,34 @@ else if (fetchSize != Integer.MAX_VALUE) *

This method will find a suitable node to connect to using the {@link LoadBalancingPolicy} * and handle host failover. */ - void execute(final RequestHandler.Callback callback, final Statement statement) { + void execute( + final RequestHandler.Callback callback, + final Statement statement, + final TracingInfo tracingInfo) { if (this.isClosed()) { callback.onException( null, new IllegalStateException("Could not send request, session is closed"), 0, 0); return; } - if (isInit) new RequestHandler(this, callback, statement).sendRequest(); + if (isInit) new RequestHandler(this, callback, statement, tracingInfo).sendRequest(); else this.initAsync() .addListener( new Runnable() { @Override public void run() { - new RequestHandler(SessionManager.this, callback, statement).sendRequest(); + new RequestHandler(SessionManager.this, callback, statement, tracingInfo) + .sendRequest(); } }, executor()); } + void execute(final RequestHandler.Callback callback, final Statement statement) { + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); + execute(callback, statement, tracingInfo); + } + private ListenableFuture prepare( final PreparedStatement statement, EndPoint toExclude) { final String query = statement.getQueryString(); From d41834f538e6dd4f52cdaa6c35622eb6d4a09241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 14:11:18 +0200 Subject: [PATCH 05/10] Tracing: added request status codes and exceptions After a request is completed, its status code (OK or ERROR) is added to trace data, as well as exception information (if one occured). --- .../datastax/driver/core/RequestHandler.java | 18 ++++++++++++ .../core/tracing/NoopTracingInfoFactory.java | 9 ++++++ .../driver/core/tracing/TracingInfo.java | 28 +++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index aa7ba659032..6fc2a50125f 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -281,6 +281,11 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + + tracingInfo.setStatus( + response.type == Message.Response.Type.ERROR + ? TracingInfo.StatusCode.ERROR + : TracingInfo.StatusCode.OK); tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( @@ -290,6 +295,8 @@ private void setFinalResult( System.nanoTime() - startTime, /*unused*/ 0); + tracingInfo.recordException(e); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); tracingInfo.tracingFinished(); } } @@ -315,6 +322,8 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.recordException(exception); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, exception.toString()); tracingInfo.tracingFinished(); try { @@ -327,6 +336,7 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) @@ -669,6 +679,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress @@ -682,6 +693,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); return; } @@ -698,6 +710,7 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.OK); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1008,6 +1021,8 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.recordException(exception); + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1047,6 +1062,7 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR, "timeout"); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1090,11 +1106,13 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java index 3706c0239aa..a2c39d6cee3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -22,6 +22,15 @@ private static class NoopTracingInfo implements TracingInfo { @Override public void setNameAndStartTime(String name) {} + @Override + public void recordException(Exception exception) {} + + @Override + public void setStatus(StatusCode code, String description) {} + + @Override + public void setStatus(StatusCode code) {} + @Override public void tracingFinished() {} } diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java index f5914a5bf19..090b32e41af 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -22,6 +22,12 @@ */ public interface TracingInfo { + /** Final status of the traced execution. */ + enum StatusCode { + OK, + ERROR, + } + /** * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, * before any other method, at the beginning of the traced execution. @@ -30,6 +36,28 @@ public interface TracingInfo { */ void setNameAndStartTime(String name); + /** + * Records in the trace that the provided exception occured. + * + * @param exception the exception to be recorded. + */ + void recordException(Exception exception); + + /** + * Sets the final status of the traced execution. + * + * @param code the status code to be set. + */ + void setStatus(StatusCode code); + + /** + * Sets the final status of the traced execution, with additional description. + * + * @param code the status code to be set. + * @param description the additional description of the status. + */ + void setStatus(StatusCode code, String description); + /** Must be always called exactly once at the logical end of traced execution. */ void tracingFinished(); } From f40dcfe7369315c797fc203571fe502459811b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Fri, 29 Jul 2022 10:32:00 +0200 Subject: [PATCH 06/10] Tracing: added various request driver-side tags All data that constitute added tags are already know by the driver, i.e. they do not require fetching any additional metadata from the cluster. --- clirr-ignores.xml | 6 + .../datastax/driver/core/BoundStatement.java | 7 + .../com/datastax/driver/core/CodecUtils.java | 15 ++ .../driver/core/DefaultPreparedStatement.java | 8 + .../driver/core/PreparedStatement.java | 3 + .../datastax/driver/core/RequestHandler.java | 110 ++++++++++++ .../core/tracing/NoopTracingInfoFactory.java | 70 ++++++++ .../driver/core/tracing/TracingInfo.java | 156 ++++++++++++++++++ 8 files changed, 375 insertions(+) diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 5cf67ffc00c..fbe39834e5a 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,12 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/PreparedStatement + java.lang.String getOperationType() + New method to get the type of operation performed by this PreparedStatement + 7012 com/datastax/driver/core/Session diff --git a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java index 9317bd0a58b..cc5117be68c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java @@ -74,6 +74,8 @@ public class BoundStatement extends Statement private ByteBuffer routingKey; + private final String operationType; + /** * Creates a new {@code BoundStatement} from the provided prepared statement. * @@ -92,6 +94,7 @@ public BoundStatement(PreparedStatement statement) { this.setSerialConsistencyLevel(statement.getSerialConsistencyLevel()); if (statement.isTracing()) this.enableTracing(); if (statement.getRetryPolicy() != null) this.setRetryPolicy(statement.getRetryPolicy()); + this.operationType = statement.getOperationType(); if (statement.getOutgoingPayload() != null) this.setOutgoingPayload(statement.getOutgoingPayload()); else @@ -104,6 +107,10 @@ public BoundStatement(PreparedStatement statement) { } } + public String getOperationType() { + return operationType; + } + @Override public boolean isLWT() { return statement.isLWT(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java b/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java index afaa7176bca..da342d95578 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java +++ b/driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java @@ -24,6 +24,8 @@ public final class CodecUtils { private static final long EPOCH_AS_CQL_LONG = (1L << 31); + private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + private CodecUtils() {} /** @@ -212,6 +214,19 @@ public static long fromDaysSinceEpochToCqlDate(int days) { return ((long) days + EPOCH_AS_CQL_LONG); } + public static String bytesToHex(byte[] bytes) { + final int INITIAL_CHARS = 2; + char[] hexChars = new char[INITIAL_CHARS + bytes.length * 2]; + hexChars[0] = '0'; + hexChars[1] = 'x'; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[INITIAL_CHARS + j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[INITIAL_CHARS + j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars); + } + private static int sizeOfCollectionSize(ProtocolVersion version) { switch (version) { case V1: diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 287a55cbdb3..6297660e5a2 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -41,6 +41,7 @@ public class DefaultPreparedStatement implements PreparedStatement { final Cluster cluster; final boolean isLWT; final Token.Factory partitioner; + final String operationType; volatile ByteBuffer routingKey; @@ -66,6 +67,7 @@ private DefaultPreparedStatement( this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; + this.operationType = null; } static DefaultPreparedStatement fromMessage( @@ -315,4 +317,10 @@ public Boolean isIdempotent() { public boolean isLWT() { return isLWT; } + + /** {@inheritDoc} */ + @Override + public String getOperationType() { + return operationType; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java index e87cbc0341b..779c35a1672 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java @@ -361,4 +361,7 @@ public interface PreparedStatement { /** Whether a prepared statement is LWT statement */ public boolean isLWT(); + + /** Type of prepared operation (e.g. SELECT) */ + public String getOperationType(); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 6fc2a50125f..7c04d67282e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -48,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; @@ -74,6 +75,10 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; + private static final int STATEMENT_MAX_LENGTH = 1000; + private static final int PARTITION_KEY_MAX_LENGTH = 1000; + private static final int BOUND_VALUES_MAX_LENGTH = 1000; + final String id; private final SessionManager manager; @@ -161,8 +166,92 @@ public RequestHandler( this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency == null) consistency = Statement.DEFAULT.getConsistencyLevel(); + + String statementType = null; + String statementText = null; + Integer batchSize = null; + + String keyspace = null; + String partitionKey = null; + String boundValues = null; + String table = null; + String operationType = null; + + if (statement instanceof BatchStatement) { + statementType = "batch"; + batchSize = ((BatchStatement) statement).size(); + StringBuilder statementTextBuilder = new StringBuilder(STATEMENT_MAX_LENGTH); + for (Statement subStatement : ((BatchStatement) statement).getStatements()) { + if (subStatement instanceof BoundStatement) + statementTextBuilder.append(((BoundStatement) subStatement).statement.getQueryString()); + else statementTextBuilder.append(subStatement.toString()); + } + statementText = statementTextBuilder.toString(); + } else if (statement instanceof BoundStatement) { + statementType = "prepared"; + statementText = ((BoundStatement) statement).statement.getQueryString(); + keyspace = ((BoundStatement) statement).getKeyspace(); + operationType = ((BoundStatement) statement).getOperationType(); + + ColumnDefinitions boundColumns = + ((BoundStatement) statement).statement.getPreparedId().boundValuesMetadata.variables; + + StringBuilder boundValuesBuilder = new StringBuilder(BOUND_VALUES_MAX_LENGTH); + StringBuilder partitionKeyBuilder = new StringBuilder(PARTITION_KEY_MAX_LENGTH); + int[] rkIndexes = ((BoundStatement) statement).statement.getPreparedId().routingKeyIndexes; + + for (int i = 0; i < boundColumns.size(); ++i) { + Object value = ((BoundStatement) statement).getObject(i); + String valueString = + (value == null) + ? "NULL" + : value instanceof ByteBuffer + ? CodecUtils.bytesToHex(((ByteBuffer) value).array()) + : value.toString(); + String columnName = boundColumns.getName(i); + if (boundValuesBuilder.length() > 0) boundValuesBuilder.append(", "); + boundValuesBuilder.append(columnName); + boundValuesBuilder.append('='); + boundValuesBuilder.append(valueString); + + if (rkIndexes != null) { + for (int j : rkIndexes) { + if (i == j) { + if (partitionKeyBuilder.length() > 0) partitionKeyBuilder.append(", "); + partitionKeyBuilder.append(columnName); + partitionKeyBuilder.append('='); + partitionKeyBuilder.append(valueString); + break; + } + } + } + } + boundValues = boundValuesBuilder.toString(); + partitionKey = partitionKeyBuilder.toString(); + + if (boundColumns.size() > 0) table = boundColumns.getTable(0); + } else if (statement instanceof RegularStatement) { + statementType = "regular"; + statementText = ((RegularStatement) statement).toString(); + } + this.tracingInfo = tracingInfo; this.tracingInfo.setNameAndStartTime("request"); + this.tracingInfo.setConsistencyLevel(consistency); + this.tracingInfo.setRetryPolicy(retryPolicy()); + this.tracingInfo.setLoadBalancingPolicy(manager.loadBalancingPolicy()); + this.tracingInfo.setSpeculativeExecutionPolicy(manager.speculativeExecutionPolicy()); + if (statement.getFetchSize() > 0) this.tracingInfo.setFetchSize(statement.getFetchSize()); + if (statementType != null) this.tracingInfo.setStatementType(statementType); + if (statementText != null) this.tracingInfo.setStatement(statementText, STATEMENT_MAX_LENGTH); + if (batchSize != null) this.tracingInfo.setBatchSize(batchSize); + if (keyspace != null) this.tracingInfo.setKeyspace(keyspace); + if (boundValues != null) this.tracingInfo.setBoundValues(boundValues); + if (partitionKey != null) this.tracingInfo.setPartitionKey(partitionKey); + if (table != null) this.tracingInfo.setTable(table); + if (operationType != null) this.tracingInfo.setOperationType(operationType); } void sendRequest() { @@ -280,6 +369,15 @@ private void setFinalResult( && logger.isWarnEnabled()) { logServerWarnings(response.warnings); } + + if (response.type == Message.Response.Type.RESULT) { + Responses.Result rm = (Responses.Result) response; + if (rm.kind == Responses.Result.Kind.ROWS) { + Responses.Result.Rows r = (Responses.Result.Rows) rm; + tracingInfo.setRowsCount(r.data.size()); + tracingInfo.setHasMorePages(r.metadata.pagingState != null); + } + } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); tracingInfo.setStatus( @@ -336,6 +434,7 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.setAttemptCount(execution.retryCount() + 1); execution.parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); @@ -460,6 +559,10 @@ private boolean query(final Host host) { currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); currentChildTracingInfo.setNameAndStartTime("attempt"); + InetSocketAddress hostAddress = host.getEndPoint().resolve(); + currentChildTracingInfo.setPeerName(hostAddress.getHostName()); + currentChildTracingInfo.setPeerIP(hostAddress.getAddress()); + currentChildTracingInfo.setPeerPort(hostAddress.getPort()); if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -679,6 +782,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setAttemptCount(previous.retryCount + 1); parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); return; @@ -693,6 +797,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setAttemptCount(previous.retryCount + 1); parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); return; @@ -710,6 +815,7 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.setStatus(TracingInfo.StatusCode.OK); currentChildTracingInfo.tracingFinished(); @@ -1021,6 +1127,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.recordException(exception); currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); currentChildTracingInfo.tracingFinished(); @@ -1062,6 +1169,7 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.setStatus(TracingInfo.StatusCode.ERROR, "timeout"); currentChildTracingInfo.tracingFinished(); @@ -1106,12 +1214,14 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.setAttemptCount(retryCount() + 1); parentTracingInfo.setStatus(TracingInfo.StatusCode.ERROR); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.setAttemptCount(retryCount() + 1); parentTracingInfo.setStatus(TracingInfo.StatusCode.OK); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java index a2c39d6cee3..eeef3e30bcb 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -16,12 +16,82 @@ package com.datastax.driver.core.tracing; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; + public class NoopTracingInfoFactory implements TracingInfoFactory { private static class NoopTracingInfo implements TracingInfo { @Override public void setNameAndStartTime(String name) {} + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) {} + + @Override + public void setStatementType(String statementType) {} + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) {} + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {}; + + @Override + public void setSpeculativeExecutionPolicy( + SpeculativeExecutionPolicy speculativeExecutionPolicy) {} + + @Override + public void setBatchSize(int batchSize) {} + + @Override + public void setAttemptCount(int attemptCount) {} + + @Override + public void setShardID(int shardID) {} + + @Override + public void setPeerName(String peerName) {} + + @Override + public void setPeerIP(InetAddress peerIP) {} + + @Override + public void setPeerPort(int peerPort) {} + + @Override + public void setFetchSize(int fetchSize) {} + + @Override + public void setHasMorePages(boolean hasMorePages) {} + + @Override + public void setRowsCount(int rowsCount) {} + + @Override + public void setStatement(String statement, int limit) {} + + @Override + public void setKeyspace(String keyspace) {} + + @Override + public void setBoundValues(String boundValues) {} + + @Override + public void setPartitionKey(String partitionKey) {} + + @Override + public void setTable(String table) {} + + @Override + public void setOperationType(String operationType) {} + + @Override + public void setReplicas(String replicas) {} + @Override public void recordException(Exception exception) {} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java index 090b32e41af..a4c7bf0a26d 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -16,6 +16,12 @@ package com.datastax.driver.core.tracing; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; + /** * An abstraction layer over instrumentation library API, corresponding to a logical span in the * trace. @@ -36,6 +42,156 @@ enum StatusCode { */ void setNameAndStartTime(String name); + /** + * Adds provided consistency level to the trace. + * + * @param consistency the consistency level to be set. + */ + void setConsistencyLevel(ConsistencyLevel consistency); + + /** + * Adds provided statement type to the trace. + * + * @param statementType the statement type to be set. + */ + void setStatementType(String statementType); + + /** + * Adds provided retry policy to the trace. + * + * @param retryPolicy the retry policy to be set. + */ + void setRetryPolicy(RetryPolicy retryPolicy); + + /** + * Adds provided load balancing policy to the trace. + * + * @param loadBalancingPolicy the load balancing policy to be set. + */ + void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy); + + /** + * Adds provided speculative execution policy to the trace. + * + * @param speculativeExecutionPolicy the speculative execution policy to be set. + */ + void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy); + + /** + * Adds provided batch size to the trace. + * + * @param batchSize the batch size to be set. + */ + void setBatchSize(int batchSize); + + /** + * Adds provided attempt count to the trace. + * + * @param attemptCount the attempt count to be set. + */ + void setAttemptCount(int attemptCount); + + /** + * Adds provided shard ID to the trace. + * + * @param shardID the shard ID to be set. + */ + void setShardID(int shardID); + + /** + * Adds provided peer name to the trace. + * + * @param peerName the peer name to be set. + */ + void setPeerName(String peerName); + + /** + * Adds provided peer IP to the trace. + * + * @param peerIP the peer IP to be set. + */ + void setPeerIP(InetAddress peerIP); + + /** + * Adds provided peer port to the trace. + * + * @param peerPort the peer port to be set. + */ + void setPeerPort(int peerPort); + + /** + * Adds to the trace maximum number of rows fetched in one query. + * + * @param fetchSize maximum number of rows fetched in one query. + */ + void setFetchSize(int fetchSize); + + /** + * Adds to the trace information whether the query is paged and has more pages pending to be + * fetched. + * + * @param hasMorePages whether the query has more pages to be fetched. + */ + void setHasMorePages(boolean hasMorePages); + + /** + * Adds provided number of returned rows to the trace. + * + * @param rowsCount the number of returned rows to be set. + */ + void setRowsCount(int rowsCount); + + /** + * Adds provided statement text to the trace. If the statement length is greater than the given + * limit, the statement is truncated to the first {@param limit} characters. + * + * @param statement the statement text to be set. + * @param limit the statement length limit. + */ + void setStatement(String statement, int limit); + + /** + * Adds provided keyspace to the trace. + * + * @param keyspace the keyspace to be set. + */ + void setKeyspace(String keyspace); + + /** + * Adds provided bound values string to the trace. + * + * @param boundValues boundValues string to be set. + */ + void setBoundValues(String boundValues); + + /** + * Adds provided partition key string to the trace. + * + * @param partitionKey the partitionKey to be set. + */ + void setPartitionKey(String partitionKey); + + /** + * Adds provided table name to the trace. + * + * @param table the table name to be set. + */ + void setTable(String table); + + /** + * Adds provided operation type (e.g. SELECT) to the trace. + * + * @param operationType the operation type to be set. + */ + void setOperationType(String operationType); + + /** + * Adds provided list of contacted replicas to the trace. + * + * @param replicas the list of contacted replicas to be set. + */ + void setReplicas(String replicas); + /** * Records in the trace that the provided exception occured. * From 3be782a0469c7a1c4290f27443c8689cab6e4f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 09:49:39 +0200 Subject: [PATCH 07/10] Tracing: added abstract-based unit tests These tests verify that the span tree structure is valid and that spans are filled with tags of proper content, by providing a mock implementation of TracingInfo. --- .../driver/core/tracing/BasicTracingTest.java | 360 ++++++++++++++++++ .../driver/core/tracing/TestTracingInfo.java | 322 ++++++++++++++++ .../core/tracing/TestTracingInfoFactory.java | 62 +++ 3 files changed, 744 insertions(+) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java new file mode 100644 index 00000000000..57cebceccd2 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java @@ -0,0 +1,360 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import static org.junit.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.NoSpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.PagingOptimizingLoadBalancingPolicy; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import org.testng.annotations.Test; + +public class BasicTracingTest extends CCMTestsSupport { + private static TestTracingInfoFactory testTracingInfoFactory; + private Session session; + + @Override + public void onTestContextInitialized() { + initializeTestTracing(); + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + session.execute("CREATE TABLE blobs (k int PRIMARY KEY, v blob)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 3)"); + session.execute("INSERT INTO t(k, v) VALUES (1, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (5, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (6, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (7, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (8, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (9, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (10, 7)"); + + Collection spans = testTracingInfoFactory.getSpans(); + spans.clear(); + } + + @Test(groups = "short") + public void simpleTracingTest() { + session.execute("INSERT INTO t(k, v) VALUES (4, 5)"); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + spans.clear(); + } + + @Test(groups = "short") + public void tagsInsertTest() { + PreparedStatement prepared = session.prepare("INSERT INTO blobs(k, v) VALUES (?, ?)"); + + Collection prepareSpans = testTracingInfoFactory.getSpans(); + assertNotEquals(prepareSpans.size(), 0); + assertTrue(getRoot(prepareSpans) instanceof TestTracingInfo); + prepareSpans.clear(); + + BoundStatement bound = prepared.bind(1, ByteBuffer.wrap("\n\0\n".getBytes())); + session.execute(bound); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "prepared"); + assertNull(root.getBatchSize()); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.ONE); + assertNull(root.getRowsCount()); // no rows are returned in INSERT + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getSpeculativeExecutionPolicy() instanceof NoSpeculativeExecutionPolicy); + assertTrue(root.getRetryPolicy() instanceof DefaultRetryPolicy); + assertNull(root.getFetchSize()); // fetch size was not explicitly set for this statement + assertNull(root.getHasMorePages()); // no paging are returned in INSERT + assertNull(root.getStatement()); // because of precision level NORMAL + // these are tags specific to bound statement + assertEquals(root.getKeyspace(), keyspace); + assertEquals(root.getBoundValues(), "k=1, v=0x0A000A"); // "\n\0\n" + assertEquals(root.getPartitionKey(), "k=1"); + assertEquals(root.getTable(), "blobs"); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getAttemptCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getAttemptCount() >= 1); + } + + ArrayList attempts = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + attempts.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(attempts.size() > 0); + + for (TracingInfo attemptSpan : attempts) { + assertTrue(attemptSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) attemptSpan; + + // these tags should not be set for attempt span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getAttemptCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for attempt span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + + @Test(groups = "short") + public void tagsSelectTest() { + SimpleStatement s = new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.QUORUM); + + final Collection spans = testTracingInfoFactory.getSpans(); + class SpanChecks { + int totalRows = 0; + + void checkTotalCount() { + assertEquals(totalRows, 7); + } + + void checkAssertions(boolean hasMorePages) { + assertEquals(spans.size(), 3); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "regular"); + assertNull(root.getBatchSize()); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.QUORUM); + assertNotNull(root.getRowsCount()); + totalRows += root.getRowsCount(); + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getSpeculativeExecutionPolicy() instanceof NoSpeculativeExecutionPolicy); + assertTrue(root.getRetryPolicy() == FallthroughRetryPolicy.INSTANCE); + assertEquals(root.getFetchSize(), new Integer(2)); + assertEquals(root.getHasMorePages(), new Boolean(hasMorePages)); + assertNull(root.getStatement()); // because of precision level NORMAL + + // these are tags specific to bound statement + assertNull(root.getKeyspace()); + assertNull(root.getPartitionKey()); + assertNull(root.getTable()); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getAttemptCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getAttemptCount() >= 1); + } + + ArrayList attempts = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + attempts.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(attempts.size() > 0); + + for (TracingInfo attemptSpan : attempts) { + assertTrue(attemptSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) attemptSpan; + + // these tags should not be set for attempt span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getFetchSize()); + assertNull(tracingInfo.getHasMorePages()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getAttemptCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for attempt span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + } + + SpanChecks spanChecks = new SpanChecks(); + + try { + ResultSet rs = session.execute(s); + + while (!rs.isFullyFetched()) { + spanChecks.checkAssertions(true); + rs.fetchMoreResults().get(); + } + spanChecks.checkAssertions(false); + + } catch (InterruptedException e) { + assert false : "InterruptedException"; + } catch (ExecutionException e) { + assert false : "ExecutionException"; + } + spanChecks.checkTotalCount(); + } + + private void initializeTestTracing() { + testTracingInfoFactory = new TestTracingInfoFactory(VerbosityLevel.NORMAL); + cluster().setTracingInfoFactory(testTracingInfoFactory); + session = cluster().connect(); + } + + private TracingInfo getRoot(Collection spans) { + TracingInfo root = null; + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == null) { + assertNull(root); // There should be only one root. + root = tracingInfo; + } + } + + return root; + } + + private ArrayList getChildren(Collection spans, TracingInfo parent) { + ArrayList children = new ArrayList(); + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == parent) { + children.add(tracingInfo); + } + } + return children; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java new file mode 100644 index 00000000000..785bd4f5321 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java @@ -0,0 +1,322 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; + +public class TestTracingInfo implements TracingInfo { + + private final VerbosityLevel precision; + private TracingInfo parent = null; + + private boolean spanStarted = false; + private boolean spanFinished = false; + private String spanName; + private ConsistencyLevel consistencyLevel; + private String statement; + private String statementType; + private Collection exceptions; + private StatusCode statusCode; + private String description; + private InetAddress peerIP; + private RetryPolicy retryPolicy; + private LoadBalancingPolicy loadBalancingPolicy; + private SpeculativeExecutionPolicy speculativeExecutionPolicy; + private Integer batchSize; + private Integer attemptCount; + private Integer shardID; + private String peerName; + private Integer peerPort; + private Integer fetchSize; + private Boolean hasMorePages; + private Integer rowsCount; + private String keyspace; + private String boundValues; + private String partitionKey; + private String table; + private String operationType; + private String replicas; + + public TestTracingInfo(VerbosityLevel precision) { + this.precision = precision; + } + + public TestTracingInfo(VerbosityLevel precision, TracingInfo parent) { + this(precision); + this.parent = parent; + } + + public VerbosityLevel getVerbosity() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + this.spanStarted = true; + this.spanName = name; + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + this.consistencyLevel = consistency; + } + + @Override + public void setStatementType(String statementType) { + this.statementType = statementType; + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + this.loadBalancingPolicy = loadBalancingPolicy; + } + + @Override + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy) { + this.speculativeExecutionPolicy = speculativeExecutionPolicy; + } + + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void setAttemptCount(int attemptCount) { + this.attemptCount = attemptCount; + } + + @Override + public void setShardID(int shardID) { + this.shardID = shardID; + } + + @Override + public void setPeerName(String peerName) { + this.peerName = peerName; + } + + @Override + public void setPeerIP(InetAddress peerIP) { + this.peerIP = peerIP; + } + + @Override + public void setPeerPort(int peerPort) { + this.peerPort = peerPort; + } + + @Override + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + @Override + public void setHasMorePages(boolean hasMorePages) { + this.hasMorePages = hasMorePages; + } + + @Override + public void setRowsCount(int rowsCount) { + this.rowsCount = rowsCount; + } + + @Override + public void setStatement(String statement, int limit) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + this.statement = statement; + } + } + + @Override + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + @Override + public void setBoundValues(String boundValues) { + this.boundValues = boundValues; + } + + @Override + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + + @Override + public void setTable(String table) { + this.table = table; + } + + @Override + public void setOperationType(String operationType) { + this.operationType = operationType; + } + + @Override + public void setReplicas(String replicas) { + this.replicas = replicas; + } + + @Override + public void recordException(Exception exception) { + if (this.exceptions == null) { + this.exceptions = new ArrayList(); + } + this.exceptions.add(exception); + } + + @Override + public void setStatus(StatusCode code) { + this.statusCode = code; + } + + @Override + public void setStatus(StatusCode code, String description) { + this.statusCode = code; + this.description = description; + } + + @Override + public void tracingFinished() { + this.spanFinished = true; + } + + private boolean currentVerbosityLevelIsAtLeast(VerbosityLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } + + public boolean isSpanStarted() { + return spanStarted; + } + + public boolean isSpanFinished() { + return spanFinished; + } + + public String getSpanName() { + return spanName; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public String getStatementType() { + return statementType; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public LoadBalancingPolicy getLoadBalancingPolicy() { + return loadBalancingPolicy; + } + + public SpeculativeExecutionPolicy getSpeculativeExecutionPolicy() { + return speculativeExecutionPolicy; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getAttemptCount() { + return attemptCount; + } + + public Integer getShardID() { + return shardID; + } + + public String getPeerName() { + return peerName; + } + + public InetAddress getPeerIP() { + return peerIP; + } + + public Integer getPeerPort() { + return peerPort; + } + + public Integer getFetchSize() { + return fetchSize; + } + + public Boolean getHasMorePages() { + return hasMorePages; + } + + public Integer getRowsCount() { + return rowsCount; + } + + public String getStatement() { + return statement; + } + + public String getKeyspace() { + return keyspace; + } + + public String getBoundValues() { + return boundValues; + } + + public String getPartitionKey() { + return partitionKey; + } + + public String getTable() { + return table; + } + + public String getOperationType() { + return operationType; + } + + public String getReplicas() { + return replicas; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + public String getDescription() { + return description; + } + + public TracingInfo getParent() { + return parent; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java new file mode 100644 index 00000000000..f87e380acbb --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class TestTracingInfoFactory implements TracingInfoFactory { + private final VerbosityLevel precision; + private Collection spans = + Collections.synchronizedList(new ArrayList()); + + public TestTracingInfoFactory() { + this.precision = VerbosityLevel.NORMAL; + } + + public TestTracingInfoFactory(final VerbosityLevel precision) { + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + TracingInfo tracingInfo = new TestTracingInfo(precision); + spans.add(tracingInfo); + return tracingInfo; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + TracingInfo tracingInfo; + + if (parent instanceof TestTracingInfo) { + final TestTracingInfo castedParent = (TestTracingInfo) parent; + tracingInfo = new TestTracingInfo(castedParent.getVerbosity(), parent); + spans.add(tracingInfo); + return tracingInfo; + } + + tracingInfo = new NoopTracingInfoFactory().buildTracingInfo(); + spans.add(tracingInfo); + return tracingInfo; + } + + public Collection getSpans() { + return spans; + } +} From 14bacd265dc5c6f5d1f27718153de01d8c9cbb04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:04:40 +0200 Subject: [PATCH 08/10] Tracing: added OpenTelemetry implementation Added TracingInfo and TracingInfoFactory implementations in separate driver-opentelemetry module. --- driver-opentelemetry/pom.xml | 149 +++++++++++ .../OpenTelemetryTracingInfo.java | 250 ++++++++++++++++++ .../OpenTelemetryTracingInfoFactory.java | 61 +++++ pom.xml | 10 + 4 files changed, 470 insertions(+) create mode 100644 driver-opentelemetry/pom.xml create mode 100644 driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java create mode 100644 driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java diff --git a/driver-opentelemetry/pom.xml b/driver-opentelemetry/pom.xml new file mode 100644 index 00000000000..6c59be7b9ad --- /dev/null +++ b/driver-opentelemetry/pom.xml @@ -0,0 +1,149 @@ + + + + + 4.0.0 + + + com.scylladb + scylla-driver-parent + 3.11.2.1-SNAPSHOT + + + scylla-driver-opentelemetry + Java Driver for Scylla and Apache Cassandra - OpenTelemetry integration + An extension of Java Driver for Scylla and Apache Cassandra by adding + functionality of creating traces and spans in OpenTelemetry format. + + + + + + + + io.opentelemetry + opentelemetry-bom + 1.9.1 + pom + import + + + + + + + + + + + + com.scylladb + scylla-driver-core + + + + + + io.opentelemetry + opentelemetry-api + 1.9.1 + + + + + + com.scylladb + scylla-driver-core + test-jar + test + + + + io.opentelemetry + opentelemetry-sdk + test + + + + org.apache.commons + commons-exec + test + + + + org.testng + testng + test + + + + org.slf4j + slf4j-log4j12 + test + + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + 1.15 + + + check-jdk6 + process-classes + + check + + + true + + + + check-jdk8 + + check + + + true + + + + + + + + + + diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java new file mode 100644 index 00000000000..ad20d82420d --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import java.net.InetAddress; + +public class OpenTelemetryTracingInfo implements TracingInfo { + private Span span; + private final Tracer tracer; + private final Context context; + private boolean tracingStarted; + private final PrecisionLevel precision; + + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, PrecisionLevel precision) { + this.tracer = tracer; + this.context = context; + this.precision = precision; + this.tracingStarted = false; + } + + public Tracer getTracer() { + return tracer; + } + + public Context getContext() { + return context.with(span); + } + + private void assertStarted() { + assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; + } + + public PrecisionLevel getPrecision() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + assert !tracingStarted : "TracingInfo.setStartTime may only be called once."; + tracingStarted = true; + span = tracer.spanBuilder(name).setParent(context).startSpan(); + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + assertStarted(); + span.setAttribute("db.scylladb.consistency_level", consistency.toString()); + } + + public void setStatement(String statement) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("db.scylladb.statement", statement); + } + } + + public void setHostname(String hostname) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("net.peer.name", hostname); + } + } + + @Override + public void setStatementType(String statementType) { + assertStarted(); + span.setAttribute("db.scylladb.statement_type", statementType); + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + assertStarted(); + span.setAttribute("db.scylladb.retry_policy", retryPolicy.getClass().getSimpleName()); + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + assertStarted(); + span.setAttribute( + "db.scylladb.load_balancing_policy", loadBalancingPolicy.getClass().getSimpleName()); + } + + @Override + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy speculativeExecutionPolicy) { + assertStarted(); + span.setAttribute( + "db.scylladb.speculative_execution_policy", + speculativeExecutionPolicy.getClass().getSimpleName()); + } + + @Override + public void setBatchSize(int batchSize) { + assertStarted(); + span.setAttribute("db.scylladb.batch_size", String.valueOf(batchSize)); + } + + @Override + public void setAttemptCount(int attemptCount) { + assertStarted(); + span.setAttribute("db.scylladb.attempt_count", String.valueOf(attemptCount)); + } + + @Override + public void setShardID(int shardID) { + assertStarted(); + span.setAttribute("db.scylladb.shard_id", String.valueOf(shardID)); + } + + @Override + public void setPeerName(String peerName) { + assertStarted(); + span.setAttribute("net.peer.name", peerName); + } + + @Override + public void setPeerIP(InetAddress peerIP) { + assertStarted(); + span.setAttribute("net.peer.ip", peerIP.getHostAddress()); + } + + @Override + public void setPeerPort(int peerPort) { + assertStarted(); + span.setAttribute("net.peer.port", String.valueOf(peerPort)); + } + + @Override + public void setFetchSize(int fetchSize) { + assertStarted(); + span.setAttribute("db.scylladb.fetch_size", Integer.toString(fetchSize)); + } + + @Override + public void setHasMorePages(boolean hasMorePages) { + assertStarted(); + span.setAttribute("db.scylladb.has_more_pages", Boolean.toString(hasMorePages)); + } + + @Override + public void setRowsCount(int rowsCount) { + assertStarted(); + span.setAttribute("db.scylladb.rows_count", Integer.toString(rowsCount)); + } + + @Override + public void setStatement(String statement, int limit) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + span.setAttribute("db.scylladb.statement", statement); + } + } + + @Override + public void setKeyspace(String keyspace) { + assertStarted(); + span.setAttribute("db.scylladb.keyspace", keyspace); + } + + @Override + public void setBoundValues(String boundValues) { + assertStarted(); + span.setAttribute("db.scylladb.bound_values", boundValues); + } + + @Override + public void setPartitionKey(String partitionKey) { + assertStarted(); + span.setAttribute("db.scylladb.partition_key", partitionKey); + } + + @Override + public void setTable(String table) { + assertStarted(); + span.setAttribute("db.scylladb.table", table); + } + + @Override + public void setOperationType(String operationType) { + assertStarted(); + span.setAttribute("db.operation", operationType); + } + + @Override + public void setReplicas(String replicas) { + assertStarted(); + span.setAttribute("db.scylladb.replicas", replicas); + } + + private io.opentelemetry.api.trace.StatusCode mapStatusCode(StatusCode code) { + switch (code) { + case OK: + return io.opentelemetry.api.trace.StatusCode.OK; + case ERROR: + return io.opentelemetry.api.trace.StatusCode.ERROR; + } + return null; + } + + @Override + public void recordException(Exception exception) { + assertStarted(); + span.recordException(exception); + } + + @Override + public void setStatus(StatusCode code, String description) { + assertStarted(); + span.setStatus(mapStatusCode(code), description); + } + + @Override + public void setStatus(StatusCode code) { + assertStarted(); + span.setStatus(mapStatusCode(code)); + } + + @Override + public void tracingFinished() { + assertStarted(); + span.end(); + } + + private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } +} diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java new file mode 100644 index 00000000000..64ff29f82a9 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; + +public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { + private final Tracer tracer; + private final PrecisionLevel precision; + + public OpenTelemetryTracingInfoFactory(final Tracer tracer) { + this(tracer, PrecisionLevel.NORMAL); + } + + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final PrecisionLevel precision) { + this.tracer = tracer; + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + final Context current = Context.current(); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + if (parent instanceof OpenTelemetryTracingInfo) { + final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; + return new OpenTelemetryTracingInfo( + castedParent.getTracer(), castedParent.getContext(), castedParent.getPrecision()); + } + + return new NoopTracingInfoFactory().buildTracingInfo(); + } + + public TracingInfo buildTracingInfo(Span parent) { + final Context current = Context.current().with(parent); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } +} diff --git a/pom.xml b/pom.xml index 307e2df8864..e925b5a3c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ driver-examples driver-tests driver-dist + driver-opentelemetry @@ -133,6 +134,12 @@ ${project.parent.version} + + com.scylladb + scylla-driver-opentelemetry + ${project.parent.version} + + com.google.guava guava @@ -727,6 +734,9 @@ java18 1.0 + + io.opentelemetry:* + From e30928a7db4f83bc7bdb9dbc925e7748476231a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:06:29 +0200 Subject: [PATCH 09/10] Tracing: added OpenTelemetry tracing example Added OpenTelemetry and Zipkin connection configuration classes, along with docker-compose file, example run script and related changes in README. --- driver-examples/README.md | 13 +- driver-examples/docker-compose.yaml | 12 + driver-examples/pom.xml | 34 +++ driver-examples/runOpenTelemetryExample.sh | 1 + .../OpenTelemetryConfiguration.java | 70 ++++++ .../examples/opentelemetry/ZipkinUsage.java | 234 ++++++++++++++++++ 6 files changed, 363 insertions(+), 1 deletion(-) create mode 100644 driver-examples/docker-compose.yaml create mode 100755 driver-examples/runOpenTelemetryExample.sh create mode 100644 driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java create mode 100644 driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java diff --git a/driver-examples/README.md b/driver-examples/README.md index 553cdbc5117..e696e948716 100644 --- a/driver-examples/README.md +++ b/driver-examples/README.md @@ -5,6 +5,17 @@ Apache Cassandra. ## Usage -Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster +Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster listening on localhost:9042. +Before running examples, make sure you installed repo artifacts (in root driver directory): +``` +mvn install -q -Dmaven.test.skip=true +``` + +To conveniently run the example showing OpenTelemetry integration (ZipkinConfiguration), +you can use the provided ```docker-compose.yaml``` file (which runs both Scylla cluster and Zipkin instance): +``` +docker-compose up +./runOpenTelemetryExample.sh +``` diff --git a/driver-examples/docker-compose.yaml b/driver-examples/docker-compose.yaml new file mode 100644 index 00000000000..1daf0c46e65 --- /dev/null +++ b/driver-examples/docker-compose.yaml @@ -0,0 +1,12 @@ +version: '3.4' + +services: + zipkin: + image: openzipkin/zipkin + ports: + - "9411:9411" + scylla_node: + image: scylladb/scylla + ports: + - "9042:9042" + command: "--smp 1 --skip-wait-for-gossip-to-settle 0" diff --git a/driver-examples/pom.xml b/driver-examples/pom.xml index 5f28572501b..b47107fbcc7 100644 --- a/driver-examples/pom.xml +++ b/driver-examples/pom.xml @@ -48,6 +48,11 @@ true + + com.scylladb + scylla-driver-opentelemetry + + @@ -134,6 +139,26 @@ logback-classic + + + + io.opentelemetry + opentelemetry-sdk + 1.9.1 + + + + io.opentelemetry + opentelemetry-semconv + 1.9.0-alpha + + + + io.opentelemetry + opentelemetry-exporter-zipkin + 1.9.1 + + @@ -184,6 +209,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + diff --git a/driver-examples/runOpenTelemetryExample.sh b/driver-examples/runOpenTelemetryExample.sh new file mode 100755 index 00000000000..7603ca64d18 --- /dev/null +++ b/driver-examples/runOpenTelemetryExample.sh @@ -0,0 +1 @@ +mvn -q exec:java -Dexec.mainClass="com.datastax.driver.examples.opentelemetry.ZipkinUsage" diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java new file mode 100644 index 00000000000..d959b71f77a --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.examples.opentelemetry; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +/** Example showing how to configure OpenTelemetry for tracing with Scylla Java Driver. */ +class OpenTelemetryConfiguration { + private static final String SERVICE_NAME = "Scylla Java driver"; + + public static OpenTelemetry initialize(SpanExporter spanExporter) { + Resource serviceNameResource = + Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, SERVICE_NAME)); + + // Set to process the spans by the spanExporter. + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + // Add a shutdown hook to shut down the SDK. + Runtime.getRuntime() + .addShutdownHook( + new Thread( + new Runnable() { + @Override + public void run() { + tracerProvider.close(); + } + })); + + // Return the configured instance so it can be used for instrumentation. + return openTelemetry; + } + + public static OpenTelemetry initializeForZipkin(String ip, int port) { + String endpointPath = "/api/v2/spans"; + String httpUrl = String.format("http://%s:%s", ip, port); + + SpanExporter exporter = + ZipkinSpanExporter.builder().setEndpoint(httpUrl + endpointPath).build(); + + return initialize(exporter); + } +} diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java new file mode 100644 index 00000000000..43490c58b3b --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java @@ -0,0 +1,234 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.examples.opentelemetry; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.opentelemetry.OpenTelemetryTracingInfoFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +/** + * Creates a keyspace and tables, and loads some data into them. Sends OpenTelemetry tracing data to + * Zipkin tracing backend + * + *

Preconditions: - a Scylla cluster is running and accessible through the contacts points + * identified by CONTACT_POINTS and PORT and Zipkin backend is running and accessible through the + * contacts points identified by ZIPKIN_CONTACT_POINT and ZIPKIN_PORT. + * + *

Side effects: - creates a new keyspace "simplex" in the cluster. If a keyspace with this name + * already exists, it will be reused; - creates two tables "simplex.songs" and "simplex.playlists". + * If they exist already, they will be reused; - inserts a row in each table. + */ +public class ZipkinUsage { + private static final String CONTACT_POINT = "127.0.0.1"; + private static final int PORT = 9042; + + private static final String ZIPKIN_CONTACT_POINT = "127.0.0.1"; + private static final int ZIPKIN_PORT = 9411; + + private Cluster cluster; + private Session session; + + private Tracer tracer; + + public static void main(String[] args) { + // Workaround for setting ContextStorage to ThreadLocalContextStorage. + System.setProperty("io.opentelemetry.context.contextStorageProvider", "default"); + + ZipkinUsage client = new ZipkinUsage(); + + try { + client.connect(); + client.createSchema(); + client.loadData(); + client.querySchema(); + System.out.println( + "All requests have been completed. Now you can visit Zipkin at " + + "http://" + + ZIPKIN_CONTACT_POINT + + ":" + + ZIPKIN_PORT + + " and examine the produced trace."); + } finally { + client.close(); + } + } + + /** Initiates a connection to the cluster. */ + public void connect() { + cluster = Cluster.builder().addContactPoints(CONTACT_POINT).withPort(PORT).build(); + + System.out.printf("Connected to cluster: %s%n", cluster.getMetadata().getClusterName()); + + OpenTelemetry openTelemetry = + OpenTelemetryConfiguration.initializeForZipkin(ZIPKIN_CONTACT_POINT, ZIPKIN_PORT); + tracer = openTelemetry.getTracerProvider().get("this"); + TracingInfoFactory tracingInfoFactory = new OpenTelemetryTracingInfoFactory(tracer); + cluster.setTracingInfoFactory(tracingInfoFactory); + + session = cluster.connect(); + } + + /** Creates the schema (keyspace) and tables for this example. */ + public void createSchema() { + session.execute("DROP KEYSPACE IF EXISTS simplex;"); + Span parentSpan = tracer.spanBuilder("create schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + { + Span span = tracer.spanBuilder("create simplex").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE KEYSPACE IF NOT EXISTS simplex WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':1};"); + + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + ResultSetFuture f = + session.executeAsync( + "CREATE TABLE IF NOT EXISTS simplex.songs (" + + "id uuid," + + "title text," + + "album text," + + "artist text," + + "tags set," + + "data blob," + + "PRIMARY KEY ((title, artist), album)" + + ");"); + f.getUninterruptibly(); + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.playlists").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE TABLE IF NOT EXISTS simplex.playlists (" + + "id uuid," + + "title text," + + "album text, " + + "artist text," + + "song_id uuid," + + "PRIMARY KEY (id, title, album, artist)" + + ");"); + + } finally { + span.end(); + } + } + } finally { + parentSpan.end(); + } + } + + /** Inserts data into the tables. */ + public void loadData() { + Span parentSpan = tracer.spanBuilder("load data").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = + session.prepare( + "INSERT INTO simplex.songs (id, title, album, artist, tags) " + + "VALUES (" + + "756716f7-2e54-4715-9f00-91dcbea6cf50," + + "?," + + "?," + + "?," + + "{'jazz', '2013'})" + + ";"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("La Petite Tonkinoise", "Bye Bye Blackbird", "Joséphine Baker"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("insert simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + public void querySchema() { + Span parentSpan = tracer.spanBuilder("query schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = session.prepare("SELECT * FROM simplex.songs WHERE artist = ? AND title = ?;"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("Joséphine Baker", "La Petite Tonkinoise"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("query simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + /** Closes the session and the cluster. */ + public void close() { + if (session != null) session.close(); + if (cluster != null) cluster.close(); + } +} From cc23cebd5f29e498f27fe39a97c7552db8452955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:08:15 +0200 Subject: [PATCH 10/10] Tracing: added OpenTelemetry integration test The test verifies that span tree structure and status code are valid. Speculative executions are run parallel to the main thread, so some of them can finish only after query result has been returned. Thus, in order to collect span data from entire request, we decided to wait until all speculative executions end. The main thread uses conditional variable `allEnded` to wait for them and lock is used for concurrent mutation of activeSpans. --- .../OpenTelemetryTracingInfo.java | 16 +- .../OpenTelemetryTracingInfoFactory.java | 10 +- .../opentelemetry/OpenTelemetryTest.java | 889 ++++++++++++++++++ 3 files changed, 902 insertions(+), 13 deletions(-) create mode 100644 driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java index ad20d82420d..de1463ce293 100644 --- a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -20,8 +20,8 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; -import com.datastax.driver.core.tracing.PrecisionLevel; import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.VerbosityLevel; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; @@ -32,9 +32,9 @@ public class OpenTelemetryTracingInfo implements TracingInfo { private final Tracer tracer; private final Context context; private boolean tracingStarted; - private final PrecisionLevel precision; + private final VerbosityLevel precision; - protected OpenTelemetryTracingInfo(Tracer tracer, Context context, PrecisionLevel precision) { + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, VerbosityLevel precision) { this.tracer = tracer; this.context = context; this.precision = precision; @@ -53,7 +53,7 @@ private void assertStarted() { assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; } - public PrecisionLevel getPrecision() { + public VerbosityLevel getVerbosity() { return precision; } @@ -72,14 +72,14 @@ public void setConsistencyLevel(ConsistencyLevel consistency) { public void setStatement(String statement) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { span.setAttribute("db.scylladb.statement", statement); } } public void setHostname(String hostname) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { span.setAttribute("net.peer.name", hostname); } } @@ -168,7 +168,7 @@ public void setRowsCount(int rowsCount) { @Override public void setStatement(String statement, int limit) { assertStarted(); - if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (currentVerbosityLevelIsAtLeast(VerbosityLevel.FULL)) { if (statement.length() > limit) statement = statement.substring(0, limit); span.setAttribute("db.scylladb.statement", statement); } @@ -244,7 +244,7 @@ public void tracingFinished() { span.end(); } - private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + private boolean currentVerbosityLevelIsAtLeast(VerbosityLevel requiredLevel) { return requiredLevel.compareTo(precision) <= 0; } } diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java index 64ff29f82a9..91fab8e9c8a 100644 --- a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -17,22 +17,22 @@ package com.datastax.driver.opentelemetry; import com.datastax.driver.core.tracing.NoopTracingInfoFactory; -import com.datastax.driver.core.tracing.PrecisionLevel; import com.datastax.driver.core.tracing.TracingInfo; import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { private final Tracer tracer; - private final PrecisionLevel precision; + private final VerbosityLevel precision; public OpenTelemetryTracingInfoFactory(final Tracer tracer) { - this(tracer, PrecisionLevel.NORMAL); + this(tracer, VerbosityLevel.NORMAL); } - public OpenTelemetryTracingInfoFactory(final Tracer tracer, final PrecisionLevel precision) { + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final VerbosityLevel precision) { this.tracer = tracer; this.precision = precision; } @@ -48,7 +48,7 @@ public TracingInfo buildTracingInfo(TracingInfo parent) { if (parent instanceof OpenTelemetryTracingInfo) { final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; return new OpenTelemetryTracingInfo( - castedParent.getTracer(), castedParent.getContext(), castedParent.getPrecision()); + castedParent.getTracer(), castedParent.getContext(), castedParent.getVerbosity()); } return new NoopTracingInfoFactory().buildTracingInfo(); diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..4026b360078 --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,889 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.SyntaxError; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.core.tracing.VerbosityLevel; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class BookkeepingSpanProcessor implements SpanProcessor { + final Lock lock = new ReentrantLock(); + final Condition allEnded = lock.newCondition(); + + final Collection startedSpans = new ArrayList<>(); + final Collection spans = new ArrayList<>(); + + int activeSpans = 0; + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + lock.lock(); + + startedSpans.add(span); + ++activeSpans; + + lock.unlock(); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + lock.lock(); + + spans.add(span); + --activeSpans; + + if (activeSpans == 0) allEnded.signal(); + + lock.unlock(); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + lock.lock(); + + try { + while (activeSpans > 0) allEnded.await(); + + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + } catch (InterruptedException e) { + assert false; + } finally { + lock.unlock(); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param precisionLevel precision level of tracing for the tests. + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans( + VerbosityLevel precisionLevel, BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer, precisionLevel); + cluster().setTracingInfoFactory(tracingInfoFactory); + + session = cluster().connect(); + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + + BatchStatement bs = new BatchStatement(); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (12, 3)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (1, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (5, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (6, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (7, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (8, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (9, 7)")); + bs.add(new SimpleStatement("INSERT INTO t(k, v) VALUES (10, 7)")); + bs.setConsistencyLevel(ConsistencyLevel.ALL); + session.execute(bs); + + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + session.close(); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + private Collection getChildrenOfSpans( + Collection allSpans, Collection parentSpans) { + return allSpans.stream() + .filter( + span -> + parentSpans.stream() + .filter( + parentSpan -> + parentSpan.getSpanContext().equals(span.getParentSpanContext())) + .findAny() + .isPresent()) + .collect(Collectors.toList()); + } + + /** Basic test for creating spans with INSERT statement. */ + @Test(groups = "short") + public void simpleTracingInsertTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation INSERT: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with UPDATE statement. */ + @Test(groups = "short") + public void simpleTracingUpdateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final BatchStatement batch = new BatchStatement(); + batch.addAll( + new ArrayList() { + { + add("UPDATE t SET v=0 WHERE k=1"); + add("UPDATE t SET v=0 WHERE k=2"); + add("UPDATE t SET v=0 WHERE k=3"); + add("UPDATE t SET v=0 WHERE k=4"); + } + }.stream().map(SimpleStatement::new).collect(Collectors.toList())); + + session.execute(batch); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "batch"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to BatchStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.batch_size")), "4"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation UPDATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with DELETE statement. */ + @Test(groups = "short") + public void simpleTracingDeleteTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final PreparedStatement ps = session.prepare("DELETE FROM t WHERE k=?"); + + session.execute(ps.bind(7)); + session.execute(ps.bind(8)); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter( + span -> + span.getParentSpanContext().equals(userSpan.getSpanContext()) + && span.toSpanData() + .getAttributes() + .get(AttributeKey.stringKey("db.scylladb.statement_type")) + != null) // to exclude preparation spans + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "prepared"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to PreparedStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.keyspace")), keyspace); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.table")), "t"); + String partitionKey = tags.get(AttributeKey.stringKey("db.scylladb.partition_key")); + assertTrue(partitionKey.equals("k=7") || partitionKey.equals("k=8")); + String boundValues = tags.get(AttributeKey.stringKey("db.scylladb.bound_values")); + assertTrue(boundValues.equals("k=7") || boundValues.equals("k=8")); + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.db.operation"))); // not supported so far + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such information with operation DELETE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with TRUNCATE statement. */ + @Test(groups = "short") + public void simpleTracingTruncateTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("TRUNCATE t"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylladb.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // present with VerbosityLevel.FULL: + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement")), "TRUNCATE t"); + // no such information with operation TRUNCATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with SELECT statement. */ + @Test(groups = "short") + public void simpleTracingSelectTest() { + final Collection spans = + collectSpans( + VerbosityLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + SimpleStatement s = + new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.ALL); + + assertEquals(session.execute(s).all().size(), 7); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assert requestSpans.size() >= 4; + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assert speculativeExecutionsSpans.size() >= 4; + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assert attemptSpans.size() >= 4; + + boolean wasNoMorePages = false; + int totalRows = 0; + + for (ReadableSpan requestSpan : requestSpans) { + SpanData spanData = requestSpan.toSpanData(); + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.consistency_level")), "ALL"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.fetch_size")), "2"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylladb.retry_policy")), "FallthroughRetryPolicy"); + + // tags specific for SELECT statement + final String hasMorePages = tags.get(AttributeKey.stringKey("db.scylladb.has_more_pages")); + assertNotNull(hasMorePages); + if (hasMorePages.equals("false")) wasNoMorePages = true; + assertTrue(!(wasNoMorePages && hasMorePages.equals("true"))); + final String rowsCount = tags.get(AttributeKey.stringKey("db.scylladb.rows_count")); + assertNotNull(rowsCount); + totalRows += Integer.parseInt(rowsCount); + + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.db.operation"))); + // no such information with VerbosityLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.statement"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + } + + assertTrue(wasNoMorePages); + assertEquals(totalRows, 7); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylladb.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylladb.shard_id"))); + }); + } + + /** Basic test for creating spans with an erroneous statement. */ + @Test(groups = "short") + public void simpleRequestErrorTracingTest() { + final Collection spans = + collectSpans( + VerbosityLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + try { + session.execute("INSERT ONTO t(k, v) VALUES (4, 2)"); + // ^ syntax error here + assert false; // exception should be thrown before this line is executed + } catch (SyntaxError error) { + // pass + } + + try { + session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)"); + // ^ too many values + assert false; // exception should be thrown before this line is executed + } catch (InvalidQueryException error) { + // pass + } + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR); + final String collectedStatement = + spanData.getAttributes().get(AttributeKey.stringKey("db.scylladb.statement")); + assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)") + || collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)") + : "Bad statement gathered"; + }); + } +}