Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for propagating OpenTelemetry baggage #2107

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Trace;
using Proto.Extensions;
using Proto.Mailbox;
Expand Down Expand Up @@ -109,7 +110,7 @@ public override Task Receive(MessageEnvelope envelope) =>
OpenTelemetryMethodsDecorators.Receive(Source, envelope, _receiveActivitySetup,
() => base.Receive(envelope));

public override void Respond(object message)=>
public override void Respond(object message) =>
OpenTelemetryMethodsDecorators.Respond(message,
() => base.Respond(message));

Expand Down Expand Up @@ -275,7 +276,7 @@ internal static void Forward(string source, PID target, object message, Activity
throw;
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void Respond(object message,
Action respond)
Expand Down Expand Up @@ -312,6 +313,12 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti

var propagationContext = envelope.Header.ExtractPropagationContext();

bool hasBaggage = propagationContext.Baggage.Count > 0 || Baggage.Current.Count > 0;
if (hasBaggage)
{
Baggage.Current = propagationContext.Baggage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR on this?
It extracts baggage via header values.
then set the Baggage.Current to an actual baggage.
and then create a new activity via BuildStartedActivity,
and BuildStartedActivity extracts data from Baggage.Current into the new activity?

}

using var activity =
OpenTelemetryHelpers.BuildStartedActivity(propagationContext.ActivityContext, source, nameof(Receive),
message, receiveActivitySetup);
Expand Down Expand Up @@ -339,4 +346,4 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti
throw;
}
}
}
}
79 changes: 77 additions & 2 deletions tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using OpenTelemetry;
using OpenTelemetry.Trace;
using Proto.Future;
using Xunit;
Expand All @@ -12,6 +13,11 @@ namespace Proto.OpenTelemetry.Tests;

public class OpenTelemetryTracingTests : IClassFixture<ActivityFixture>
{
private static readonly Baggage TestBaggage = Baggage.Create(new Dictionary<string, string?>
{
{"baggageKey", "baggageValue"}
});

private static readonly Props ProxyTraceActorProps = Props.FromProducer(() => new TraceTestActor()).WithTracing();

private static readonly Props InnerTraceActorProps = Props.FromFunc(context =>
Expand All @@ -22,7 +28,7 @@ public class OpenTelemetryTracingTests : IClassFixture<ActivityFixture>

if (context.Sender is not null)
{
context.Respond(new TraceResponse());
context.Respond(GetTraceResponse() );
}
}

Expand All @@ -31,6 +37,13 @@ public class OpenTelemetryTracingTests : IClassFixture<ActivityFixture>
)
.WithTracing();

static TraceResponse GetTraceResponse()
{
return Baggage.Current.Count > 0
? new TraceResponse(Baggage.Current)
: new TraceResponse();
}

private static readonly ActivitySource TestSource = new("Proto.Actor.Tests");

private readonly ActivityFixture _fixture;
Expand Down Expand Up @@ -122,6 +135,68 @@ private void TracesPropagateCorrectly(ActivitySpanId outerSpanId, ActivityTraceI
inner.Should().NotBeNull();
}


[Fact]
public async Task TracesPropagateCorrectlyWithBaggageForRequestAsync() =>
await VerifyTrace(async (rootContext, target) =>
{
Baggage.Current = TestBaggage;
var response = await rootContext.RequestAsync<TraceResponse>(target, new TraceMe(SendAs.RequestAsync));
response.Should().BeEquivalentTo(new TraceResponse(TestBaggage));
}
);

[Fact]
public async Task TracesPropagateCorrectlyWithBaggageForRequest() =>
await VerifyTrace(async (rootContext, target) =>
{
Baggage.Current = TestBaggage;
rootContext.Request(target, new TraceMe(SendAs.Request));
await Task.Delay(100);
}
);

[Fact]
public async Task TracesPropagateCorrectlyWithBaggageForRequestWithForward() =>
await VerifyTrace(async (rootContext, target) =>
{
Baggage.Current = TestBaggage;
var response = await rootContext.RequestAsync<TraceResponse>(target, new TraceMe(SendAs.Forward));
response.Should().BeEquivalentTo(new TraceResponse(TestBaggage));
}
);

[Fact]
public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSender() =>
await VerifyTrace(async (rootContext, target) =>
{
Baggage.Current = TestBaggage;
var future = new FutureProcess(rootContext.System);
rootContext.Request(target, new TraceMe(SendAs.Request), future.Pid);
var response = (MessageEnvelope)await future.Task;
response.Message.Should().Be(new TraceResponse(TestBaggage));
}
);

[Fact]
public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSenderWithAdditionalMiddleware() =>
await VerifyTrace(async (tracedRoot, target) =>
{
var middleContext = tracedRoot.WithSenderMiddleware(next => async (context, _, envelope) =>
{
var updatedEnvelope = envelope.WithHeader("test", "value");
await next(context, target, updatedEnvelope);
});
var future = new FutureProcess(middleContext.System);
Baggage.Current = TestBaggage;
middleContext.Request(target, new TraceMe(SendAs.Request), future.Pid);
var response = (MessageEnvelope)await future.Task;
response.Message.Should().Be(new TraceResponse(TestBaggage));
}
);

// End

private async Task VerifyTrace(Func<IRootContext, PID, Task> action)
{
var tracedRoot = new ActorSystem().Root.WithTracing();
Expand Down Expand Up @@ -175,7 +250,7 @@ private enum SendAs

private record TraceMe(SendAs Method);

private record TraceResponse;
private record TraceResponse(Baggage? Baggage = null);

public class TraceTestActor : IActor
{
Expand Down
Loading