Skip to content

Commit

Permalink
kotel/tracer: add tombstone attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
yianni committed Dec 27, 2023
1 parent dec75b3 commit dcab256
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions plugin/kotel/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (t *Tracer) WithProcessSpan(r *kgo.Record) (context.Context, trace.Span) {
if t.consumerGroup != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(t.consumerGroup))
}
if r.Key != nil && r.Value == nil {
attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true))
}
opts := []trace.SpanStartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
Expand Down Expand Up @@ -169,6 +172,9 @@ func (t *Tracer) OnProduceRecordBuffered(r *kgo.Record) {
if t.clientID != "" {
attrs = append(attrs, semconv.MessagingKafkaClientIDKey.String(t.clientID))
}
if r.Key != nil && r.Value == nil {
attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true))
}
opts := []trace.SpanStartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindProducer),
Expand Down Expand Up @@ -221,6 +227,9 @@ func (t *Tracer) OnFetchRecordBuffered(r *kgo.Record) {
if t.consumerGroup != "" {
attrs = append(attrs, semconv.MessagingKafkaConsumerGroupKey.String(t.consumerGroup))
}
if r.Key != nil && r.Value == nil {
attrs = append(attrs, semconv.MessagingKafkaMessageTombstoneKey.Bool(true))
}
opts := []trace.SpanStartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
Expand Down

0 comments on commit dcab256

Please sign in to comment.