diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go
index a3dfb09ecf..26542fc2a7 100644
--- a/backend/runner/pubsub/consumer.go
+++ b/backend/runner/pubsub/consumer.go
@@ -284,7 +284,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 		}
 		logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset)
-		publishedAt := parseHeaders(logger, msg.Headers)
+		publishedAt, publisherRequestKey := parseHeaders(logger, msg.Headers)
 
 		remainingRetries := c.retryParams.Count
 		backoff := c.retryParams.MinBackoff
 		for {
@@ -292,7 +292,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 			callCtx, callCancel := context.WithCancel(ctx)
 			callChan := make(chan error)
 			go func() {
-				callChan <- c.call(callCtx, msg.Value, int(msg.Partition), int(msg.Offset))
+				callChan <- c.call(callCtx, msg.Value, int(msg.Partition), int(msg.Offset), publisherRequestKey)
 			}()
 			var err error
 			select {
@@ -347,25 +347,33 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 }
 
 // parseHeaders extracts FTL header values for a kafka message
-func parseHeaders(logger *log.Logger, headers []*sarama.RecordHeader) (publishedAt optional.Option[time.Time]) {
+func parseHeaders(logger *log.Logger, headers []*sarama.RecordHeader) (publishedAt optional.Option[time.Time], requestKey optional.Option[key.Request]) {
 	for _, h := range headers {
-		key := string(h.Key)
-		if key == createdAtHeader {
+		switch string(h.Key) {
+		case createdAtHeader:
 			t, err := time.Parse(time.RFC3339Nano, string(h.Value))
 			if err != nil {
 				logger.Warnf("failed to parse %s header: %v", createdAtHeader, err)
 			} else {
 				publishedAt = optional.Some(t)
 			}
+
+		case requestKeyHeader:
+			k, err := key.ParseRequestKey(string(h.Value))
+			if err != nil {
+				logger.Warnf("failed to parse %s header: %v", requestKeyHeader, err)
+			} else {
+				requestKey = optional.Some(k)
+			}
 		}
 	}
 	return
 }
 
-func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error {
+func (c *consumer) call(ctx context.Context, body []byte, partition, offset int, publisherRequestKey optional.Option[key.Request]) error {
 	start := time.Now()
-	requestKey := key.NewRequestKey(key.OriginPubsub, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.String())
+	requestKey := publisherRequestKey.Default(key.NewRequestKey(key.OriginPubsub, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.String()))
 
 	destRef := &schema.Ref{
 		Module: c.moduleName,
 		Name:   c.verb.Name,
diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go
index 14cad7db78..ab8fc7bf40 100644
--- a/backend/runner/pubsub/publisher.go
+++ b/backend/runner/pubsub/publisher.go
@@ -17,7 +17,8 @@ import (
 )
 
 const (
-	createdAtHeader = "ftl.created_at"
+	createdAtHeader  = "ftl.created_at"
+	requestKeyHeader = "ftl.request_key"
 )
 
 type publisher struct {
@@ -78,15 +79,10 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
 	}
 
 	partition, offset, err := p.producer.SendMessage(&sarama.ProducerMessage{
-		Topic: p.topic.Runtime.TopicID,
-		Value: sarama.ByteEncoder(data),
-		Key:   sarama.StringEncoder(key),
-		Headers: []sarama.RecordHeader{
-			{
-				Key:   []byte(createdAtHeader),
-				Value: []byte(createdAt.Format(time.RFC3339Nano)),
-			},
-		},
+		Topic:   p.topic.Runtime.TopicID,
+		Value:   sarama.ByteEncoder(data),
+		Key:     sarama.StringEncoder(key),
+		Headers: newHeaders(createdAt, requestKey),
 	})
 	observability.PubSub.Published(ctx, p.module, p.topic.Name, caller.Name, err)
 	if err != nil {
@@ -100,3 +96,19 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
 	logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset)
 	return nil
 }
+
+func newHeaders(createdAt time.Time, requestKey optional.Option[key.Request]) []sarama.RecordHeader {
+	headers := []sarama.RecordHeader{
+		{
+			Key:   []byte(createdAtHeader),
+			Value: []byte(createdAt.Format(time.RFC3339Nano)),
+		},
+	}
+	if requestKey, ok := requestKey.Get(); ok {
+		headers = append(headers, sarama.RecordHeader{
+			Key:   []byte(requestKeyHeader),
+			Value: []byte(requestKey.String()),
+		})
+	}
+	return headers
+}
diff --git a/frontend/console/src/features/traces/TraceDetailItem.tsx b/frontend/console/src/features/traces/TraceDetailItem.tsx
index afd5e9f2ef..7cab38c44c 100644
--- a/frontend/console/src/features/traces/TraceDetailItem.tsx
+++ b/frontend/console/src/features/traces/TraceDetailItem.tsx
@@ -1,5 +1,5 @@
 import type { TraceEvent } from '../../api/timeline/use-request-trace-events'
-import { AsyncExecuteEvent, CallEvent, type Event, IngressEvent, PubSubPublishEvent } from '../../protos/xyz/block/ftl/timeline/v1/event_pb'
+import { AsyncExecuteEvent, CallEvent, type Event, IngressEvent, PubSubConsumeEvent, PubSubPublishEvent } from '../../protos/xyz/block/ftl/timeline/v1/event_pb'
 import { classNames } from '../../utils'
 import { TimelineIcon } from '../timeline/TimelineIcon'
 import { eventBackgroundColor } from '../timeline/timeline.utils'
@@ -45,6 +45,9 @@ export const TraceDetailItem: React.FC<TraceDetailItemProps> = ({
   } else if (traceEvent instanceof PubSubPublishEvent) {
     action = 'Publish'
     eventName = `${traceEvent.topic}`
+  } else if (traceEvent instanceof PubSubConsumeEvent) {
+    action = 'Consume'
+    eventName = `${traceEvent.destVerbModule}.${traceEvent.destVerbName}`
   }
 
   const barColor = event.id === selectedEventId ? 'bg-green-500' : eventBackgroundColor(event)
diff --git a/frontend/console/src/features/traces/TracesPage.tsx b/frontend/console/src/features/traces/TracesPage.tsx
index 94a6be6e05..94a4ed1efa 100644
--- a/frontend/console/src/features/traces/TracesPage.tsx
+++ b/frontend/console/src/features/traces/TracesPage.tsx
@@ -8,6 +8,7 @@ import { TraceDetails } from './TraceDetails'
 import { TraceDetailsAsyncCall } from './details/TraceDetailsAsyncCall'
 import { TraceDetailsCall } from './details/TraceDetailsCall'
 import { TraceDetailsIngress } from './details/TraceDetailsIngress'
+import { TraceDetailsPubsubConsume } from './details/TraceDetailsPubsubConsume'
 import { TraceDetailsPubsubPublish } from './details/TraceDetailsPubsubPublish'
 
 export const TracesPage = () => {
@@ -60,6 +61,9 @@ export const TracesPage = () => {
       case 'pubsubPublish':
         eventDetailsComponent = <TraceDetailsPubsubPublish event={selectedEvent} />
         break
+      case 'pubsubConsume':
+        eventDetailsComponent = <TraceDetailsPubsubConsume event={selectedEvent} />
+        break
       default:
         eventDetailsComponent = <p>No details available for this event type.</p>
         break
diff --git a/frontend/console/src/features/traces/details/TraceDetailsPubsubConsume.tsx b/frontend/console/src/features/traces/details/TraceDetailsPubsubConsume.tsx
new file mode 100644
index 0000000000..736e62ba0a
--- /dev/null
+++ b/frontend/console/src/features/traces/details/TraceDetailsPubsubConsume.tsx
@@ -0,0 +1,40 @@
+import { AttributeBadge } from '../../../components/AttributeBadge'
+import { CodeBlock } from '../../../components/CodeBlock'
+import type { Event, PubSubConsumeEvent } from '../../../protos/xyz/block/ftl/timeline/v1/event_pb'
+import { formatDuration } from '../../../utils/date.utils'
+import { DeploymentCard } from '../../deployments/DeploymentCard'
+
+export const TraceDetailsPubsubConsume = ({ event }: { event: Event }) => {
+  const pubsubConsume = event.entry.value as PubSubConsumeEvent
+  return (
+    <>
+      <span className='text-xl font-semibold'>PubSub Consume Details</span>
+
+      {pubsubConsume.error && (
+        <>
+          <h3 className='pt-4'>Error</h3>
+          <CodeBlock code={pubsubConsume.error} language='text' />
+        </>
+      )}
+
+      <DeploymentCard className='mt-4' deploymentKey={pubsubConsume.deploymentKey} />
+
+      <ul className='pt-4 space-y-2'>
+        <li>
+          <AttributeBadge name='module' value={pubsubConsume.destVerbModule} />
+        </li>
+        <li>
+          <AttributeBadge name='verb' value={pubsubConsume.destVerbName} />
+        </li>
+        <li>
+          <AttributeBadge name='duration' value={formatDuration(pubsubConsume.duration)} />
+        </li>
+        {pubsubConsume.requestKey && (
+          <li>
+            <AttributeBadge name='request' value={pubsubConsume.requestKey} />
+          </li>
+        )}
+      </ul>
+    </>
+  )
+}
\ No newline at end of file
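
Reviewer note: below is a minimal stand-alone sketch, not part of the patch, of the round trip this change introduces. The publisher stamps each Kafka record with ftl.created_at and, when the publishing request is known, ftl.request_key; the consumer reads the key back and only mints a fresh one when the header is absent. The sketch uses plain strings in place of key.Request and optional.Option, assumes the github.com/IBM/sarama import path for the sarama fork used by the runner, and the example key literal is made up.

package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama" // assumed to match the sarama fork used by backend/runner/pubsub
)

// Header names copied from publisher.go.
const (
	createdAtHeader  = "ftl.created_at"
	requestKeyHeader = "ftl.request_key"
)

// buildHeaders mirrors newHeaders in publisher.go, with the request key as a
// plain string instead of optional.Option[key.Request].
func buildHeaders(createdAt time.Time, requestKey string) []sarama.RecordHeader {
	headers := []sarama.RecordHeader{{
		Key:   []byte(createdAtHeader),
		Value: []byte(createdAt.Format(time.RFC3339Nano)),
	}}
	if requestKey != "" {
		headers = append(headers, sarama.RecordHeader{
			Key:   []byte(requestKeyHeader),
			Value: []byte(requestKey),
		})
	}
	return headers
}

// publisherRequestKey mirrors the new branch in parseHeaders: return the
// propagated key when the header is present, otherwise report that the
// consumer should mint its own (the publisherRequestKey.Default(...) path in call()).
func publisherRequestKey(headers []sarama.RecordHeader) (string, bool) {
	for _, h := range headers {
		if string(h.Key) == requestKeyHeader {
			return string(h.Value), true
		}
	}
	return "", false
}

func main() {
	headers := buildHeaders(time.Now(), "req-example-0001") // hypothetical key string
	if k, ok := publisherRequestKey(headers); ok {
		fmt.Println("consumer joins publisher trace:", k)
	} else {
		fmt.Println("no request key header; consumer mints a new one")
	}
}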