From f57f544ede2e54cc5a618122a344092333ccf22d Mon Sep 17 00:00:00 2001 From: Crypt Keeper <64215+codefromthecrypt@users.noreply.github.com> Date: Tue, 9 Jan 2024 08:24:05 +0800 Subject: [PATCH] kafka-streams: cleans up after deprecation removal (#1404) Signed-off-by: Adrian Cole --- instrumentation/kafka-streams/README.md | 69 ++---------------- .../kafka/streams/KafkaStreamsTracing.java | 40 ++++------- ...sor.java => TracingFixedKeyProcessor.java} | 10 +-- ... => TracingFixedKeyProcessorSupplier.java} | 18 +++-- ...V2Processor.java => TracingProcessor.java} | 10 +-- ...ier.java => TracingProcessorSupplier.java} | 17 ++--- .../brave/kafka/streams/KafkaHeadersTest.java | 8 +-- .../brave/kafka/streams/KafkaStreamsTest.java | 70 ++++++++----------- .../streams/KafkaStreamsTracingTest.java | 68 ++++++------------ .../TracingKafkaClientSupplierTests.java | 3 +- 10 files changed, 95 insertions(+), 218 deletions(-) rename instrumentation/kafka-streams/src/main/java/brave/kafka/streams/{TracingV2FixedKeyProcessor.java => TracingFixedKeyProcessor.java} (82%) rename instrumentation/kafka-streams/src/main/java/brave/kafka/streams/{TracingV2FixedKeyProcessorSupplier.java => TracingFixedKeyProcessorSupplier.java} (61%) rename instrumentation/kafka-streams/src/main/java/brave/kafka/streams/{TracingV2Processor.java => TracingProcessor.java} (82%) rename instrumentation/kafka-streams/src/main/java/brave/kafka/streams/{TracingV2ProcessorSupplier.java => TracingProcessorSupplier.java} (62%) diff --git a/instrumentation/kafka-streams/README.md b/instrumentation/kafka-streams/README.md index 9fc36dffd..25774b5e0 100644 --- a/instrumentation/kafka-streams/README.md +++ b/instrumentation/kafka-streams/README.md @@ -1,9 +1,7 @@ # Brave Kafka Streams instrumentation [EXPERIMENTAL] Add decorators for Kafka Streams to enable tracing. -* `TracingKafkaClientSupplier` a client supplier which traces poll and send operations. -* `TracingProcessorSupplier` completes a span on `process` -* `TracingTransformerSupplier` completes a span on `transform` +* `KafkaStreamsTracing` completes a span on `process` or `processValues` This does not trace all operations by default. See [RATIONALE.md] for why. @@ -19,12 +17,10 @@ kafkaStreamsTracing = KafkaStreamsTracing.create(tracing); ``` [KIP-820](https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API) -introduces new processor APIs to the Kafka Streams DSL. -The following sections show how to instrument applications with the latest and previous APIs. +introduces new processor APIs to the Kafka Streams DSL. You must use version >= v3.4.0 +to instrument applications. -## Kafka Streams >= v3.4.0 - -To trace a processor in your application use the `TracingV2ProcessorSupplier`, provided by instrumentation API: +To trace a processor in your application use `kafkaStreamsTracing.process` like so: ```java builder.stream(inputTopic) @@ -33,68 +29,15 @@ builder.stream(inputTopic) customProcessor)); ``` -or the `TracingV2FixedKeyProcessorSupplier`, provided by instrumentation API: +or use `kafkaStreamsTracing.processValues` like so: ```java builder.stream(inputTopic) .processValues(kafkaStreamsTracing.processValues( - "process", + "processValues", customProcessor)); ``` -## Kafka Streams < v3.4.0 - -To trace a processor in your application use `TracingProcessorSupplier`, provided by instrumentation API: - -```java -builder.stream(inputTopic) - .processor(kafkaStreamsTracing.processor( - "process", - customProcessor)); -``` - -To trace a transformer, use `TracingTransformerSupplier`, `TracingValueTransformerSupplier`, or `TracingValueTransformerWithValueSupplier` provided by instrumentation API: - -```java -builder.stream(inputTopic) - .transform(kafkaStreamsTracing.transformer( - "transformer-1", - customTransformer)) - .to(outputTopic); -``` - -```java -builder.stream(inputTopic) - .transformValue(kafkaStreamsTracing.valueTransformer( - "transform-value", - customTransformer)) - .to(outputTopic); -``` - -```java -builder.stream(inputTopic) - .transformValueWithKey(kafkaStreamsTracing.valueTransformerWithKey( - "transform-value-with-key", - customTransformer)) - .to(outputTopic); -``` - -Additional transformers have been introduced to cover most common Kafka Streams DSL operations (e.g. `map`, `mapValues`, `foreach`, `peek`, `filter`). - -```java -builder.stream(inputTopic) - .transform(kafkaStreamsTracing.map("map", mapper)) - .to(outputTopic); -``` - -For flat operations like flatMap, the `flatTransform` method can be used: - -```java -builder.stream(inputTopic) - .flatTransform(kafkaStreamsTracing.flatMap("flat-map", mapper)) - .to(outputTopic); -``` - For more details, [see here](https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java). To create a Kafka Streams with Tracing Client Supplier enabled, pass your topology and configuration like this: diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java index 8afa696f5..e7824fcfd 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java @@ -33,7 +33,9 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessingContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; /** Use this class to decorate Kafka Stream Topologies and enable Tracing. */ public final class KafkaStreamsTracing { @@ -85,7 +87,7 @@ public static Builder newBuilder(MessagingTracing messagingTracing) { * Provides a {@link KafkaClientSupplier} with tracing enabled, hence Producer and Consumer * operations will be traced. *

- * This is mean to be used in scenarios {@link KafkaStreams} creation is not controlled by the + * This is meant to be used in scenarios {@link KafkaStreams} creation is not controlled by the * user but framework (e.g. Spring Kafka Streams) creates it, and {@link KafkaClientSupplier} is * accepted. */ @@ -113,7 +115,7 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) { } /** - * Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.ProcessorSupplier} + * Create a tracing-decorated {@link ProcessorSupplier} * *

Simple example using Kafka Streams DSL: *

{@code
@@ -124,13 +126,13 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) {
    *
    * @see TracingKafkaClientSupplier
    */
