Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correctly record event times #3254

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: correctly record event times
Fixes #3240
  • Loading branch information
alecthomas committed Oct 30, 2024
commit 2196c90d3d96bf745ea44ab8c2535b41a37c7361
1 change: 1 addition & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ lint-frontend: build-frontend
lint-backend:
@golangci-lint run --new-from-rev=$(git merge-base origin/main HEAD) ./...
@lint-commit-or-rollback ./backend/...
@go-check-sumtype ./...

lint-scripts:
#!/bin/bash
Expand Down
8 changes: 3 additions & 5 deletions backend/controller/timeline/events_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ type Call struct {
Response either.Either[*ftlv1.CallResponse, error]
}

func (c *Call) inEvent() {}

func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call *Call) error {
callEvent := callToCallEvent(call)
func (c *Call) toEvent() (Event, error) { return callToCallEvent(c), nil } //nolint:unparam

func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, callEvent *CallEvent) error {
var sourceModule, sourceVerb optional.Option[string]
if sr, ok := callEvent.SourceVerb.Get(); ok {
sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name)
Expand Down Expand Up @@ -95,7 +93,7 @@ func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call
}

err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
DeploymentKey: call.DeploymentKey,
DeploymentKey: callEvent.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
TimeStamp: callEvent.Time,
Expand Down
11 changes: 8 additions & 3 deletions backend/controller/timeline/events_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ type CronScheduled struct {
Error optional.Option[string]
}

func (*CronScheduled) inEvent() {}
func (e *CronScheduled) toEvent() (Event, error) { //nolint:unparam
return &CronScheduledEvent{
CronScheduled: *e,
Duration: time.Since(e.Time),
}, nil
}

