From dcab2565a527c4002ce1cb39099f10b636ee45d6 Mon Sep 17 00:00:00 2001 From: John Gerassimou Date: Wed, 27 Dec 2023 17:18:34 -0500 Subject: [PATCH] kotel/tracer: add tombstone attribute --- plugin/kotel/tracer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugin/kotel/tracer.go b/plugin/kotel/tracer.go index e4905a62..0e3f6cf5 100644 --- a/plugin/kotel/tracer.go +++ b/plugin/kotel/tracer.go @@ -137,6 +137,9 @@ func (t *Tracer) WithProcessSpan(r *kgo.Record) (context.Context, trace.Span) { if t.consumerGroup != "" { attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(t.consumerGroup)) } + if r.Key != nil && r.Value == nil { + attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true)) + } opts := []trace.SpanStartOption{ trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindConsumer), @@ -169,6 +172,9 @@ func (t *Tracer) OnProduceRecordBuffered(r *kgo.Record) { if t.clientID != "" { attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(t.clientID)) } + if r.Key != nil && r.Value == nil { + attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true)) + } opts := []trace.SpanStartOption{ trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindProducer), @@ -221,6 +227,9 @@ func (t *Tracer) OnFetchRecordBuffered(r *kgo.Record) { if t.consumerGroup != "" { attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(t.consumerGroup)) } + if r.Key != nil && r.Value == nil { + attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true)) + } opts := []trace.SpanStartOption{ trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindConsumer),