From 4a6534b91cc14e01fe81eba5fe107f50784a9170 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 11:03:12 +0200 Subject: [PATCH 01/11] Added abstract tracing classes. In order to be generic and to still stick to Java 6 requirement for driver-core, abstract classes: TracingInfo and TracingInfoFactory are introduced. Further implementations of tracing are expected to be placed apart from driver-core and hence keep Java 8 (or higher) requirements away. --- .../driver/core/tracing/TracingInfo.java | 35 +++++++++++++++++++ .../core/tracing/TracingInfoFactory.java | 31 ++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java new file mode 100644 index 00000000000..f5914a5bf19 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** + * An abstraction layer over instrumentation library API, corresponding to a logical span in the + * trace. + */ +public interface TracingInfo { + + /** + * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, + * before any other method, at the beginning of the traced execution. + * + * @param name the name given to the span being created. + */ + void setNameAndStartTime(String name); + + /** Must be always called exactly once at the logical end of traced execution. */ + void tracingFinished(); +} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java new file mode 100644 index 00000000000..25c20f3d9ec --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfoFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** Factory of trace info objects. */ +public interface TracingInfoFactory { + + /** Creates new trace info object, inheriting global context. */ + TracingInfo buildTracingInfo(); + + /** + * Creates new trace info object, inheriting context from provided another trace info object. + * + * @param parent the trace info object to be set as the parent of newly created trace info object. + */ + TracingInfo buildTracingInfo(TracingInfo parent); +} From b8d43dac79748d53e3d4c6fd6d0d5554427fc9d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 11:20:30 +0200 Subject: [PATCH 02/11] Added customizable precision level for tracing. --- .../driver/core/tracing/PrecisionLevel.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java new file mode 100644 index 00000000000..e2e13f541d8 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/PrecisionLevel.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +/** The precision level of tracing data that is to be collected. May be extended in the future. */ +public enum PrecisionLevel { + // Enum elements must be listed in ascending order of precision level (i.e. each next element + // listed adds some new information). + NORMAL, + FULL; +} From d77492b18c1e02c817e827430c7b5af9441dd7ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 13:06:52 +0200 Subject: [PATCH 03/11] Added abstract tracing to Cluster and Session. While building a Cluster object, one can pass a TracingInfo class implementation and it will be used for tracing in all sessions with this cluster from now on. --- clirr-ignores.xml | 6 +++ .../com/datastax/driver/core/Cluster.java | 23 +++++++++++ .../com/datastax/driver/core/Session.java | 14 +++++++ .../datastax/driver/core/SessionManager.java | 6 +++ .../core/tracing/NoopTracingInfoFactory.java | 40 +++++++++++++++++++ .../datastax/driver/core/AsyncQueryTest.java | 6 +++ .../driver/core/DelegatingClusterTest.java | 3 +- 7 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 9784b61c368..5cf67ffc00c 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,12 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/Session + com.datastax.driver.core.tracing.TracingInfoFactory getTracingInfoFactory() + New method to get the TracingInfo data factory associated with this Session + 7004 com/datastax/driver/core/Metadata diff --git a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java index cdac5cff4d0..2a7ceeafca1 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Cluster.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Cluster.java @@ -39,6 +39,8 @@ import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; @@ -146,6 +148,8 @@ public class Cluster implements Closeable { final Manager manager; + private TracingInfoFactory tracingInfoFactory = new NoopTracingInfoFactory(); + /** * Constructs a new Cluster instance. * @@ -197,6 +201,25 @@ private Cluster( this.manager = new Manager(name, contactPoints, configuration, listeners); } + /** + * The tracingInfo factory class used by this Cluster. + * + * @return the factory used currently by this Cluster. + */ + public TracingInfoFactory getTracingInfoFactory() { + return tracingInfoFactory; + } + + /** + * Sets desired factory for tracing information for this Cluster. By default it is {@link + * com.datastax.driver.core.tracing.NoopTracingInfoFactory} + * + * @param tracingInfoFactory the factory to be set for this Cluster. + */ + public void setTracingInfoFactory(TracingInfoFactory tracingInfoFactory) { + this.tracingInfoFactory = tracingInfoFactory; + } + /** * Initialize this Cluster instance. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/Session.java b/driver-core/src/main/java/com/datastax/driver/core/Session.java index a8e4a59b2a8..0e23caa8e78 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Session.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Session.java @@ -13,6 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ package com.datastax.driver.core; import com.datastax.driver.core.exceptions.AuthenticationException; @@ -20,6 +26,7 @@ import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.UnsupportedFeatureException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.google.common.util.concurrent.ListenableFuture; import java.io.Closeable; import java.util.Collection; @@ -41,6 +48,13 @@ */ public interface Session extends Closeable { + /** + * The tracingInfo factory class used by this Session. + * + * @return the factory used currently by this Session. + */ + TracingInfoFactory getTracingInfoFactory(); + /** * The keyspace to which this Session is currently logged in, if any. * diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index ada9cc1699c..99dbe520cd3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,7 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; @@ -76,6 +77,11 @@ class SessionManager extends AbstractSession { this.poolsState = new HostConnectionPool.PoolState(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return cluster.getTracingInfoFactory(); + } + @Override public Session init() { try { diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java new file mode 100644 index 00000000000..3706c0239aa --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +public class NoopTracingInfoFactory implements TracingInfoFactory { + + private static class NoopTracingInfo implements TracingInfo { + @Override + public void setNameAndStartTime(String name) {} + + @Override + public void tracingFinished() {} + } + + public static final NoopTracingInfo INSTANCE = new NoopTracingInfo(); + + @Override + public TracingInfo buildTracingInfo() { + return INSTANCE; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + return new NoopTracingInfo(); + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java index 098164caa88..b4b73356186 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.CassandraVersion; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -286,6 +287,11 @@ public ResultSet execute(Statement statement) { return executeAsync(statement).getUninterruptibly(); } + @Override + public TracingInfoFactory getTracingInfoFactory() { + return session.getTracingInfoFactory(); + } + @Override public String getLoggedKeyspace() { return session.getLoggedKeyspace(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java index 0bee5ec9a8d..03d01e18cb4 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/DelegatingClusterTest.java @@ -29,7 +29,8 @@ import org.testng.annotations.Test; public class DelegatingClusterTest { - private static final Set NON_DELEGATED_METHODS = ImmutableSet.of("getClusterName"); + private static final Set NON_DELEGATED_METHODS = + ImmutableSet.of("getClusterName", "setTracingInfoFactory", "getTracingInfoFactory"); /** * Checks that all methods of {@link DelegatingCluster} invoke their counterpart in {@link From 8561cf6f36875a32657223d7171d4c1e29fe248e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 13:33:49 +0200 Subject: [PATCH 04/11] Instrumenting RequestHandler with tracing: added empty spans. Every request is covered by one "request" span, and every speculative execution has its own "speculative_execution.n" span, where n is number of that (possibly speculative) execution. Retries are performed in the same execution and therefore do not increase that number, whereas speculative executions do. Each retry is covered by "query" span, effectively yielding a tree of form: "request" -1--many-> "speculative_execution.n" -1--many-> "query" --- .../datastax/driver/core/RequestHandler.java | 47 +++++++++++++++++-- .../datastax/driver/core/SessionManager.java | 19 ++++++-- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index a8a05f093f5..88646b76910 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -40,6 +40,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.RetryPolicy.RetryDecision.Type; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy.SpeculativeExecutionPlan; +import com.datastax.driver.core.tracing.TracingInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -95,6 +96,8 @@ class RequestHandler { private final AtomicBoolean isDone = new AtomicBoolean(); private final AtomicInteger executionIndex = new AtomicInteger(); + private final TracingInfo tracingInfo; + private Iterator getReplicas( String loggedKeyspace, Statement statement, Iterator fallback) { ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion(); @@ -120,7 +123,8 @@ private Iterator getReplicas( return replicas.iterator(); } - public RequestHandler(SessionManager manager, Callback callback, Statement statement) { + public RequestHandler( + SessionManager manager, Callback callback, Statement statement, TracingInfo tracingInfo) { this.id = Long.toString(System.identityHashCode(this)); if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); this.manager = manager; @@ -156,6 +160,9 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + + this.tracingInfo = tracingInfo; + this.tracingInfo.setNameAndStartTime("request"); } void sendRequest() { @@ -274,6 +281,7 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( connection, @@ -281,6 +289,8 @@ private void setFinalResult( "Unexpected exception while setting final result from " + response, e), System.nanoTime() - startTime, /*unused*/ 0); + + tracingInfo.tracingFinished(); } } @@ -305,6 +315,8 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.tracingFinished(); + try { if (timerContext != null) timerContext.stop(); } finally { @@ -315,6 +327,7 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) setFinalException( @@ -383,11 +396,17 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; + private final TracingInfo parentTracingInfo; + private TracingInfo currentChildTracingInfo; + SpeculativeExecution(Message.Request request, int position) { this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); + this.parentTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(RequestHandler.this.tracingInfo); + this.parentTracingInfo.setNameAndStartTime("speculative_execution." + position); if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); } @@ -429,6 +448,9 @@ private boolean query(final Host host) { if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + currentChildTracingInfo.setNameAndStartTime("query"); + if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -647,6 +669,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { @@ -659,6 +682,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.tracingFinished(); return; } } @@ -674,6 +698,8 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -832,7 +858,10 @@ public void onSet( toPrepare.getQueryKeyspace(), connection.endPoint); - write(connection, prepareAndRetry(toPrepare.getQueryString())); + TracingInfo prepareTracingInfo = + manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); + prepareTracingInfo.setNameAndStartTime("prepare"); + write(connection, prepareAndRetry(toPrepare.getQueryString(), prepareTracingInfo)); // we're done for now, the prepareAndRetry callback will handle the rest return; case READ_FAILURE: @@ -878,7 +907,8 @@ public void onSet( } } - private Connection.ResponseCallback prepareAndRetry(final String toPrepare) { + private Connection.ResponseCallback prepareAndRetry( + final String toPrepare, final TracingInfo prepareTracingInfo) { // do not bother inspecting retry policy at this step, no other decision // makes sense than retry on the same host if the query was prepared, // or on another host, if an error/timeout occurred. @@ -902,6 +932,8 @@ public int retryCount() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -944,11 +976,14 @@ public void onSet( @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); SpeculativeExecution.this.onException(connection, exception, latency, retryCount); } @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + prepareTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -973,6 +1008,8 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1010,6 +1047,8 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.tracingFinished(); + QueryState queryState = queryStateRef.get(); if (!queryState.isInProgressAt(retryCount) || !queryStateRef.compareAndSet(queryState, queryState.complete())) { @@ -1051,10 +1090,12 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java index 99dbe520cd3..4a4dcb2c159 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java +++ b/driver-core/src/main/java/com/datastax/driver/core/SessionManager.java @@ -29,6 +29,7 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; +import com.datastax.driver.core.tracing.TracingInfo; import com.datastax.driver.core.tracing.TracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.base.Functions; @@ -161,6 +162,7 @@ public ResultSetFuture executeAsync(final Statement statement) { // Because of the way the future is built, we need another 'proxy' future that we can return // now. final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture(); + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); this.initAsync() .addListener( new Runnable() { @@ -171,7 +173,7 @@ public void run() { SessionManager.this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null)); - execute(actualFuture, statement); + execute(actualFuture, statement, tracingInfo); chainedFuture.setSource(actualFuture); } }, @@ -712,25 +714,34 @@ else if (fetchSize != Integer.MAX_VALUE) *

This method will find a suitable node to connect to using the {@link LoadBalancingPolicy} * and handle host failover. */ - void execute(final RequestHandler.Callback callback, final Statement statement) { + void execute( + final RequestHandler.Callback callback, + final Statement statement, + final TracingInfo tracingInfo) { if (this.isClosed()) { callback.onException( null, new IllegalStateException("Could not send request, session is closed"), 0, 0); return; } - if (isInit) new RequestHandler(this, callback, statement).sendRequest(); + if (isInit) new RequestHandler(this, callback, statement, tracingInfo).sendRequest(); else this.initAsync() .addListener( new Runnable() { @Override public void run() { - new RequestHandler(SessionManager.this, callback, statement).sendRequest(); + new RequestHandler(SessionManager.this, callback, statement, tracingInfo) + .sendRequest(); } }, executor()); } + void execute(final RequestHandler.Callback callback, final Statement statement) { + final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo(); + execute(callback, statement, tracingInfo); + } + private ListenableFuture prepare( final PreparedStatement statement, EndPoint toExclude) { final String query = statement.getQueryString(); From bfaf37d7b7ec72d8e074fc284cde065e736f4f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 14:11:18 +0200 Subject: [PATCH 05/11] Instrumenting RequestHandler: added status codes and exceptions. After a request is completed, its status code (OK or ERROR) is added to trace data, as well as exception information (if one occured). --- .../datastax/driver/core/RequestHandler.java | 5 ++++ .../core/tracing/NoopTracingInfoFactory.java | 9 ++++++ .../driver/core/tracing/TracingInfo.java | 28 +++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 88646b76910..3f74eb4ac5c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -281,6 +281,7 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + tracingInfo.setStatus(TracingInfo.StatusCode.OK); tracingInfo.tracingFinished(); } catch (Exception e) { callback.onException( @@ -290,6 +291,8 @@ private void setFinalResult( System.nanoTime() - startTime, /*unused*/ 0); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); + tracingInfo.recordException(e); tracingInfo.tracingFinished(); } } @@ -315,6 +318,8 @@ private void setFinalException( cancelPendingExecutions(execution); + tracingInfo.recordException(exception); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, exception.toString()); tracingInfo.tracingFinished(); try { diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java index 3706c0239aa..a2c39d6cee3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -22,6 +22,15 @@ private static class NoopTracingInfo implements TracingInfo { @Override public void setNameAndStartTime(String name) {} + @Override + public void recordException(Exception exception) {} + + @Override + public void setStatus(StatusCode code, String description) {} + + @Override + public void setStatus(StatusCode code) {} + @Override public void tracingFinished() {} } diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java index f5914a5bf19..090b32e41af 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -22,6 +22,12 @@ */ public interface TracingInfo { + /** Final status of the traced execution. */ + enum StatusCode { + OK, + ERROR, + } + /** * Starts a span corresponding to this {@link TracingInfo} object. Must be called exactly once, * before any other method, at the beginning of the traced execution. @@ -30,6 +36,28 @@ public interface TracingInfo { */ void setNameAndStartTime(String name); + /** + * Records in the trace that the provided exception occured. + * + * @param exception the exception to be recorded. + */ + void recordException(Exception exception); + + /** + * Sets the final status of the traced execution. + * + * @param code the status code to be set. + */ + void setStatus(StatusCode code); + + /** + * Sets the final status of the traced execution, with additional description. + * + * @param code the status code to be set. + * @param description the additional description of the status. + */ + void setStatus(StatusCode code, String description); + /** Must be always called exactly once at the logical end of traced execution. */ void tracingFinished(); } From 6fea490d436910bd028ea6e3ae6e5f64c99fc48d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 25 Jul 2022 14:48:27 +0200 Subject: [PATCH 06/11] Instrumenting RequestHandler: added various driver-side tags. All data that constitute added tags are collected offline, i.e. not fetched from the cluster, but known instantly by the driver. --- clirr-ignores.xml | 6 + .../datastax/driver/core/BoundStatement.java | 7 + .../driver/core/DefaultPreparedStatement.java | 8 ++ .../driver/core/PreparedStatement.java | 3 + .../datastax/driver/core/RequestHandler.java | 90 +++++++++++- .../core/tracing/NoopTracingInfoFactory.java | 59 ++++++++ .../driver/core/tracing/TracingInfo.java | 133 ++++++++++++++++++ 7 files changed, 305 insertions(+), 1 deletion(-) diff --git a/clirr-ignores.xml b/clirr-ignores.xml index 5cf67ffc00c..fbe39834e5a 100644 --- a/clirr-ignores.xml +++ b/clirr-ignores.xml @@ -16,6 +16,12 @@ Modified by ScyllaDB --> + + 7012 + com/datastax/driver/core/PreparedStatement + java.lang.String getOperationType() + New method to get the type of operation performed by this PreparedStatement + 7012 com/datastax/driver/core/Session diff --git a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java index 9317bd0a58b..cc5117be68c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java @@ -74,6 +74,8 @@ public class BoundStatement extends Statement private ByteBuffer routingKey; + private final String operationType; + /** * Creates a new {@code BoundStatement} from the provided prepared statement. * @@ -92,6 +94,7 @@ public BoundStatement(PreparedStatement statement) { this.setSerialConsistencyLevel(statement.getSerialConsistencyLevel()); if (statement.isTracing()) this.enableTracing(); if (statement.getRetryPolicy() != null) this.setRetryPolicy(statement.getRetryPolicy()); + this.operationType = statement.getOperationType(); if (statement.getOutgoingPayload() != null) this.setOutgoingPayload(statement.getOutgoingPayload()); else @@ -104,6 +107,10 @@ public BoundStatement(PreparedStatement statement) { } } + public String getOperationType() { + return operationType; + } + @Override public boolean isLWT() { return statement.isLWT(); diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 287a55cbdb3..6297660e5a2 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -41,6 +41,7 @@ public class DefaultPreparedStatement implements PreparedStatement { final Cluster cluster; final boolean isLWT; final Token.Factory partitioner; + final String operationType; volatile ByteBuffer routingKey; @@ -66,6 +67,7 @@ private DefaultPreparedStatement( this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; + this.operationType = null; } static DefaultPreparedStatement fromMessage( @@ -315,4 +317,10 @@ public Boolean isIdempotent() { public boolean isLWT() { return isLWT; } + + /** {@inheritDoc} */ + @Override + public String getOperationType() { + return operationType; + } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java index e87cbc0341b..779c35a1672 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java @@ -361,4 +361,7 @@ public interface PreparedStatement { /** Whether a prepared statement is LWT statement */ public boolean isLWT(); + + /** Type of prepared operation (e.g. SELECT) */ + public String getOperationType(); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 3f74eb4ac5c..0726e3c575d 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -48,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Iterator; @@ -74,6 +75,10 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; + private static final int STATEMENT_MAX_LENGTH = 1000; + private static final int PARTITION_KEY_MAX_LENGTH = 1000; + private static final int REPLICAS_MAX_LENGTH = 1000; + final String id; private final SessionManager manager; @@ -161,8 +166,70 @@ public RequestHandler( this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); + ConsistencyLevel consistency = statement.getConsistencyLevel(); + if (consistency == null) consistency = Statement.DEFAULT.getConsistencyLevel(); + + String statementType = null; + String statementText = null; + int batchSize = 1; + + String keyspace = null; + String partitionKey = null; + String table = null; + String operationType = null; + + if (statement instanceof BatchStatement) { + statementType = "batch"; + batchSize = ((BatchStatement) statement).size(); + StringBuilder statementTextBuilder = new StringBuilder(STATEMENT_MAX_LENGTH); + for (Statement subStatement : ((BatchStatement) statement).getStatements()) { + if (subStatement instanceof BoundStatement) + statementTextBuilder.append(((BoundStatement) subStatement).statement.getQueryString()); + else statementTextBuilder.append(subStatement.toString()); + } + statementText = statementTextBuilder.toString(); + } else if (statement instanceof BoundStatement) { + statementType = "prepared"; + statementText = ((BoundStatement) statement).statement.getQueryString(); + keyspace = ((BoundStatement) statement).getKeyspace(); + operationType = ((BoundStatement) statement).getOperationType(); + + ColumnDefinitions boundColumns = + ((BoundStatement) statement).statement.getPreparedId().boundValuesMetadata.variables; + + StringBuilder partitionKeyBuilder = new StringBuilder(PARTITION_KEY_MAX_LENGTH); + int[] rkIndexes = ((BoundStatement) statement).statement.getPreparedId().routingKeyIndexes; + if (rkIndexes != null) { + for (int i : rkIndexes) { + Object value = ((BoundStatement) statement).getObject(i); + String valueString = (value == null) ? "NULL" : value.toString(); + if (partitionKeyBuilder.length() > 0) partitionKeyBuilder.append(", "); + String columnName = boundColumns.getName(i); + partitionKeyBuilder.append(columnName); + partitionKeyBuilder.append('='); + partitionKeyBuilder.append(valueString); + } + } + partitionKey = partitionKeyBuilder.toString(); + + if (boundColumns.size() > 0) table = boundColumns.getTable(0); + } else if (statement instanceof RegularStatement) { + statementType = "regular"; + statementText = ((RegularStatement) statement).toString(); + } + this.tracingInfo = tracingInfo; this.tracingInfo.setNameAndStartTime("request"); + this.tracingInfo.setConsistencyLevel(consistency); + this.tracingInfo.setRetryPolicy(retryPolicy()); + this.tracingInfo.setBatchSize(batchSize); + this.tracingInfo.setLoadBalancingPolicy(manager.loadBalancingPolicy()); + if (statementType != null) this.tracingInfo.setStatementType(statementType); + if (statementText != null) this.tracingInfo.setStatement(statementText, STATEMENT_MAX_LENGTH); + if (keyspace != null) this.tracingInfo.setKeyspace(keyspace); + if (partitionKey != null) this.tracingInfo.setPartitionKey(partitionKey); + if (table != null) this.tracingInfo.setTable(table); + if (operationType != null) this.tracingInfo.setOperationType(operationType); } void sendRequest() { @@ -281,6 +348,15 @@ private void setFinalResult( logServerWarnings(response.warnings); } callback.onSet(connection, response, info, statement, System.nanoTime() - startTime); + + if (response.type == Message.Response.Type.RESULT) { + Responses.Result rm = (Responses.Result) response; + if (rm.kind == Responses.Result.Kind.ROWS) { + Responses.Result.Rows r = (Responses.Result.Rows) rm; + tracingInfo.setRowsCount(r.data.size()); + } + } + tracingInfo.setQueryPaged(info.getPagingState() != null); tracingInfo.setStatus(TracingInfo.StatusCode.OK); tracingInfo.tracingFinished(); } catch (Exception e) { @@ -291,8 +367,8 @@ private void setFinalResult( System.nanoTime() - startTime, /*unused*/ 0); - tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); tracingInfo.recordException(e); + tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString()); tracingInfo.tracingFinished(); } } @@ -332,6 +408,7 @@ private void setFinalException( // Triggered when an execution reaches the end of the query plan. // This is only a failure if there are no other running executions. private void reportNoMoreHosts(SpeculativeExecution execution) { + execution.parentTracingInfo.setRetryCount(execution.retryCount()); execution.parentTracingInfo.tracingFinished(); runningExecutions.remove(execution); if (runningExecutions.isEmpty()) @@ -455,6 +532,10 @@ private boolean query(final Host host) { currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo); currentChildTracingInfo.setNameAndStartTime("query"); + InetSocketAddress hostAddress = host.getEndPoint().resolve(); + currentChildTracingInfo.setPeerName(hostAddress.getHostName()); + currentChildTracingInfo.setPeerIP(hostAddress.getAddress()); + currentChildTracingInfo.setPeerPort(hostAddress.getPort()); if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -674,6 +755,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setRetryCount(retryCount()); parentTracingInfo.tracingFinished(); return; } else if (!previous.inProgress @@ -687,6 +769,7 @@ void cancel() { CancelledSpeculativeExecutionException.INSTANCE, System.nanoTime() - startTime); } + parentTracingInfo.setRetryCount(retryCount()); parentTracingInfo.tracingFinished(); return; } @@ -703,6 +786,7 @@ public Message.Request request() { @Override public void onSet( Connection connection, Message.Response response, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1013,6 +1097,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { @Override public void onException( Connection connection, Exception exception, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1052,6 +1137,7 @@ public void onException( @Override public boolean onTimeout(Connection connection, long latency, int retryCount) { + currentChildTracingInfo.setShardID(connection.shardId()); currentChildTracingInfo.tracingFinished(); QueryState queryState = queryStateRef.get(); @@ -1095,11 +1181,13 @@ public int retryCount() { } private void setFinalException(Connection connection, Exception exception) { + parentTracingInfo.setRetryCount(retryCount()); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalException(this, connection, exception); } private void setFinalResult(Connection connection, Message.Response response) { + parentTracingInfo.setRetryCount(retryCount()); parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java index a2c39d6cee3..c499c0a496e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java @@ -16,12 +16,71 @@ package com.datastax.driver.core.tracing; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; + public class NoopTracingInfoFactory implements TracingInfoFactory { private static class NoopTracingInfo implements TracingInfo { @Override public void setNameAndStartTime(String name) {} + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) {} + + @Override + public void setStatementType(String statementType) {} + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) {} + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {}; + + @Override + public void setBatchSize(int batchSize) {} + + @Override + public void setRetryCount(int retryCount) {} + + @Override + public void setShardID(int shardID) {} + + @Override + public void setPeerName(String peerName) {} + + @Override + public void setPeerIP(InetAddress peerIP) {} + + @Override + public void setPeerPort(int peerPort) {} + + @Override + public void setQueryPaged(Boolean queryPaged) {} + + @Override + public void setRowsCount(int rowsCount) {} + + @Override + public void setStatement(String statement, int limit) {} + + @Override + public void setKeyspace(String keyspace) {} + + @Override + public void setPartitionKey(String partitionKey) {} + + @Override + public void setTable(String table) {} + + @Override + public void setOperationType(String operationType) {} + + @Override + public void setReplicas(String replicas) {} + @Override public void recordException(Exception exception) {} diff --git a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java index 090b32e41af..cb398b993ee 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java +++ b/driver-core/src/main/java/com/datastax/driver/core/tracing/TracingInfo.java @@ -16,6 +16,11 @@ package com.datastax.driver.core.tracing; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; + /** * An abstraction layer over instrumentation library API, corresponding to a logical span in the * trace. @@ -36,6 +41,134 @@ enum StatusCode { */ void setNameAndStartTime(String name); + /** + * Adds provided consistency level to the trace. + * + * @param consistency the consistency level to be set. + */ + void setConsistencyLevel(ConsistencyLevel consistency); + + /** + * Adds provided statement type to the trace. + * + * @param statementType the statement type to be set. + */ + void setStatementType(String statementType); + + /** + * Adds provided retry policy to the trace. + * + * @param retryPolicy the retry policy to be set. + */ + void setRetryPolicy(RetryPolicy retryPolicy); + + /** + * Adds provided load balancing policy to the trace. + * + * @param loadBalancingPolicy the load balancing policy to be set. + */ + void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy); + + /** + * Adds provided batch size to the trace. + * + * @param batchSize the batch size to be set. + */ + void setBatchSize(int batchSize); + + /** + * Adds provided retry count to the trace. + * + * @param retryCount the retry count to be set. + */ + void setRetryCount(int retryCount); + + /** + * Adds provided shard ID to the trace. + * + * @param shardID the shard ID to be set. + */ + void setShardID(int shardID); + + /** + * Adds provided peer name to the trace. + * + * @param peerName the peer name to be set. + */ + void setPeerName(String peerName); + + /** + * Adds provided peer IP to the trace. + * + * @param peerIP the peer IP to be set. + */ + void setPeerIP(InetAddress peerIP); + + /** + * Adds provided peer port to the trace. + * + * @param peerPort the peer port to be set. + */ + void setPeerPort(int peerPort); + + /** + * Adds information whether the query was paged to the trace. + * + * @param queryPaged information whether the query was paged. + */ + void setQueryPaged(Boolean queryPaged); + + /** + * Adds provided number of returned rows to the trace. + * + * @param rowsCount the number of returned rows to be set. + */ + void setRowsCount(int rowsCount); + + /** + * Adds provided statement text to the trace. If the statement length is greater than given limit, + * the statement is trimmed to the first {@param limit} signs. + * + * @param statement the statement text to be set. + * @param limit the statement length limit. + */ + void setStatement(String statement, int limit); + + /** + * Adds provided keyspace to the trace. + * + * @param keyspace the keyspace to be set. + */ + void setKeyspace(String keyspace); + + /** + * Adds provided partition key string to the trace. + * + * @param partitionKey the partitionKey to be set. + */ + void setPartitionKey(String partitionKey); + + /** + * Adds provided table name to the trace. + * + * @param table the table name to be set. + */ + void setTable(String table); + + /** + * Adds provided operation type (e.g. SELECT) to the trace. + * + * @param operationType the operation type to be set. + */ + void setOperationType(String operationType); + + /** + * Adds provided list of contacted replicas to the trace. + * + * @param replicas the list of contacted replicas to be set. + */ + void setReplicas(String replicas); + /** * Records in the trace that the provided exception occured. * From 0ba6d874ea6db34fde7eee0bbd34ca0f8fe78cd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 09:49:39 +0200 Subject: [PATCH 07/11] Tracing: added abstract-based unit tests These tests verify that the span tree structure is valid and that spans are filled with tags of proper content, by providing a mock implementation of TracingInfo. --- .../driver/core/tracing/BasicTracingTest.java | 204 ++++++++++++ .../driver/core/tracing/TestTracingInfo.java | 291 ++++++++++++++++++ .../core/tracing/TestTracingInfoFactory.java | 62 ++++ 3 files changed, 557 insertions(+) create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java new file mode 100644 index 00000000000..06c6bd85557 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/BasicTracingTest.java @@ -0,0 +1,204 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.PagingOptimizingLoadBalancingPolicy; +import java.util.ArrayList; +import java.util.Collection; +import org.testng.annotations.Test; + +public class BasicTracingTest extends CCMTestsSupport { + private static TestTracingInfoFactory testTracingInfoFactory; + private Session session; + + @Override + public void onTestContextInitialized() { + initializeTestTracing(); + session.execute("USE " + keyspace); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + + Collection spans = testTracingInfoFactory.getSpans(); + spans.clear(); + } + + @Test(groups = "short") + public void simpleTracingTest() { + session.execute("INSERT INTO t(k, v) VALUES (1, 7)"); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + spans.clear(); + } + + @Test(groups = "short") + public void tagsTest() { + PreparedStatement prepared = session.prepare("INSERT INTO t(k, v) VALUES (?, ?)"); + + Collection prepareSpans = testTracingInfoFactory.getSpans(); + assertNotEquals(prepareSpans.size(), 0); + assertTrue(getRoot(prepareSpans) instanceof TestTracingInfo); + prepareSpans.clear(); + + BoundStatement bound = prepared.bind(1, 7); + session.execute(bound); + + Collection spans = testTracingInfoFactory.getSpans(); + assertNotEquals(spans.size(), 0); + + TracingInfo rootSpan = getRoot(spans); + assertTrue(rootSpan instanceof TestTracingInfo); + TestTracingInfo root = (TestTracingInfo) rootSpan; + + assertTrue(root.isSpanStarted()); + assertTrue(root.isSpanFinished()); + assertEquals(root.getStatusCode(), TracingInfo.StatusCode.OK); + + // these tags should be set for request span + assertEquals(root.getStatementType(), "prepared"); + assertEquals(root.getBatchSize(), new Integer(1)); + assertEquals(root.getConsistencyLevel(), ConsistencyLevel.ONE); + assertNull(root.getRowsCount()); // no rows are returned in insert + assertTrue(root.getLoadBalancingPolicy() instanceof PagingOptimizingLoadBalancingPolicy); + assertTrue(root.getRetryPolicy() instanceof DefaultRetryPolicy); + assertFalse(root.getQueryPaged()); + assertNull(root.getStatement()); // because of precision level NORMAL + // these are tags specific to bound statement + assertEquals(root.getKeyspace(), keyspace); + assertEquals(root.getPartitionKey(), "k=1"); + assertEquals(root.getTable(), "t"); + + // these tags should not be set for request span + assertNull(root.getPeerName()); + assertNull(root.getPeerIP()); + assertNull(root.getPeerPort()); + assertNull(root.getRetryCount()); + + ArrayList speculativeExecutions = getChildren(spans, root); + assertTrue(speculativeExecutions.size() > 0); + + for (TracingInfo speculativeExecutionSpan : speculativeExecutions) { + assertTrue(speculativeExecutionSpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) speculativeExecutionSpan; + + // these tags should not be set for speculative execution span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getQueryPaged()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getPeerName()); + assertNull(tracingInfo.getPeerIP()); + assertNull(tracingInfo.getPeerPort()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // this tag should be set for speculative execution span + assertTrue(tracingInfo.getRetryCount() >= 0); + } + + ArrayList queries = new ArrayList(); + for (TracingInfo tracingInfo : speculativeExecutions) { + queries.addAll(getChildren(spans, tracingInfo)); + } + assertTrue(queries.size() > 0); + + for (TracingInfo querySpan : queries) { + assertTrue(querySpan instanceof TestTracingInfo); + TestTracingInfo tracingInfo = (TestTracingInfo) querySpan; + + // these tags should not be set for query span + assertNull(tracingInfo.getStatementType()); + assertNull(tracingInfo.getBatchSize()); + assertNull(tracingInfo.getConsistencyLevel()); + assertNull(tracingInfo.getRowsCount()); + assertNull(tracingInfo.getLoadBalancingPolicy()); + assertNull(tracingInfo.getRetryPolicy()); + assertNull(tracingInfo.getQueryPaged()); + assertNull(tracingInfo.getStatement()); + assertNull(tracingInfo.getRetryCount()); + // these are tags specific to bound statement + assertNull(tracingInfo.getKeyspace()); + assertNull(tracingInfo.getPartitionKey()); + assertNull(tracingInfo.getTable()); + + // these tags should be set for query span + assertNotNull(tracingInfo.getPeerName()); + assertNotNull(tracingInfo.getPeerIP()); + assertNotNull(tracingInfo.getPeerPort()); + assertTrue(tracingInfo.getPeerPort() >= 0 && tracingInfo.getPeerPort() <= 65535); + } + + spans.clear(); + } + + private void initializeTestTracing() { + testTracingInfoFactory = new TestTracingInfoFactory(PrecisionLevel.NORMAL); + cluster().setTracingInfoFactory(testTracingInfoFactory); + session = cluster().connect(); + } + + private TracingInfo getRoot(Collection spans) { + TracingInfo root = null; + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == null) { + assertNull(root); // There should be only one root. + root = tracingInfo; + } + } + + return root; + } + + private ArrayList getChildren(Collection spans, TracingInfo parent) { + ArrayList children = new ArrayList(); + for (TracingInfo tracingInfo : spans) { + if (tracingInfo instanceof TestTracingInfo + && ((TestTracingInfo) tracingInfo).getParent() == parent) { + children.add(tracingInfo); + } + } + return children; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java new file mode 100644 index 00000000000..903c8415f7e --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfo.java @@ -0,0 +1,291 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; + +public class TestTracingInfo implements TracingInfo { + + private final PrecisionLevel precision; + private TracingInfo parent = null; + + private Boolean spanStarted = false; + private Boolean spanFinished = false; + private String spanName; + private ConsistencyLevel consistencyLevel; + private String statement; + private String statementType; + private Collection exceptions; + private StatusCode statusCode; + private String description; + private InetAddress peerIP; + private RetryPolicy retryPolicy; + private LoadBalancingPolicy loadBalancingPolicy; + private Integer batchSize; + private Integer retryCount; + private Integer shardID; + private String peerName; + private Integer peerPort; + private Boolean queryPaged; + private Integer rowsCount; + private String keyspace; + private String partitionKey; + private String table; + private String operationType; + private String replicas; + + public TestTracingInfo(PrecisionLevel precision) { + this.precision = precision; + } + + public TestTracingInfo(PrecisionLevel precision, TracingInfo parent) { + this(precision); + this.parent = parent; + } + + public PrecisionLevel getPrecision() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + this.spanStarted = true; + this.spanName = name; + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + this.consistencyLevel = consistency; + } + + @Override + public void setStatementType(String statementType) { + this.statementType = statementType; + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + this.loadBalancingPolicy = loadBalancingPolicy; + } + + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + @Override + public void setShardID(int shardID) { + this.shardID = shardID; + } + + @Override + public void setPeerName(String peerName) { + this.peerName = peerName; + } + + @Override + public void setPeerIP(InetAddress peerIP) { + this.peerIP = peerIP; + } + + @Override + public void setPeerPort(int peerPort) { + this.peerPort = peerPort; + } + + @Override + public void setQueryPaged(Boolean queryPaged) { + this.queryPaged = queryPaged; + } + + @Override + public void setRowsCount(int rowsCount) { + this.rowsCount = rowsCount; + } + + @Override + public void setStatement(String statement, int limit) { + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + this.statement = statement; + } + } + + @Override + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + @Override + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + + @Override + public void setTable(String table) { + this.table = table; + } + + @Override + public void setOperationType(String operationType) { + this.operationType = operationType; + } + + @Override + public void setReplicas(String replicas) { + this.replicas = replicas; + } + + @Override + public void recordException(Exception exception) { + if (this.exceptions == null) { + this.exceptions = new ArrayList(); + } + this.exceptions.add(exception); + } + + @Override + public void setStatus(StatusCode code) { + this.statusCode = code; + } + + @Override + public void setStatus(StatusCode code, String description) { + this.statusCode = code; + this.description = description; + } + + @Override + public void tracingFinished() { + this.spanFinished = true; + } + + private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } + + public boolean isSpanStarted() { + return spanStarted; + } + + public boolean isSpanFinished() { + return spanFinished; + } + + public String getSpanName() { + return spanName; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public String getStatementType() { + return statementType; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public LoadBalancingPolicy getLoadBalancingPolicy() { + return loadBalancingPolicy; + } + + public Integer getBatchSize() { + return batchSize; + } + + public Integer getRetryCount() { + return retryCount; + } + + public Integer getShardID() { + return shardID; + } + + public String getPeerName() { + return peerName; + } + + public InetAddress getPeerIP() { + return peerIP; + } + + public Integer getPeerPort() { + return peerPort; + } + + public Boolean getQueryPaged() { + return queryPaged; + } + + public Integer getRowsCount() { + return rowsCount; + } + + public String getStatement() { + return statement; + } + + public String getKeyspace() { + return keyspace; + } + + public String getPartitionKey() { + return partitionKey; + } + + public String getTable() { + return table; + } + + public String getOperationType() { + return operationType; + } + + public String getReplicas() { + return replicas; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + public String getDescription() { + return description; + } + + public TracingInfo getParent() { + return parent; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java new file mode 100644 index 00000000000..236d0da4c7c --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/tracing/TestTracingInfoFactory.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.core.tracing; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +public class TestTracingInfoFactory implements TracingInfoFactory { + private final PrecisionLevel precision; + private Collection spans = + Collections.synchronizedList(new ArrayList()); + + public TestTracingInfoFactory() { + this.precision = PrecisionLevel.NORMAL; + } + + public TestTracingInfoFactory(final PrecisionLevel precision) { + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + TracingInfo tracingInfo = new TestTracingInfo(precision); + spans.add(tracingInfo); + return tracingInfo; + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + TracingInfo tracingInfo; + + if (parent instanceof TestTracingInfo) { + final TestTracingInfo castedParent = (TestTracingInfo) parent; + tracingInfo = new TestTracingInfo(castedParent.getPrecision(), parent); + spans.add(tracingInfo); + return tracingInfo; + } + + tracingInfo = new NoopTracingInfoFactory().buildTracingInfo(); + spans.add(tracingInfo); + return tracingInfo; + } + + public Collection getSpans() { + return spans; + } +} From 142b9af26bd48351a658d5bbc040d3649cc22842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:04:40 +0200 Subject: [PATCH 08/11] Tracing: added OpenTelemetry implementation. Added TracingInfo and TracingInfoFactory implementations in separate driver-opentelemetry module. --- driver-opentelemetry/pom.xml | 149 ++++++++++++ .../OpenTelemetryTracingInfo.java | 230 ++++++++++++++++++ .../OpenTelemetryTracingInfoFactory.java | 61 +++++ pom.xml | 10 + 4 files changed, 450 insertions(+) create mode 100644 driver-opentelemetry/pom.xml create mode 100644 driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java create mode 100644 driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java diff --git a/driver-opentelemetry/pom.xml b/driver-opentelemetry/pom.xml new file mode 100644 index 00000000000..6c59be7b9ad --- /dev/null +++ b/driver-opentelemetry/pom.xml @@ -0,0 +1,149 @@ + + + + + 4.0.0 + + + com.scylladb + scylla-driver-parent + 3.11.2.1-SNAPSHOT + + + scylla-driver-opentelemetry + Java Driver for Scylla and Apache Cassandra - OpenTelemetry integration + An extension of Java Driver for Scylla and Apache Cassandra by adding + functionality of creating traces and spans in OpenTelemetry format. + + + + + + + + io.opentelemetry + opentelemetry-bom + 1.9.1 + pom + import + + + + + + + + + + + + com.scylladb + scylla-driver-core + + + + + + io.opentelemetry + opentelemetry-api + 1.9.1 + + + + + + com.scylladb + scylla-driver-core + test-jar + test + + + + io.opentelemetry + opentelemetry-sdk + test + + + + org.apache.commons + commons-exec + test + + + + org.testng + testng + test + + + + org.slf4j + slf4j-log4j12 + test + + + + + + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + org.codehaus.mojo + animal-sniffer-maven-plugin + 1.15 + + + check-jdk6 + process-classes + + check + + + true + + + + check-jdk8 + + check + + + true + + + + + + + + + + diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java new file mode 100644 index 00000000000..3db5724c623 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import java.net.InetAddress; + +public class OpenTelemetryTracingInfo implements TracingInfo { + private Span span; + private final Tracer tracer; + private final Context context; + private boolean tracingStarted; + private final PrecisionLevel precision; + + protected OpenTelemetryTracingInfo(Tracer tracer, Context context, PrecisionLevel precision) { + this.tracer = tracer; + this.context = context; + this.precision = precision; + this.tracingStarted = false; + } + + public Tracer getTracer() { + return tracer; + } + + public Context getContext() { + return context.with(span); + } + + private void assertStarted() { + assert tracingStarted : "TracingInfo.setStartTime must be called before any other method"; + } + + public PrecisionLevel getPrecision() { + return precision; + } + + @Override + public void setNameAndStartTime(String name) { + assert !tracingStarted : "TracingInfo.setStartTime may only be called once."; + tracingStarted = true; + span = tracer.spanBuilder(name).setParent(context).startSpan(); + } + + @Override + public void setConsistencyLevel(ConsistencyLevel consistency) { + assertStarted(); + span.setAttribute("db.scylla.consistency_level", consistency.toString()); + } + + public void setStatement(String statement) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("db.scylla.statement", statement); + } + } + + public void setHostname(String hostname) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + span.setAttribute("net.peer.name", hostname); + } + } + + @Override + public void setStatementType(String statementType) { + assertStarted(); + span.setAttribute("db.scylla.statement_type", statementType); + } + + @Override + public void setRetryPolicy(RetryPolicy retryPolicy) { + assertStarted(); + span.setAttribute("db.scylla.retry_policy", retryPolicy.getClass().getSimpleName()); + } + + @Override + public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) { + assertStarted(); + span.setAttribute( + "db.scylla.load_balancing_policy", loadBalancingPolicy.getClass().getSimpleName()); + } + + @Override + public void setBatchSize(int batchSize) { + assertStarted(); + span.setAttribute("db.scylla.batch_size", String.valueOf(batchSize)); + } + + @Override + public void setRetryCount(int retryCount) { + assertStarted(); + span.setAttribute("db.scylla.retry_count", String.valueOf(retryCount)); + } + + @Override + public void setShardID(int shardID) { + assertStarted(); + span.setAttribute("db.scylla.shard_id", String.valueOf(shardID)); + } + + @Override + public void setPeerName(String peerName) { + assertStarted(); + span.setAttribute("net.peer.name", peerName); + } + + @Override + public void setPeerIP(InetAddress peerIP) { + assertStarted(); + span.setAttribute("net.peer.ip", peerIP.getHostAddress()); + } + + @Override + public void setPeerPort(int peerPort) { + assertStarted(); + span.setAttribute("net.peer.port", String.valueOf(peerPort)); + } + + @Override + public void setQueryPaged(Boolean queryPaged) { + assertStarted(); + if (queryPaged) span.setAttribute("db.scylla.query_paged", "true"); + else span.setAttribute("db.scylla.query_paged", "false"); + } + + @Override + public void setRowsCount(int rowsCount) { + assertStarted(); + span.setAttribute("db.scylla.rows_count", rowsCount); + } + + @Override + public void setStatement(String statement, int limit) { + assertStarted(); + if (currentPrecisionLevelIsAtLeast(PrecisionLevel.FULL)) { + if (statement.length() > limit) statement = statement.substring(0, limit); + span.setAttribute("db.scylla.statement", statement); + } + } + + @Override + public void setKeyspace(String keyspace) { + assertStarted(); + span.setAttribute("db.scylla.keyspace", keyspace); + } + + @Override + public void setPartitionKey(String partitionKey) { + assertStarted(); + span.setAttribute("db.scylla.partition_key", partitionKey); + } + + @Override + public void setTable(String table) { + assertStarted(); + span.setAttribute("db.scylla.table", table); + } + + @Override + public void setOperationType(String operationType) { + assertStarted(); + span.setAttribute("db.operation", operationType); + } + + @Override + public void setReplicas(String replicas) { + assertStarted(); + span.setAttribute("db.scylla.replicas", replicas); + } + + private io.opentelemetry.api.trace.StatusCode mapStatusCode(StatusCode code) { + switch (code) { + case OK: + return io.opentelemetry.api.trace.StatusCode.OK; + case ERROR: + return io.opentelemetry.api.trace.StatusCode.ERROR; + } + return null; + } + + @Override + public void recordException(Exception exception) { + assertStarted(); + span.recordException(exception); + } + + @Override + public void setStatus(StatusCode code, String description) { + assertStarted(); + span.setStatus(mapStatusCode(code), description); + } + + @Override + public void setStatus(StatusCode code) { + assertStarted(); + span.setStatus(mapStatusCode(code)); + } + + @Override + public void tracingFinished() { + assertStarted(); + span.end(); + } + + private boolean currentPrecisionLevelIsAtLeast(PrecisionLevel requiredLevel) { + return requiredLevel.compareTo(precision) <= 0; + } +} diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java new file mode 100644 index 00000000000..64ff29f82a9 --- /dev/null +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfoFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.opentelemetry; + +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfo; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; + +public class OpenTelemetryTracingInfoFactory implements TracingInfoFactory { + private final Tracer tracer; + private final PrecisionLevel precision; + + public OpenTelemetryTracingInfoFactory(final Tracer tracer) { + this(tracer, PrecisionLevel.NORMAL); + } + + public OpenTelemetryTracingInfoFactory(final Tracer tracer, final PrecisionLevel precision) { + this.tracer = tracer; + this.precision = precision; + } + + @Override + public TracingInfo buildTracingInfo() { + final Context current = Context.current(); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } + + @Override + public TracingInfo buildTracingInfo(TracingInfo parent) { + if (parent instanceof OpenTelemetryTracingInfo) { + final OpenTelemetryTracingInfo castedParent = (OpenTelemetryTracingInfo) parent; + return new OpenTelemetryTracingInfo( + castedParent.getTracer(), castedParent.getContext(), castedParent.getPrecision()); + } + + return new NoopTracingInfoFactory().buildTracingInfo(); + } + + public TracingInfo buildTracingInfo(Span parent) { + final Context current = Context.current().with(parent); + return new OpenTelemetryTracingInfo(tracer, current, precision); + } +} diff --git a/pom.xml b/pom.xml index 307e2df8864..e925b5a3c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -44,6 +44,7 @@ driver-examples driver-tests driver-dist + driver-opentelemetry @@ -133,6 +134,12 @@ ${project.parent.version} + + com.scylladb + scylla-driver-opentelemetry + ${project.parent.version} + + com.google.guava guava @@ -727,6 +734,9 @@ java18 1.0 + + io.opentelemetry:* + From 05589b82aa2ba2cadb8486db7134d6d295852480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:06:29 +0200 Subject: [PATCH 09/11] Tracing: added OpenTelemetry tracing example. Added OpenTelemetry and Zipkin connection configuration classes, along with docker-compose file, example run script and related changes in README. --- driver-examples/README.md | 13 +- driver-examples/docker-compose.yaml | 12 + driver-examples/pom.xml | 34 +++ driver-examples/runOpenTelemetryExample.sh | 1 + .../OpenTelemetryConfiguration.java | 70 ++++++ .../examples/opentelemetry/ZipkinUsage.java | 231 ++++++++++++++++++ 6 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 driver-examples/docker-compose.yaml create mode 100755 driver-examples/runOpenTelemetryExample.sh create mode 100644 driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java create mode 100644 driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java diff --git a/driver-examples/README.md b/driver-examples/README.md index 553cdbc5117..e696e948716 100644 --- a/driver-examples/README.md +++ b/driver-examples/README.md @@ -5,6 +5,17 @@ Apache Cassandra. ## Usage -Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster +Unless otherwise stated, all examples assume that you have a single-node Cassandra 3.0 cluster listening on localhost:9042. +Before running examples, make sure you installed repo artifacts (in root driver directory): +``` +mvn install -q -Dmaven.test.skip=true +``` + +To conveniently run the example showing OpenTelemetry integration (ZipkinConfiguration), +you can use the provided ```docker-compose.yaml``` file (which runs both Scylla cluster and Zipkin instance): +``` +docker-compose up +./runOpenTelemetryExample.sh +``` diff --git a/driver-examples/docker-compose.yaml b/driver-examples/docker-compose.yaml new file mode 100644 index 00000000000..1daf0c46e65 --- /dev/null +++ b/driver-examples/docker-compose.yaml @@ -0,0 +1,12 @@ +version: '3.4' + +services: + zipkin: + image: openzipkin/zipkin + ports: + - "9411:9411" + scylla_node: + image: scylladb/scylla + ports: + - "9042:9042" + command: "--smp 1 --skip-wait-for-gossip-to-settle 0" diff --git a/driver-examples/pom.xml b/driver-examples/pom.xml index 5f28572501b..b47107fbcc7 100644 --- a/driver-examples/pom.xml +++ b/driver-examples/pom.xml @@ -48,6 +48,11 @@ true + + com.scylladb + scylla-driver-opentelemetry + + @@ -134,6 +139,26 @@ logback-classic + + + + io.opentelemetry + opentelemetry-sdk + 1.9.1 + + + + io.opentelemetry + opentelemetry-semconv + 1.9.0-alpha + + + + io.opentelemetry + opentelemetry-exporter-zipkin + 1.9.1 + + @@ -184,6 +209,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + diff --git a/driver-examples/runOpenTelemetryExample.sh b/driver-examples/runOpenTelemetryExample.sh new file mode 100755 index 00000000000..7603ca64d18 --- /dev/null +++ b/driver-examples/runOpenTelemetryExample.sh @@ -0,0 +1 @@ +mvn -q exec:java -Dexec.mainClass="com.datastax.driver.examples.opentelemetry.ZipkinUsage" diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java new file mode 100644 index 00000000000..f68bca6bafd --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/OpenTelemetryConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datastax.driver.examples.opentelemetry; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +/** Example showing how to configure OpenTelemetry for tracing with Scylla Java Driver. */ +class OpenTelemetryConfiguration { + private static final String SERVICE_NAME = "Scylla Java driver"; + + public static OpenTelemetry initialize(SpanExporter spanExporter) { + Resource serviceNameResource = + Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, SERVICE_NAME)); + + // Set to process the spans by the spanExporter. + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + // Add a shutdown hook to shut down the SDK. + Runtime.getRuntime() + .addShutdownHook( + new Thread( + new Runnable() { + @Override + public void run() { + tracerProvider.close(); + } + })); + + // Return the configured instance so it can be used for instrumentation. + return openTelemetry; + } + + public static OpenTelemetry initializeForZipkin(String ip, int port) { + String endpointPath = "/api/v2/spans"; + String httpUrl = String.format("http://%s:%s", ip, port); + + SpanExporter exporter = + ZipkinSpanExporter.builder().setEndpoint(httpUrl + endpointPath).build(); + + return initialize(exporter); + } +} diff --git a/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java new file mode 100644 index 00000000000..9dcc37f5ba2 --- /dev/null +++ b/driver-examples/src/main/java/com/datastax/driver/examples/opentelemetry/ZipkinUsage.java @@ -0,0 +1,231 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2021 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.examples.opentelemetry; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import com.datastax.driver.opentelemetry.OpenTelemetryTracingInfoFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +/** + * Creates a keyspace and tables, and loads some data into them. Sends OpenTelemetry tracing data to + * Zipkin tracing backend + * + *

Preconditions: - a Scylla cluster is running and accessible through the contacts points + * identified by CONTACT_POINTS and PORT and Zipkin backend is running and accessible through the + * contacts points identified by ZIPKIN_CONTACT_POINT and ZIPKIN_PORT. + * + *

Side effects: - creates a new keyspace "simplex" in the cluster. If a keyspace with this name + * already exists, it will be reused; - creates two tables "simplex.songs" and "simplex.playlists". + * If they exist already, they will be reused; - inserts a row in each table. + */ +public class ZipkinUsage { + private static final String CONTACT_POINT = "127.0.0.1"; + private static final int PORT = 9042; + + private static final String ZIPKIN_CONTACT_POINT = "127.0.0.1"; + private static final int ZIPKIN_PORT = 9411; + + private Cluster cluster; + private Session session; + + private Tracer tracer; + + public static void main(String[] args) { + // Workaround for setting ContextStorage to ThreadLocalContextStorage. + System.setProperty("io.opentelemetry.context.contextStorageProvider", "default"); + + ZipkinUsage client = new ZipkinUsage(); + + try { + client.connect(); + client.createSchema(); + client.loadData(); + client.querySchema(); + System.out.println( + "All requests have been completed. Now you can visit Zipkin at " + + "http://" + + ZIPKIN_CONTACT_POINT + + ":" + + ZIPKIN_PORT + + " and examine the produced trace."); + } finally { + client.close(); + } + } + + /** Initiates a connection to the cluster. */ + public void connect() { + cluster = Cluster.builder().addContactPoints(CONTACT_POINT).withPort(PORT).build(); + + System.out.printf("Connected to cluster: %s%n", cluster.getMetadata().getClusterName()); + + OpenTelemetry openTelemetry = + OpenTelemetryConfiguration.initializeForZipkin(ZIPKIN_CONTACT_POINT, ZIPKIN_PORT); + tracer = openTelemetry.getTracerProvider().get("this"); + TracingInfoFactory tracingInfoFactory = new OpenTelemetryTracingInfoFactory(tracer); + cluster.setTracingInfoFactory(tracingInfoFactory); + + session = cluster.connect(); + } + + /** Creates the schema (keyspace) and tables for this example. */ + public void createSchema() { + session.execute("DROP KEYSPACE IF EXISTS simplex;"); + Span parentSpan = tracer.spanBuilder("create schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + { + Span span = tracer.spanBuilder("create simplex").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE KEYSPACE IF NOT EXISTS simplex WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':1};"); + + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.executeAsync( + "CREATE TABLE IF NOT EXISTS simplex.songs (" + + "id uuid," + + "title text," + + "album text," + + "artist text," + + "tags set," + + "data blob," + + "PRIMARY KEY ((title, artist), album)" + + ");"); + } finally { + span.end(); + } + } + { + Span span = tracer.spanBuilder("create simplex.playlists").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute( + "CREATE TABLE IF NOT EXISTS simplex.playlists (" + + "id uuid," + + "title text," + + "album text, " + + "artist text," + + "song_id uuid," + + "PRIMARY KEY (id, title, album, artist)" + + ");"); + + } finally { + span.end(); + } + } + } finally { + parentSpan.end(); + } + } + + /** Inserts data into the tables. */ + public void loadData() { + Span parentSpan = tracer.spanBuilder("load data").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = + session.prepare( + "INSERT INTO simplex.songs (id, title, album, artist, tags) " + + "VALUES (" + + "756716f7-2e54-4715-9f00-91dcbea6cf50," + + "?," + + "?," + + "?," + + "{'jazz', '2013'})" + + ";"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("La Petite Tonkinoise", "Bye Bye Blackbird", "Joséphine Baker"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("insert simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + public void querySchema() { + Span parentSpan = tracer.spanBuilder("query schema").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + + Span prepareSpan = tracer.spanBuilder("prepare").startSpan(); + PreparedStatement ps; + try (Scope prepareScope = prepareSpan.makeCurrent()) { + ps = session.prepare("SELECT * FROM simplex.songs WHERE artist = ? AND title = ?;"); + } finally { + prepareSpan.end(); + } + + Span bindSpan = tracer.spanBuilder("bind").startSpan(); + BoundStatement bound; + try (Scope bindScope = bindSpan.makeCurrent()) { + bound = ps.bind("Joséphine Baker", "La Petite Tonkinoise"); + } finally { + bindSpan.end(); + } + + Span span = tracer.spanBuilder("query simplex.songs").startSpan(); + try (Scope scope = span.makeCurrent()) { + session.execute(bound); + } finally { + span.end(); + } + + } finally { + parentSpan.end(); + } + } + + /** Closes the session and the cluster. */ + public void close() { + session.close(); + cluster.close(); + } +} From 1f667e45733dcd71bf52baed6ac8dc52d3540ad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:08:15 +0200 Subject: [PATCH 10/11] Tracing: added OpenTelemetry integration test. The test verifies that span tree structure and status code are valid. --- .../opentelemetry/OpenTelemetryTest.java | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..f77c4941c90 --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class SpansCollector implements SpanProcessor { + final Collection startedSpans = + Collections.synchronizedList(new ArrayList()); + final Collection spans = + Collections.synchronizedList(new ArrayList()); + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + startedSpans.add(span); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + spans.add(span); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans(BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final SpansCollector collector = new SpansCollector(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer); + cluster().setTracingInfoFactory(tracingInfoFactory); + session = cluster().connect(); + + session.execute("USE " + keyspace); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + /** Basic test for creating spans. */ + @Test(groups = "short") + public void simpleTracingTest() { + final Collection spans = + collectSpans( + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection rootSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(rootSpans.size(), 2); + + rootSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + }); + } +} From c84abeee2196cc4d8f4c32d2c2013d2ac68ee9ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 12:08:45 +0200 Subject: [PATCH 11/11] Tracing: added cluster-side tracing awareness Disclaimer: please keep in mind that it is a proof-of-concept rather than a polished proposal. Added custom payload -based reception of tracing metadata from cluster. When a custom TracingInfoFactory is set by the user in Cluster, with every request the driver asks for custom payload (in a CQL flag), and this is interpreted by a compatible Scylla instance (NB: ignored by a noncompatible one / Cassandra instance) as request for tracing metadata, which then are sent with the response in the custom payload as a ByteMap under the key "opentelemetry". --- .../com/datastax/driver/core/Connection.java | 22 +++++++++++++------ .../driver/core/DefaultPreparedStatement.java | 14 ++++++++++-- .../driver/core/HostConnectionPool.java | 10 +++++++-- .../datastax/driver/core/RequestHandler.java | 17 ++++++++++++++ 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 346b63c32ab..eade71f952c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -180,11 +180,11 @@ protected Connection(String name, EndPoint endPoint, Factory factory, Owner owne this(name, endPoint, factory, null); } - ListenableFuture initAsync() { - return initAsync(-1, 0); + ListenableFuture initAsync(boolean tracingRequested) { + return initAsync(-1, 0, tracingRequested); } - ListenableFuture initAsync(final int shardId, int serverPort) { + ListenableFuture initAsync(final int shardId, int serverPort, boolean tracingRequested) { if (factory.isShutdown) return Futures.immediateFailedFuture( new ConnectionException(endPoint, "Connection factory is shut down")); @@ -350,7 +350,9 @@ public void operationComplete(ChannelFuture future) throws Exception { ListenableFuture initializeTransportFuture = GuavaCompatibility.INSTANCE.transformAsync( - queryOptionsFuture, onOptionsReady(protocolVersion, initExecutor), initExecutor); + queryOptionsFuture, + onOptionsReady(protocolVersion, initExecutor, tracingRequested), + initExecutor); // Fallback on initializeTransportFuture so we can properly propagate specific exceptions. ListenableFuture initFuture = @@ -497,12 +499,17 @@ public ListenableFuture apply(Message.Response response) throws Exception } private AsyncFunction onOptionsReady( - final ProtocolVersion protocolVersion, final Executor initExecutor) { + final ProtocolVersion protocolVersion, + final Executor initExecutor, + final boolean tracingRequested) { return new AsyncFunction() { @Override public ListenableFuture apply(Void input) throws Exception { ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions(); Map extraOptions = new HashMap(); + if (tracingRequested) { + extraOptions.put("SCYLLA_OPENTELEMETRY_TRACING", "true"); + } LwtInfo lwtInfo = getHost().getLwtInfo(); if (lwtInfo != null) { lwtInfo.addOption(extraOptions); @@ -1288,7 +1295,7 @@ Connection open(Host host) Connection connection = new Connection(buildConnectionName(host), endPoint, this); // This method opens the connection synchronously, so wait until it's initialized try { - connection.initAsync().get(); + connection.initAsync(false).get(); return connection; } catch (ExecutionException e) { throw launderAsyncInitException(e); @@ -1306,10 +1313,11 @@ Connection open(HostConnectionPool pool, int shardId, int serverPort) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException { pool.host.convictionPolicy.signalConnectionsOpening(1); + boolean tracingRequested = pool.isTracingRequested(); Connection connection = new Connection(buildConnectionName(pool.host), pool.host.getEndPoint(), this, pool); try { - connection.initAsync(shardId, serverPort).get(); + connection.initAsync(shardId, serverPort, tracingRequested).get(); return connection; } catch (ExecutionException e) { throw launderAsyncInitException(e); diff --git a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java index 6297660e5a2..0202fc938b0 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,11 +64,20 @@ private DefaultPreparedStatement( this.preparedId = id; this.query = query; this.queryKeyspace = queryKeyspace; - this.incomingPayload = incomingPayload; + if (incomingPayload != null && incomingPayload.containsKey("opentelemetry")) { + Map incomingPayloadCopy = + new HashMap(incomingPayload); + ByteBuffer buf = incomingPayloadCopy.remove("opentelemetry"); + this.operationType = new String(buf.array(), buf.position(), buf.limit() - buf.position()); + if (incomingPayloadCopy.isEmpty()) this.incomingPayload = null; + else this.incomingPayload = ImmutableMap.copyOf(incomingPayloadCopy); + } else { + this.operationType = null; + this.incomingPayload = incomingPayload; + } this.cluster = cluster; this.isLWT = isLWT; this.partitioner = partitioner; - this.operationType = null; } static DefaultPreparedStatement fromMessage( diff --git a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java index d967bc937d2..d6fb79c874b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java +++ b/driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java @@ -30,6 +30,7 @@ import com.datastax.driver.core.exceptions.BusyPoolException; import com.datastax.driver.core.exceptions.ConnectionException; import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; import com.datastax.driver.core.utils.MoreFutures; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -267,6 +268,10 @@ public void run() { this.timeoutsExecutor = manager.getCluster().manager.connectionFactory.eventLoopGroup.next(); } + protected boolean isTracingRequested() { + return !(manager.getTracingInfoFactory() == NoopTracingInfoFactory.INSTANCE); + } + /** * @param reusedConnection an existing connection (from a reconnection attempt) that we want to * reuse as part of this pool. Might be null or already used by another pool. @@ -344,14 +349,15 @@ ListenableFuture initAsyncWithConnection(Connection reusedConnection) { } } - ListenableFuture connectionFuture = connection.initAsync(shardId, serverPort); + ListenableFuture connectionFuture = + connection.initAsync(shardId, serverPort, isTracingRequested()); connectionFutures.add(handleErrors(connectionFuture, initExecutor)); shardConnectionIndex++; } } else { for (Connection connection : newConnections) { - ListenableFuture connectionFuture = connection.initAsync(); + ListenableFuture connectionFuture = connection.initAsync(isTracingRequested()); connectionFutures.add(handleErrors(connectionFuture, initExecutor)); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 0726e3c575d..fd581914d41 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -1188,6 +1188,23 @@ private void setFinalException(Connection connection, Exception exception) { private void setFinalResult(Connection connection, Message.Response response) { parentTracingInfo.setRetryCount(retryCount()); + + if (response.getCustomPayload() != null + && response.getCustomPayload().containsKey("opentelemetry")) { + ByteBuffer buf = response.getCustomPayload().get("opentelemetry"); + int rep = buf.getInt(); + StringBuilder replicasBuilder = new StringBuilder(REPLICAS_MAX_LENGTH); + for (int i = 0; i < rep; i++) { + int addrSize = (buf.get() & 0xFF); // convert to unsigned int + if (i > 0) replicasBuilder.append(", "); + for (int j = 0; j < addrSize; j++) { + if (j > 0) replicasBuilder.append('.'); + replicasBuilder.append(buf.get() & 0xFF); // convert to unsigned int + } + } + parentTracingInfo.setReplicas(replicasBuilder.toString()); + } + parentTracingInfo.tracingFinished(); RequestHandler.this.setFinalResult(this, connection, response); }