type eventCronScheduledJSON struct {
DurationMS int64 `json:"duration_ms"`
Expand All @@ -43,9 +48,9 @@ type eventCronScheduledJSON struct {
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduled) error {
func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduledEvent) error {
cronJSON := eventCronScheduledJSON{
DurationMS: time.Since(event.Time).Milliseconds(),
DurationMS: event.Duration.Milliseconds(),
ScheduledAt: event.ScheduledAt,
Schedule: event.Schedule,
Error: event.Error,
Expand Down
47 changes: 29 additions & 18 deletions backend/controller/timeline/events_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ type Ingress struct {
Error optional.Option[string]
}

func (*Ingress) inEvent() {}

func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *Ingress) error {
func (ingress *Ingress) toEvent() (Event, error) {
requestBody := ingress.RequestBody
if len(requestBody) == 0 {
requestBody = []byte("{}")
Expand All @@ -78,29 +76,42 @@ func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, in
responseBody = []byte("{}")
}

if len(responseBody) == 0 {
responseBody = []byte("{}")
}

reqHeaderBytes, err := json.Marshal(ingress.RequestHeaders)
if err != nil {
return fmt.Errorf("failed to marshal request header: %w", err)
return nil, fmt.Errorf("failed to marshal request header: %w", err)
}

respHeaderBytes, err := json.Marshal(ingress.ResponseHeaders)
if err != nil {
return fmt.Errorf("failed to marshal response header: %w", err)
return nil, fmt.Errorf("failed to marshal response header: %w", err)
}

ingressJSON := eventIngressJSON{
DurationMS: time.Since(ingress.StartTime).Milliseconds(),
return &IngressEvent{
DeploymentKey: ingress.DeploymentKey,
RequestKey: optional.Some(ingress.RequestKey),
Verb: *ingress.Verb,
Method: ingress.RequestMethod,
Path: ingress.RequestPath,
StatusCode: ingress.ResponseStatus,
Request: json.RawMessage(requestBody),
RequestHeader: json.RawMessage(reqHeaderBytes),
Response: json.RawMessage(responseBody),
ResponseHeader: json.RawMessage(respHeaderBytes),
Time: ingress.StartTime,
Duration: time.Since(ingress.StartTime),
Request: requestBody,
RequestHeader: reqHeaderBytes,
Response: responseBody,
ResponseHeader: respHeaderBytes,
Error: ingress.Error,
}, nil
}

func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *IngressEvent) error {
ingressJSON := eventIngressJSON{
DurationMS: ingress.Duration.Milliseconds(),
Method: ingress.Method,
Path: ingress.Path,
StatusCode: ingress.StatusCode,
Request: ingress.Request,
RequestHeader: ingress.RequestHeader,
Response: ingress.Response,
ResponseHeader: ingress.ResponseHeader,
Error: ingress.Error,
}

Expand All @@ -115,12 +126,12 @@ func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, in
return fmt.Errorf("failed to encrypt ingress payload: %w", err)
}

log.FromContext(ctx).Warnf("Inserting ingress event for %s %s", ingress.RequestKey, ingress.RequestPath)
log.FromContext(ctx).Warnf("Inserting ingress event for %s %s", ingress.RequestKey, ingress.Path)

err = libdal.TranslatePGError(querier.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{
DeploymentKey: ingress.DeploymentKey,
RequestKey: optional.Some(ingress.RequestKey.String()),
TimeStamp: ingress.StartTime,
TimeStamp: ingress.Time,
Module: ingress.Verb.Module,
Verb: ingress.Verb.Name,
IngressType: "http",
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/timeline/events_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Log struct {
Error optional.Option[string]
}

func (l *Log) inEvent() {}
func (l *Log) toEvent() (Event, error) { return &LogEvent{Log: *l}, nil } //nolint:unparam

type LogEvent struct {
ID int64
Expand All @@ -40,7 +40,7 @@ type eventLogJSON struct {
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *Log) error {
func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *LogEvent) error {
var requestKey optional.Option[string]
if name, ok := log.RequestKey.Get(); ok {
requestKey = optional.Some(name.String())
Expand Down
27 changes: 17 additions & 10 deletions backend/controller/timeline/timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ type Event interface {

// InEvent is a marker interface for events that are inserted into the timeline.
type InEvent interface {
inEvent()
toEvent() (Event, error)
}

type Service struct {
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
events chan InEvent
events chan Event
lastDroppedError atomic.Value[time.Time]
lastFailedError atomic.Value[time.Time]
}

func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service {
var s *Service
events := make(chan InEvent, 1000)
events := make(chan Event, 1000)
s = &Service{
ctx: ctx,
conn: conn,
Expand All @@ -72,7 +72,12 @@ func (s *Service) DeleteOldEvents(ctx context.Context, eventType EventType, age
}

// EnqueueEvent asynchronously enqueues an event for insertion into the timeline.
func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) {
func (s *Service) EnqueueEvent(ctx context.Context, inEvent InEvent) {
event, err := inEvent.toEvent()
if err != nil {
log.FromContext(ctx).Warnf("Failed to convert event to event: %v", err)
return
}
select {
case s.events <- event:
default:
Expand All @@ -85,7 +90,7 @@ func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) {

func (s *Service) processEvents() {
lastFlush := time.Now()
buffer := make([]InEvent, 0, maxBatchSize)
buffer := make([]Event, 0, maxBatchSize)
for {
select {
case event := <-s.events:
Expand All @@ -108,7 +113,7 @@ func (s *Service) processEvents() {
}

// Flush all events in the buffer to the database in a single transaction.
func (s *Service) flushEvents(events []InEvent) {
func (s *Service) flushEvents(events []Event) {
logger := log.FromContext(s.ctx).Scope("timeline")
tx, err := s.conn.Begin()
if err != nil {
Expand All @@ -121,14 +126,16 @@ func (s *Service) flushEvents(events []InEvent) {
for _, event := range events {
var err error
switch e := event.(type) {
case *Call:
case *CallEvent:
err = s.insertCallEvent(s.ctx, querier, e)
case *Log:
case *LogEvent:
err = s.insertLogEvent(s.ctx, querier, e)
case *Ingress:
case *IngressEvent:
err = s.insertHTTPIngress(s.ctx, querier, e)
case *CronScheduled:
case *CronScheduledEvent:
err = s.insertCronScheduledEvent(s.ctx, querier, e)
case *DeploymentCreatedEvent, *DeploymentUpdatedEvent:
// TODO: Implement
default:
panic(fmt.Sprintf("unexpected event type: %T", e))
}
Expand Down
45 changes: 45 additions & 0 deletions examples/go/time/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,57 @@ replace github.com/TBD54566975/ftl => ../../..
require github.com/TBD54566975/ftl v0.0.0-00010101000000-000000000000

require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/XSAM/otelsql v0.35.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/danieljoos/wincred v1.2.2 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.6 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
Loading