Skip to content

Commit

Permalink
fixed tracing and moving traces to their correct parent in many places
Browse files Browse the repository at this point in the history
  • Loading branch information
thjaeckle committed Nov 14, 2024
1 parent 405c3e1 commit fd06561
Show file tree
Hide file tree
Showing 29 changed files with 380 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.routing.ClusterRouterPool;
import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.routing.Broadcast;
import org.apache.pekko.routing.ConsistentHashingPool;
import org.apache.pekko.routing.ConsistentHashingRouter;
import org.apache.pekko.routing.Pool;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand Down Expand Up @@ -108,14 +123,14 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.DittoConnectivityCommandValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectionPubSub;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.PingCommand;
import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.pekko.PingCommand;
import org.eclipse.ditto.internal.utils.pekko.logging.CommonMdcEntryKey;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
Expand All @@ -124,27 +139,12 @@
import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy;
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.routing.ClusterRouterPool;
import org.apache.pekko.cluster.routing.ClusterRouterPoolSettings;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.routing.Broadcast;
import org.apache.pekko.routing.ConsistentHashingPool;
import org.apache.pekko.routing.ConsistentHashingRouter;
import org.apache.pekko.routing.Pool;

/**
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
* remote server is delegated to a child actor that uses a specific client (AMQP 1.0 or 0.9.1).
Expand Down Expand Up @@ -572,22 +572,29 @@ private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence cor

@Override
public void onMutation(final Command<?> command, final ConnectivityEvent<?> event,
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted) {
final WithDittoHeaders response, final boolean becomeCreated, final boolean becomeDeleted,
@Nullable final StartedSpan startedSpan) {
if (command instanceof StagedCommand stagedCommand) {
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
if (startedSpan != null) {
startedSpan.finish();
}
} else {
super.onMutation(command, event, response, becomeCreated, becomeDeleted);
super.onMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan);
}
}

@Override
public void onStagedMutation(final Command<?> command, final CompletionStage<ConnectivityEvent<?>> event,
final CompletionStage<WithDittoHeaders> response, final boolean becomeCreated,
final boolean becomeDeleted) {
final boolean becomeDeleted, @Nullable final StartedSpan startedSpan) {
if (command instanceof StagedCommand stagedCommand) {
interpretStagedCommand(stagedCommand.withSenderUnlessDefined(getSender()));
if (startedSpan != null) {
startedSpan.finish();
}
} else {
super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted);
super.onStagedMutation(command, event, response, becomeCreated, becomeDeleted, startedSpan);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,30 @@

import javax.annotation.Nullable;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehaviorAndRequestCounting;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
Expand All @@ -53,18 +64,6 @@
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingsResponse;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
* Acts as a client for {@code ThingsAggregatorActor} which responds
* to a {@link RetrieveThings} command via a {@link SourceRef} which is a pointer in the cluster emitting the retrieved
Expand Down Expand Up @@ -108,8 +107,8 @@ public static Props props(final ActorRef pubSubMediator) {
@Override
public Receive handleMessage() {
return ReceiveBuilder.create()
.match(RetrieveThings.class, rt -> handleRetrieveThings(rt, rt))
.match(SudoRetrieveThings.class, srt -> handleSudoRetrieveThings(srt, srt))
.match(RetrieveThings.class, this::handleRetrieveThings)
.match(SudoRetrieveThings.class, this::handleSudoRetrieveThings)
.matchAny(m -> {
log.warning("Got unknown message: {}", m);
unhandled(m);
Expand All @@ -122,52 +121,47 @@ public void serviceUnbind(final Control serviceUnbind) {
// nothing to do
}

private void handleRetrieveThings(final RetrieveThings rt, final Object msgToAsk) {
private void handleRetrieveThings(final RetrieveThings rt) {
final List<ThingId> thingIds = rt.getEntityIds();
log.withCorrelationId(rt)
.info("Got '{}' message. Retrieving requested '{}' Things..",
RetrieveThings.class.getSimpleName(), thingIds.size());

final ActorRef sender = getSender();
askTargetActor(rt, thingIds, msgToAsk, sender);
askTargetActor(rt, thingIds, getSender());
}

private void handleSudoRetrieveThings(final SudoRetrieveThings srt, final Object msgToAsk) {
private void handleSudoRetrieveThings(final SudoRetrieveThings srt) {
final List<ThingId> thingIds = srt.getThingIds();
log.withCorrelationId(srt)
.info("Got '{}' message. Retrieving requested '{}' Things..",
SudoRetrieveThings.class.getSimpleName(), thingIds.size());

final ActorRef sender = getSender();
askTargetActor(srt, thingIds, msgToAsk, sender);
askTargetActor(srt, thingIds, getSender());
}

private void askTargetActor(final Command<?> command, final List<ThingId> thingIds,
final Object msgToAsk, final ActorRef sender) {

final Object tracedMsgToAsk;
private void askTargetActor(final Command<?> command, final List<ThingId> thingIds, final ActorRef sender)
{
final DittoHeaders dittoHeaders = command.getDittoHeaders();
final var startedSpan = DittoTracing.newPreparedSpan(
command.getDittoHeaders(),
dittoHeaders,
TRACE_AGGREGATOR_RETRIEVE_THINGS
)
.tag("size", Integer.toString(thingIds.size()))
.start();
if (msgToAsk instanceof DittoHeadersSettable<?> dittoHeadersSettable) {
tracedMsgToAsk = dittoHeadersSettable.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(dittoHeadersSettable.getDittoHeaders()))
);
} else {
tracedMsgToAsk = msgToAsk;
}
final Command<?> tracedCommand = command.setDittoHeaders(
DittoHeaders.of(startedSpan.propagateContext(
dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
.build()
))
);

final DistributedPubSubMediator.Publish pubSubMsg =
DistPubSubAccess.publishViaGroup(command.getType(), tracedMsgToAsk);
DistPubSubAccess.publishViaGroup(tracedCommand.getType(), tracedCommand);

withRequestCounting(
Patterns.ask(pubSubMediator, pubSubMsg, Duration.ofSeconds(ASK_TIMEOUT))
.thenAccept(response -> {
if (response instanceof SourceRef) {
handleSourceRef((SourceRef<?>) response, thingIds, command, sender, startedSpan);
handleSourceRef((SourceRef<?>) response, thingIds, tracedCommand, sender, startedSpan);
} else if (response instanceof DittoRuntimeException dre) {
startedSpan.tagAsFailed(dre).finish();
sender.tell(response, getSelf());
Expand All @@ -177,7 +171,7 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
response.getClass().getSimpleName(), response);
final DittoInternalErrorException responseEx =
DittoInternalErrorException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.dittoHeaders(tracedCommand.getDittoHeaders())
.build();
startedSpan.tagAsFailed(responseEx).finish();
sender.tell(responseEx, getSelf());
Expand All @@ -187,7 +181,8 @@ private void askTargetActor(final Command<?> command, final List<ThingId> thingI
}

private void handleSourceRef(final SourceRef<?> sourceRef, final List<ThingId> thingIds,
final Command<?> originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan) {
final Command<?> originatingCommand, final ActorRef originatingSender, final StartedSpan startedSpan)
{
final Function<Jsonifiable<?>, PlainJson> thingPlainJsonSupplier;
final Function<List<PlainJson>, CommandResponse<?>> overallResponseSupplier;
final UnaryOperator<List<PlainJson>> plainJsonSorter = supplyPlainJsonSorter(thingIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.apache.pekko.http.javadsl.model.HttpHeader;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.http.javadsl.server.Complete;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.http.javadsl.server.RouteResult;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand All @@ -43,14 +50,6 @@
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;

import org.apache.pekko.http.javadsl.model.HttpHeader;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.http.javadsl.server.Complete;
import org.apache.pekko.http.javadsl.server.Route;
import org.apache.pekko.http.javadsl.server.RouteResult;

/**
* Custom Pekko Http directive tracing the request.
*/
Expand Down Expand Up @@ -116,7 +115,7 @@ private static SpanOperationName resolveSpanOperationName(final HttpRequest http

private static URI getTraceUri(final HttpRequest httpRequest) {
final var traceInformationGenerator = TraceInformationGenerator.getInstance();
final var traceInformation = traceInformationGenerator.apply(String.valueOf(getRelativeUri(httpRequest)));
final var traceInformation = traceInformationGenerator.apply(getRelativeUriPath(httpRequest));
return traceInformation.getTraceUri();
}

Expand All @@ -125,6 +124,12 @@ private static Uri getRelativeUri(final HttpRequest httpRequest) {
return uri.toRelative();
}

private static String getRelativeUriPath(final HttpRequest httpRequest) {
final var uri = httpRequest.getUri();
final var relativeUri = uri.toRelative();
return relativeUri.path();
}

private static String getRequestMethodName(final HttpRequest httpRequest) {
final var httpMethod = httpRequest.method();
return httpMethod.name();
Expand Down Expand Up @@ -246,10 +251,7 @@ private static void addRequestResponseTags(
@Nullable final CharSequence correlationId
) {
startedSpan.tag(SpanTagKey.REQUEST_METHOD_NAME.getTagForValue(getRequestMethodName(httpRequest)));
@Nullable final var relativeRequestUri = tryToGetRelativeRequestUri(httpRequest, correlationId);
if (null != relativeRequestUri) {
startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(relativeRequestUri));
}
startedSpan.tag(SpanTagKey.REQUEST_URI.getTagForValue(URI.create(getRelativeUri(httpRequest).toString())));
@Nullable final var httpStatus = tryToGetResponseHttpStatus(httpResponse, correlationId);
if (null != httpStatus) {
startedSpan.tag(SpanTagKey.HTTP_STATUS.getTagForValue(httpStatus));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Collection;
import java.util.concurrent.Executor;

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationChain;
import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationFailureAggregator;
Expand All @@ -32,7 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.lang.Nullable;
import com.typesafe.config.Config;

/**
Expand Down
4 changes: 4 additions & 0 deletions internal/models/signalenrichment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-models-signal</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-tracing</artifactId>
</dependency>

<!-- test-only -->
<dependency>
Expand Down
Loading

0 comments on commit fd06561

Please sign in to comment.