-  public  org.apache.kafka.streams.processor.api.ProcessorSupplier process(String spanName,
-    org.apache.kafka.streams.processor.api.ProcessorSupplier processorSupplier) {
-    return new TracingV2ProcessorSupplier<>(this, spanName, processorSupplier);
+  public  ProcessorSupplier process(String spanName,
+    ProcessorSupplier processorSupplier) {
+    return new TracingProcessorSupplier<>(this, spanName, processorSupplier);
   }
 
   /**
-   * Create a tracing-decorated {@link org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier}
+   * Create a tracing-decorated {@link FixedKeyProcessorSupplier}
    *
    * 

Simple example using Kafka Streams DSL: *

{@code
@@ -141,9 +143,9 @@ public  org.apache.kafka.streams.processor.api.ProcessorSu
    *
    * @see TracingKafkaClientSupplier
    */
-  public  org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier processValues(String spanName,
-    org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier processorSupplier) {
-    return new TracingV2FixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
+  public  FixedKeyProcessorSupplier processValues(String spanName,
+    FixedKeyProcessorSupplier processorSupplier) {
+    return new TracingFixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
   }
 
   static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
@@ -151,24 +153,12 @@ static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
     result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processorContext.taskId().toString());
   }
 
-  static void addTags(org.apache.kafka.streams.processor.api.ProcessingContext processingContext, SpanCustomizer result) {
-    result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processingContext.applicationId());
+  static void addTags(ProcessingContext processingContext, SpanCustomizer result) {
+    result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG,
+      processingContext.applicationId());
     result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processingContext.taskId().toString());
   }
 
