Skip to content

Commit

Permalink
(#60) message Brokers RabbitMQ: add abstract plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
SaintAngeLs committed Sep 26, 2024
1 parent d2a3424 commit 9cb43fa
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../../../Paralax.MessageBrokers.RabbitMQ/src/Paralax.MessageBrokers.RabbitMQ/Paralax.MessageBrokers.RabbitMQ.csproj" />
<ProjectReference Include="../../../Paralax.Tracing.Jaeger/src/Paralax.Tracing.Jaeger/Paralax.Tracing.Jaeger.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paralax.Tracing.Jaeger.RabbitMQ", "Paralax.Tracing.Jaeger.RabbitMQ.csproj", "{BDC7C53E-ABC4-4866-AAD4-796538039661}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{BDC7C53E-ABC4-4866-AAD4-796538039661}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BDC7C53E-ABC4-4866-AAD4-796538039661}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BDC7C53E-ABC4-4866-AAD4-796538039661}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BDC7C53E-ABC4-4866-AAD4-796538039661}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {539599BC-9DEA-456B-961A-D7A3673CA653}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Text;
using System.Threading.Tasks;
using OpenTelemetry.Trace;
using RabbitMQ.Client.Events;
using OpenTracing;
using OpenTracing.Tag;
using Paralax.MessageBrokers.RabbitMQ;

namespace Paralax.Tracing.Jaeger.RabbitMQ.Plugins
{
internal sealed class JaegerPlugin : RabbitMqPlugin
{
private readonly ITracer _tracer;
private readonly string _spanContextHeader;

public JaegerPlugin(ITracer tracer, RabbitMqOptions options)
{
_tracer = tracer ?? throw new ArgumentNullException(nameof(tracer));
_spanContextHeader = options.GetSpanContextHeader() ?? "span-context";
}

public override async Task HandleAsync(object message, object correlationContext, BasicDeliverEventArgs args)
{
var messageName = message.GetType().Name.Underscore(); // Converts message name to underscored format
var messageId = args.BasicProperties.MessageId ?? string.Empty;
var spanContext = ExtractSpanContext(args); // Extracts span context from headers

using var scope = BuildScope(messageName, spanContext);
var span = scope.Span;
span.Log($"Started processing a message: '{messageName}' [id: '{messageId}'].");

try
{
await Next(message, correlationContext, args); // Call the next plugin in the pipeline
}
catch (Exception ex)
{
span.SetTag(Tags.Error, true); // Mark the span as errored
span.Log($"Exception: {ex.Message}"); // Log the exception message
}

span.Log($"Finished processing a message: '{messageName}' [id: '{messageId}'].");
}

private string ExtractSpanContext(BasicDeliverEventArgs args)
{
if (args.BasicProperties.Headers is { } &&
args.BasicProperties.Headers.TryGetValue(_spanContextHeader, out var spanContextHeader) &&
spanContextHeader is byte[] spanContextBytes)
{
return Encoding.UTF8.GetString(spanContextBytes);
}

return string.Empty; // No span context found
}

private IScope BuildScope(string messageName, string serializedSpanContext)
{
var spanBuilder = _tracer
.BuildSpan($"processing-{messageName}")
.WithTag("message-type", messageName);

if (string.IsNullOrEmpty(serializedSpanContext))
{
return spanBuilder.StartActive(true);
}

var spanContext = SpanContext.ContextFromString(serializedSpanContext);

return spanBuilder
.AddReference(References.FollowsFrom, spanContext)
.StartActive(true);
}
}
}

0 comments on commit 9cb43fa

Please sign in to comment.