Skip to content

Commit

Permalink
Tracing: added OpenTelemetry integration test
Browse files Browse the repository at this point in the history
The test verifies that span tree structure and status code are valid.
Speculative executions are run parallel to the main thread, so some
of them can finish only after query result has been returned.
Thus, in order to collect span data from entire request, we decided
to wait until all speculative executions end. The main thread uses
conditional variable `allEnded` to wait for them and lock is used
for concurrent mutation of activeSpans.
  • Loading branch information
wprzytula committed Jul 29, 2022
1 parent c516a11 commit 5b6e60b
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public void setBatchSize(int batchSize) {
}

@Override
public void setRetryCount(int retryCount) {
public void setAttemptCount(int attemptCount) {
assertStarted();
span.setAttribute("db.scylla.retry_count", String.valueOf(retryCount));
span.setAttribute("db.scylla.attempt_count", String.valueOf(attemptCount));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
/*
* Copyright (C) 2021 ScyllaDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.driver.opentelemetry;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.SyntaxError;
import com.datastax.driver.core.tracing.NoopTracingInfoFactory;
import com.datastax.driver.core.tracing.PrecisionLevel;
import com.datastax.driver.core.tracing.TracingInfoFactory;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.testng.annotations.Test;

/** Tests for OpenTelemetry integration. */
public class OpenTelemetryTest extends CCMTestsSupport {
/** Collects and saves spans. */
private static final class BookkeepingSpanProcessor implements SpanProcessor {
final Lock lock = new ReentrantLock();
final Condition allEnded = lock.newCondition();

final Collection<ReadableSpan> startedSpans = new ArrayList<>();
final Collection<ReadableSpan> spans = new ArrayList<>();

int activeSpans = 0;

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
lock.lock();

startedSpans.add(span);
++activeSpans;

lock.unlock();
}

@Override
public boolean isStartRequired() {
return true;
}

@Override
public void onEnd(ReadableSpan span) {
lock.lock();

spans.add(span);
--activeSpans;

if (activeSpans == 0) allEnded.signal();

lock.unlock();
}

@Override
public boolean isEndRequired() {
return true;
}

public Collection<ReadableSpan> getSpans() {
lock.lock();

try {
while (activeSpans > 0) allEnded.await();

for (ReadableSpan span : startedSpans) {
assertTrue(span.hasEnded());
}
} catch (InterruptedException e) {
assert false;
} finally {
lock.unlock();
}

return spans;
}
}

private Session session;

/**
* Prepare OpenTelemetry configuration and run test with it.
*
* @param precisionLevel precision level of tracing for the tests.
* @param test test to run.
* @return collected spans.
*/
private Collection<ReadableSpan> collectSpans(
PrecisionLevel precisionLevel, BiConsumer<Tracer, TracingInfoFactory> test) {
final Resource serviceNameResource =
Resource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test"));

final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor();

final SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(collector)
.setResource(Resource.getDefault().merge(serviceNameResource))
.build();
final OpenTelemetrySdk openTelemetry =
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();

final Tracer tracer = openTelemetry.getTracerProvider().get("this");
final OpenTelemetryTracingInfoFactory tracingInfoFactory =
new OpenTelemetryTracingInfoFactory(tracer, precisionLevel);
cluster().setTracingInfoFactory(tracingInfoFactory);
session = cluster().connect();

session.execute("USE " + keyspace);
session.execute("DROP TABLE IF EXISTS t");
session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)");
collector.getSpans().clear();

test.accept(tracer, tracingInfoFactory);

tracerProvider.close();
cluster().setTracingInfoFactory(new NoopTracingInfoFactory());

return collector.getSpans();
}

/** Basic test for creating spans. */
@Test(groups = "short")
public void simpleTracingTest() {
for (int i = 0; i < 10; i++) {
final Collection<ReadableSpan> spans =
collectSpans(
PrecisionLevel.NORMAL,
(tracer, tracingInfoFactory) -> {
Span userSpan = tracer.spanBuilder("user span").startSpan();
Scope scope = userSpan.makeCurrent();

session.execute("INSERT INTO t(k, v) VALUES (4, 2)");
session.execute("INSERT INTO t(k, v) VALUES (2, 1)");

scope.close();
userSpan.end();
});

// Retrieve span created directly by tracer.
final List<ReadableSpan> userSpans =
spans.stream()
.filter(span -> !span.getParentSpanContext().isValid())
.collect(Collectors.toList());
assertEquals(userSpans.size(), 1);
final ReadableSpan userSpan = userSpans.get(0);

for (ReadableSpan span : spans) {
assertTrue(span.getSpanContext().isValid());
assertTrue(
span.getSpanContext().equals(userSpan.getSpanContext())
|| span.getParentSpanContext().isValid());
}

// Retrieve spans representing requests.
final Collection<ReadableSpan> rootSpans =
spans.stream()
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
.collect(Collectors.toList());
assertEquals(rootSpans.size(), 2);

rootSpans.stream()
.map(ReadableSpan::toSpanData)
.forEach(
spanData -> {
assertEquals(spanData.getName(), "request");
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK);
Attributes tags = spanData.getAttributes();

assertEquals(
tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE");
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.batch_size")), "1");
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.query_paged")), "false");
assertEquals(
tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "regular");
assertEquals(
tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")),
"PagingOptimizingLoadBalancingPolicy");
assertEquals(
tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")),
"NoSpeculativeExecutionPolicy");

// no such information in RegularStatement:
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.keyspace")), null); //
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.table")), null);
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.partition_key")), null);
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.db.operation")), null);
// no such information with PrecisionLevel.NORMAL:
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement")), null);
// no such information with operation INSERT:
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.rows_count")), null);

// TODO: more
});
}
}

/** Basic test for creating spans. */
@Test(groups = "short")
public void simpleRequestErrorTracingTest() {
for (int i = 0; i < 10; i++) {
final Collection<ReadableSpan> spans =
collectSpans(
PrecisionLevel.FULL,
(tracer, tracingInfoFactory) -> {
Span userSpan = tracer.spanBuilder("user span").startSpan();
Scope scope = userSpan.makeCurrent();

try {
session.execute("INSERT ONTO t(k, v) VALUES (4, 2)");
// ^ syntax error here
assert false; // exception should be thrown before this line is executed
} catch (SyntaxError error) {
// pass
}

try {
session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)");
// ^ too many values
assert false; // exception should be thrown before this line is executed
} catch (InvalidQueryException error) {
// pass
}

scope.close();
userSpan.end();
});

// Retrieve span created directly by tracer.
final List<ReadableSpan> userSpans =
spans.stream()
.filter(span -> !span.getParentSpanContext().isValid())
.collect(Collectors.toList());
assertEquals(userSpans.size(), 1);
final ReadableSpan userSpan = userSpans.get(0);

for (ReadableSpan span : spans) {
assertTrue(span.getSpanContext().isValid());
assertTrue(
span.getSpanContext().equals(userSpan.getSpanContext())
|| span.getParentSpanContext().isValid());
}

// Retrieve spans representing requests.
final Collection<ReadableSpan> rootSpans =
spans.stream()
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
.collect(Collectors.toList());
assertEquals(rootSpans.size(), 2);

rootSpans.stream()
.map(ReadableSpan::toSpanData)
.forEach(
spanData -> {
assertEquals(spanData.getName(), "request");
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR);
final String collectedStatement =
spanData.getAttributes().get(AttributeKey.stringKey("db.scylla.statement"));
assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)")
|| collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)")
: "Bad statement gathered";
});
}
}
}

0 comments on commit 5b6e60b

Please sign in to comment.