-  Span nextSpan(ProcessorContext context) {
-    TraceContextOrSamplingFlags extracted = extractor.extract(context.headers());
-    // Clear any propagation keys present in the headers
-    if (!extracted.equals(emptyExtraction)) {
-      clearHeaders(context.headers());
-    }
-    Span result = tracer.nextSpan(extracted);
-    if (!result.isNoop()) {
-      addTags(context, result);
-    }
-    return result;
-  }
-
   Span nextSpan(ProcessingContext context, Headers headers) {
     TraceContextOrSamplingFlags extracted = extractor.extract(headers);
     // Clear any propagation keys present in the headers
@@ -183,7 +173,7 @@ Span nextSpan(ProcessingContext context, Headers headers) {
   }
 
   // We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3
-  // multi, or visa versa.
+  // multi, or vice versa.
   void clearHeaders(Headers headers) {
     // Headers::remove creates and consumes an iterator each time. This does one loop instead.
     for (Iterator
i = headers.iterator(); i.hasNext(); ) { diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java similarity index 82% rename from instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java rename to instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java index 8bf8fcae4..8c85f2a82 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessor.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessor.java @@ -21,11 +21,7 @@ import static brave.internal.Throwables.propagateIfFatal; -/* - * Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes - * and those that implement the new kafka streams API introduced in version 3.4.0 - */ -class TracingV2FixedKeyProcessor implements FixedKeyProcessor { +class TracingFixedKeyProcessor implements FixedKeyProcessor { final KafkaStreamsTracing kafkaStreamsTracing; final Tracer tracer; final String spanName; @@ -33,8 +29,8 @@ class TracingV2FixedKeyProcessor implements FixedKeyProcessor delegateProcessor) { + TracingFixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, FixedKeyProcessor delegateProcessor) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.tracer = kafkaStreamsTracing.tracer; this.spanName = spanName; diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorSupplier.java similarity index 61% rename from instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java rename to instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorSupplier.java index d2cbe7001..ce12441e1 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2FixedKeyProcessorSupplier.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingFixedKeyProcessorSupplier.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 The OpenZipkin Authors + * Copyright 2013-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,18 +16,15 @@ import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; -/* - * Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes - * and those that implement the new kafka streams API introduced in version 3.4.0 - */ -class TracingV2FixedKeyProcessorSupplier implements FixedKeyProcessorSupplier { +class TracingFixedKeyProcessorSupplier + implements FixedKeyProcessorSupplier { final KafkaStreamsTracing kafkaStreamsTracing; final String spanName; final FixedKeyProcessorSupplier delegateProcessorSupplier; - TracingV2FixedKeyProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, - String spanName, - FixedKeyProcessorSupplier processorSupplier) { + TracingFixedKeyProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, + FixedKeyProcessorSupplier processorSupplier) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.spanName = spanName; this.delegateProcessorSupplier = processorSupplier; @@ -35,6 +32,7 @@ class TracingV2FixedKeyProcessorSupplier implements FixedKeyProc /** This wraps process method to enable tracing. */ @Override public FixedKeyProcessor get() { - return new TracingV2FixedKeyProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get()); + return new TracingFixedKeyProcessor<>(kafkaStreamsTracing, spanName, + delegateProcessorSupplier.get()); } } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java similarity index 82% rename from instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java rename to instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java index 89bf1680f..378066366 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2Processor.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java @@ -21,11 +21,7 @@ import static brave.internal.Throwables.propagateIfFatal; -/* - * Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes - * and those that implement the new kafka streams API introduced in version 3.4.0 - */ -class TracingV2Processor implements Processor { +class TracingProcessor implements Processor { final KafkaStreamsTracing kafkaStreamsTracing; final Tracer tracer; final String spanName; @@ -33,8 +29,8 @@ class TracingV2Processor implements Processor delegateProcessor) { + TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, Processor delegateProcessor) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.tracer = kafkaStreamsTracing.tracer; this.spanName = spanName; diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java similarity index 62% rename from instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java rename to instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java index bc96efd7a..982af06ff 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingV2ProcessorSupplier.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 The OpenZipkin Authors + * Copyright 2013-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,18 +16,15 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -/* - * Note. the V2 naming convention has been introduced here to help distinguish between the existing TracingProcessor classes - * and those that implement the new kafka streams API introduced in version 3.4.0 - */ -class TracingV2ProcessorSupplier implements ProcessorSupplier { +class TracingProcessorSupplier + implements ProcessorSupplier { final KafkaStreamsTracing kafkaStreamsTracing; final String spanName; final ProcessorSupplier delegateProcessorSupplier; - TracingV2ProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, - String spanName, - ProcessorSupplier processorSupplier) { + TracingProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing, + String spanName, + ProcessorSupplier processorSupplier) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.spanName = spanName; this.delegateProcessorSupplier = processorSupplier; @@ -35,6 +32,6 @@ class TracingV2ProcessorSupplier implements ProcessorSuppl /** This wraps process method to enable tracing. */ @Override public Processor get() { - return new TracingV2Processor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get()); + return new TracingProcessor<>(kafkaStreamsTracing, spanName, delegateProcessorSupplier.get()); } } diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaHeadersTest.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaHeadersTest.java index 80c23acd2..e52ca7554 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaHeadersTest.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaHeadersTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2023 The OpenZipkin Authors + * Copyright 2013-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,7 +25,7 @@ public class KafkaHeadersTest { record.headers().add("b3", new byte[] {'1'}); assertThat(KafkaHeaders.lastStringHeader(record.headers(), "b3")) - .isEqualTo("1"); + .isEqualTo("1"); } @Test void lastStringHeader_null() { @@ -36,7 +36,7 @@ public class KafkaHeadersTest { KafkaHeaders.replaceHeader(record.headers(), "b3", "1"); assertThat(record.headers().lastHeader("b3").value()) - .containsExactly('1'); + .containsExactly('1'); } @Test void replaceHeader_replace() { @@ -44,6 +44,6 @@ public class KafkaHeadersTest { KafkaHeaders.replaceHeader(record.headers(), "b3", "1"); assertThat(record.headers().lastHeader("b3").value()) - .containsExactly('1'); + .containsExactly('1'); } } diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java index a71de05da..b7b761641 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTest.java @@ -20,11 +20,11 @@ import brave.propagation.StrictCurrentTraceContext; import brave.propagation.TraceContext; import brave.test.TestSpanHandler; -import java.util.function.Function; import java.util.function.Supplier; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import static brave.test.ITRemote.BAGGAGE_FIELD; @@ -42,49 +42,35 @@ class KafkaStreamsTest { StrictCurrentTraceContext currentTraceContext = StrictCurrentTraceContext.create(); TestSpanHandler spans = new TestSpanHandler(); Tracing tracing = Tracing.newBuilder() - .currentTraceContext(currentTraceContext) - .addSpanHandler(spans) - .propagationFactory(BaggagePropagation.newFactoryBuilder(B3Propagation.FACTORY) - .add(BaggagePropagationConfig.SingleBaggageField.newBuilder(BAGGAGE_FIELD) - .addKeyName(BAGGAGE_FIELD_KEY) - .build()).build()) - .build(); + .currentTraceContext(currentTraceContext) + .addSpanHandler(spans) + .propagationFactory(BaggagePropagation.newFactoryBuilder(B3Propagation.FACTORY) + .add(BaggagePropagationConfig.SingleBaggageField.newBuilder(BAGGAGE_FIELD) + .addKeyName(BAGGAGE_FIELD_KEY) + .build()).build()) + .build(); KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing); TraceContext parent = tracing.tracer().newTrace().context(); - Function processorContextSupplier = - (Headers headers) -> - { - ProcessorContext processorContext = mock(ProcessorContext.class); - when(processorContext.applicationId()).thenReturn(TEST_APPLICATION_ID); - when(processorContext.topic()).thenReturn(TEST_TOPIC); - when(processorContext.taskId()).thenReturn(new TaskId(0, 0)); - when(processorContext.headers()).thenReturn(headers); - return processorContext; - }; + Supplier> processorV2ContextSupplier = () -> { + ProcessorContext processorContext = + mock(ProcessorContext.class); + when(processorContext.applicationId()).thenReturn(TEST_APPLICATION_ID); + when(processorContext.taskId()).thenReturn(new TaskId(0, 0)); + return processorContext; + }; - Supplier> processorV2ContextSupplier = - () -> - { - org.apache.kafka.streams.processor.api.ProcessorContext processorContext = mock(org.apache.kafka.streams.processor.api.ProcessorContext.class); - when(processorContext.applicationId()).thenReturn(TEST_APPLICATION_ID); - when(processorContext.taskId()).thenReturn(new TaskId(0, 0)); - return processorContext; - }; + ProcessorSupplier fakeV2ProcessorSupplier = + kafkaStreamsTracing.process("forward-1", () -> new Processor<>() { + ProcessorContext context; - org.apache.kafka.streams.processor.api.ProcessorSupplier fakeV2ProcessorSupplier = - kafkaStreamsTracing.process( - "forward-1", () -> - new org.apache.kafka.streams.processor.api.Processor() { - org.apache.kafka.streams.processor.api.ProcessorContext context; - @Override - public void init(org.apache.kafka.streams.processor.api.ProcessorContext context) { - this.context = context; - } - @Override - public void process(Record record) { - context.forward(record); - } - }); + @Override public void init(ProcessorContext context) { + this.context = context; + } + + @Override public void process(Record record) { + context.forward(record); + } + }); } diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java index 76655849b..47dbbe581 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/KafkaStreamsTracingTest.java @@ -18,7 +18,9 @@ import java.util.Date; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.junit.jupiter.api.Test; @@ -29,78 +31,48 @@ public class KafkaStreamsTracingTest extends KafkaStreamsTest { @Test void nextSpan_uses_current_context() { - ProcessorContext fakeProcessorContext = processorContextSupplier.apply(new RecordHeaders()); - Span child; - try (Scope scope = tracing.currentTraceContext().newScope(parent)) { - child = kafkaStreamsTracing.nextSpan(fakeProcessorContext); - } - child.finish(); - - assertThat(child.context().parentIdString()) - .isEqualTo(parent.spanIdString()); - } - - @Test void nextSpanWithHeaders_uses_current_context() { - org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); Span child; try (Scope scope = tracing.currentTraceContext().newScope(parent)) { child = kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders()); } child.finish(); - assertThat(child.context().parentIdString()) - .isEqualTo(parent.spanIdString()); + assertThat(child.context().parentIdString()).isEqualTo(parent.spanIdString()); } @Test void nextSpan_should_create_span_if_no_headers() { - ProcessorContext fakeProcessorContext = processorContextSupplier.apply(new RecordHeaders()); - assertThat(kafkaStreamsTracing.nextSpan(fakeProcessorContext)).isNotNull(); - } - - @Test void nextSpanWithHeaders_should_create_span_if_no_headers() { - org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); assertThat(kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders())).isNotNull(); } @Test void nextSpan_should_tag_app_id_and_task_id() { - ProcessorContext fakeProcessorContext = processorContextSupplier.apply(new RecordHeaders()); - kafkaStreamsTracing.nextSpan(fakeProcessorContext).start().finish(); - - assertThat(spans.get(0).tags()) - .containsOnly( - entry("kafka.streams.application.id", TEST_APPLICATION_ID), - entry("kafka.streams.task.id", TEST_TASK_ID)); - } - - @Test void nextSpanWithHeaders_should_tag_app_id_and_task_id() { - org.apache.kafka.streams.processor.api.ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); + ProcessorContext fakeProcessorContext = processorV2ContextSupplier.get(); kafkaStreamsTracing.nextSpan(fakeProcessorContext, new RecordHeaders()).start().finish(); - assertThat(spans.get(0).tags()) - .containsOnly( - entry("kafka.streams.application.id", TEST_APPLICATION_ID), - entry("kafka.streams.task.id", TEST_TASK_ID)); + assertThat(spans.get(0).tags()).containsOnly( + entry("kafka.streams.application.id", TEST_APPLICATION_ID), + entry("kafka.streams.task.id", TEST_TASK_ID)); } @Test void newProcessorSupplier_should_tag_app_id_and_task_id() { - org.apache.kafka.streams.processor.api.Processor processor = fakeV2ProcessorSupplier.get(); + Processor processor = + fakeV2ProcessorSupplier.get(); processor.init(processorV2ContextSupplier.get()); processor.process(new Record<>(TEST_KEY, TEST_VALUE, new Date().getTime())); - assertThat(spans.get(0).tags()) - .containsOnly( - entry("kafka.streams.application.id", TEST_APPLICATION_ID), - entry("kafka.streams.task.id", TEST_TASK_ID)); + assertThat(spans.get(0).tags()).containsOnly( + entry("kafka.streams.application.id", TEST_APPLICATION_ID), + entry("kafka.streams.task.id", TEST_TASK_ID)); } @Test void newProcessorSupplier_should_add_baggage_field() { - org.apache.kafka.streams.processor.api.ProcessorSupplier processorSupplier = - kafkaStreamsTracing.process( - "forward-1", () -> - (org.apache.kafka.streams.processor.api.Processor) record -> - assertThat(BAGGAGE_FIELD.getValue(currentTraceContext.get())).isEqualTo("user1")); + ProcessorSupplier + processorSupplier = kafkaStreamsTracing.process("forward-1", () -> record -> + assertThat(BAGGAGE_FIELD.getValue(currentTraceContext.get())) + .isEqualTo("user1")); Headers headers = new RecordHeaders().add(BAGGAGE_FIELD_KEY, "user1".getBytes()); - org.apache.kafka.streams.processor.api.Processor processor = processorSupplier.get(); + Processor processor = processorSupplier.get(); processor.init(processorV2ContextSupplier.get()); processor.process(new Record<>(TEST_KEY, TEST_VALUE, new Date().getTime(), headers)); } diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/TracingKafkaClientSupplierTests.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/TracingKafkaClientSupplierTests.java index c1990417f..59da1626c 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/TracingKafkaClientSupplierTests.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/TracingKafkaClientSupplierTests.java @@ -25,7 +25,7 @@ class TracingKafkaClientSupplierTests { - final Map props = Collections.singletonMap("bootstrap.servers","localhost:9092"); + final Map props = Collections.singletonMap("bootstrap.servers", "localhost:9092"); @Test void shouldReturnNewAdmin() { TracingKafkaClientSupplier supplier = new TracingKafkaClientSupplier(null); @@ -61,5 +61,4 @@ public Consumer getGlobalConsumer(Map map) { return null; } } - }