Skip to content

Commit

Permalink
fix: pubsub telemetry fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jan 17, 2025
1 parent d9a1799 commit 56c0c9e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 18 deletions.
22 changes: 15 additions & 7 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}

logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset)
publishedAt := parseHeaders(logger, msg.Headers)
publishedAt, publisherRequestKey := parseHeaders(logger, msg.Headers)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
startTime := time.Now()
callCtx, callCancel := context.WithCancel(ctx)
callChan := make(chan error)
go func() {
callChan <- c.call(callCtx, msg.Value, int(msg.Partition), int(msg.Offset))
callChan <- c.call(callCtx, msg.Value, int(msg.Partition), int(msg.Offset), publisherRequestKey)
}()
var err error
select {
Expand Down Expand Up @@ -347,25 +347,33 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}

// parseHeaders extracts FTL header values for a kafka message
func parseHeaders(logger *log.Logger, headers []*sarama.RecordHeader) (publishedAt optional.Option[time.Time]) {
func parseHeaders(logger *log.Logger, headers []*sarama.RecordHeader) (publishedAt optional.Option[time.Time], requestKey optional.Option[key.Request]) {
for _, h := range headers {
key := string(h.Key)
if key == createdAtHeader {
switch string(h.Key) {
case createdAtHeader:
t, err := time.Parse(time.RFC3339Nano, string(h.Value))
if err != nil {
logger.Warnf("failed to parse %s header: %v", createdAtHeader, err)
} else {
publishedAt = optional.Some(t)
}

case requestKeyHeader:
k, err := key.ParseRequestKey(string(h.Value))
if err != nil {
logger.Warnf("failed to parse %s header: %v", requestKeyHeader, err)
} else {
requestKey = optional.Some(k)
}
}
}
return
}

func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error {
func (c *consumer) call(ctx context.Context, body []byte, partition, offset int, publisherRequestKey optional.Option[key.Request]) error {
start := time.Now()

requestKey := key.NewRequestKey(key.OriginPubsub, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.String())
requestKey := publisherRequestKey.Default(key.NewRequestKey(key.OriginPubsub, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.String()))
destRef := &schema.Ref{
Module: c.moduleName,
Name: c.verb.Name,
Expand Down
32 changes: 22 additions & 10 deletions backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

const (
createdAtHeader = "ftl.created_at"
createdAtHeader = "ftl.created_at"
requestKeyHeader = "ftl.request_key"
)

type publisher struct {
Expand Down Expand Up @@ -78,15 +79,10 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
}

partition, offset, err := p.producer.SendMessage(&sarama.ProducerMessage{
Topic: p.topic.Runtime.TopicID,
Value: sarama.ByteEncoder(data),
Key: sarama.StringEncoder(key),
Headers: []sarama.RecordHeader{
{
Key: []byte(createdAtHeader),
Value: []byte(createdAt.Format(time.RFC3339Nano)),
},
},
Topic: p.topic.Runtime.TopicID,
Value: sarama.ByteEncoder(data),
Key: sarama.StringEncoder(key),
Headers: newHeaders(createdAt, requestKey),
})
observability.PubSub.Published(ctx, p.module, p.topic.Name, caller.Name, err)
if err != nil {
Expand All @@ -100,3 +96,19 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset)
return nil
}

func newHeaders(createdAt time.Time, requestKey optional.Option[key.Request]) []sarama.RecordHeader {
headers := []sarama.RecordHeader{
{
Key: []byte(createdAtHeader),
Value: []byte(createdAt.Format(time.RFC3339Nano)),
},
}
if requestKey, ok := requestKey.Get(); ok {
headers = append(headers, sarama.RecordHeader{
Key: []byte(requestKeyHeader),
Value: []byte(requestKey.String()),
})
}
return headers
}
5 changes: 4 additions & 1 deletion frontend/console/src/features/traces/TraceDetailItem.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TraceEvent } from '../../api/timeline/use-request-trace-events'
import { AsyncExecuteEvent, CallEvent, type Event, IngressEvent, PubSubPublishEvent } from '../../protos/xyz/block/ftl/timeline/v1/event_pb'
import { AsyncExecuteEvent, CallEvent, type Event, IngressEvent, PubSubConsumeEvent, PubSubPublishEvent } from '../../protos/xyz/block/ftl/timeline/v1/event_pb'
import { classNames } from '../../utils'
import { TimelineIcon } from '../timeline/TimelineIcon'
import { eventBackgroundColor } from '../timeline/timeline.utils'
Expand Down Expand Up @@ -45,6 +45,9 @@ export const TraceDetailItem: React.FC<TraceDetailItemProps> = ({
} else if (traceEvent instanceof PubSubPublishEvent) {
action = 'Publish'
eventName = `${traceEvent.topic}`
} else if (traceEvent instanceof PubSubConsumeEvent) {
action = 'Consume'
eventName = `${traceEvent.destVerbModule}.${traceEvent.destVerbName}`
}

const barColor = event.id === selectedEventId ? 'bg-green-500' : eventBackgroundColor(event)
Expand Down
4 changes: 4 additions & 0 deletions frontend/console/src/features/traces/TracesPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TraceDetails } from './TraceDetails'
import { TraceDetailsAsyncCall } from './details/TraceDetailsAsyncCall'
import { TraceDetailsCall } from './details/TraceDetailsCall'
import { TraceDetailsIngress } from './details/TraceDetailsIngress'
import { TraceDetailsPubsubConsume } from './details/TraceDetailsPubsubConsume'
import { TraceDetailsPubsubPublish } from './details/TraceDetailsPubsubPublish'

export const TracesPage = () => {
Expand Down Expand Up @@ -60,6 +61,9 @@ export const TracesPage = () => {
case 'pubsubPublish':
eventDetailsComponent = <TraceDetailsPubsubPublish event={selectedEvent} />
break
case 'pubsubConsume':
eventDetailsComponent = <TraceDetailsPubsubConsume event={selectedEvent} />
break
default:
eventDetailsComponent = <p>No details available for this event type.</p>
break
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { AttributeBadge } from '../../../components/AttributeBadge'
import { CodeBlock } from '../../../components/CodeBlock'
import type { Event, PubSubConsumeEvent } from '../../../protos/xyz/block/ftl/timeline/v1/event_pb'
import { formatDuration } from '../../../utils/date.utils'
import { DeploymentCard } from '../../deployments/DeploymentCard'

export const TraceDetailsPubsubConsume = ({ event }: { event: Event }) => {
const pubsubConsume = event.entry.value as PubSubConsumeEvent
return (
<>
<span className='text-xl font-semibold'>PubSub Publish Details</span>

{pubsubConsume.error && (
<>
<h3 className='pt-4'>Error</h3>
<CodeBlock code={pubsubConsume.error} language='text' />
</>
)}

<DeploymentCard className='mt-4' deploymentKey={pubsubConsume.deploymentKey} />

<ul className='pt-4 space-y-2'>
<li>
<AttributeBadge name='topic' value={pubsubConsume.topic} />
</li>
<li>
<AttributeBadge name='subscription' value={`${pubsubConsume.destVerbModule}.${pubsubConsume.destVerbName}`} />
</li>
<li>
<AttributeBadge name='duration' value={formatDuration(pubsubConsume.duration)} />
</li>
{pubsubConsume.requestKey && (
<li>
<AttributeBadge name='request' value={pubsubConsume.requestKey} />
</li>
)}
</ul>
</>
)
}

0 comments on commit 56c0c9e

Please sign in to comment.