Skip to content

Commit

Permalink
feat: ConsoleService.GetTimeline
Browse files Browse the repository at this point in the history
This introduces a 1:1 mapping from the DAL timeline query filters to an
equivalent proto, and adds a `GetTimeline()` that uses it.

`StreamTimeline()` hasn't been converted yet, but it should be
relatively straightforward.
  • Loading branch information
alecthomas committed Sep 11, 2023
1 parent 1ecc27f commit 1c5daa2
Show file tree
Hide file tree
Showing 23 changed files with 2,176 additions and 469 deletions.
24 changes: 15 additions & 9 deletions Bitfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SCHEMA_IN = backend/schema/schema.go backend/schema/protobuf.go \
cmd/ftl/cmd_schema.go
SCHEMA_OUT = protos/xyz/block/ftl/v1/schema/schema.proto

PROTO_IN = **/*.proto **/buf.* protos/xyz/block/ftl/v1/schema/schema.proto
PROTO_IN = **/*.proto **/buf.*
PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \
protos/xyz/block/ftl/v1/schema/schema.pb.go \
protos/xyz/block/ftl/v1/console/console.pb.go \
Expand All @@ -28,7 +28,9 @@ KT_MVN_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with
KT_RUNTIME_OUT = build/template/ftl/jars/ftl-runtime.jar

CLIENT_OUT = console/client/dist/index.html
CLIENT_IN = console/client/**/*
CLIENT_IN = console/client/src/**/*
NODE_MODULES_OUT = console/client/node_modules
NODE_MODULES_IN = console/client/package{,-lock}.json

#virtual release:
# inputs: %{RELEASE}/ftl %{RELEASE}/ftl-controller %{RELEASE}/ftl-runner
Expand All @@ -38,10 +40,10 @@ CLIENT_IN = console/client/**/*

# Build all binaries
implicit %{RELEASE}/%{1}: cmd/*
inputs: %{RELEASE} %{GO_SOURCES}
inputs: %{RELEASE} %{GO_SOURCES} %{CLIENT_OUT}
build: go build -o %{OUT} -tags release -ldflags "-X main.version=%{VERSION}" ./cmd/%{1}

#%{RELEASE}/ftl-controller: %{RELEASE} %{GO_SOURCES} #%{CLIENT_OUT}
#%{RELEASE}/ftl-controller: %{RELEASE} %{GO_SOURCES} %{CLIENT_OUT}
# build: go build -o %{OUT} -tags release -ldflags "-X main.version=%{VERSION}" ./cmd/ftl-controller

%{SCHEMA_OUT}: %{SCHEMA_IN}
Expand Down Expand Up @@ -70,8 +72,12 @@ implicit %{RELEASE}/%{1}: cmd/*
%{COMMON_LOG_OUT}: %{COMMON_LOG_IN}
build: go generate %{IN}

#%{CLIENT_OUT}: %{CLIENT_IN}
# cd console/client
# build:
# npm install
# npm run build
%{NODE_MODULES_OUT}: %{NODE_MODULES_IN}
cd console/client
build: npm --no-color --no-progress install
-clean # Don't clean node_modules

%{CLIENT_OUT}: %{CLIENT_IN} %{NODE_MODULES_OUT}
cd console/client
build: npm --no-color run build
clean: rm -rf dist .parcel-cache
7 changes: 7 additions & 0 deletions backend/common/log/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func (l Level) Severity() int {
return int(l)
}

// ParseLevel parses a log level from text.
func ParseLevel(input string) (Level, error) {
var level Level
err := level.UnmarshalText([]byte(input))
return level, err
}

type contextKey struct{}

// FromContext retrieves the current logger from the context or panics
Expand Down
202 changes: 138 additions & 64 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/internal/dal"
Expand Down Expand Up @@ -102,13 +103,13 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}

func (c *ConsoleService) GetCalls(ctx context.Context, req *connect.Request[pbconsole.GetCallsRequest]) (*connect.Response[pbconsole.GetCallsResponse], error) {
events, err := c.dal.QueryEvents(ctx, time.Time{}, time.Now(), dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
events, err := c.dal.QueryEvents(ctx, dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
if err != nil {
return nil, errors.WithStack(err)
}

return connect.NewResponse(&pbconsole.GetCallsResponse{
Calls: slices.Map(filterCallEvents(events), callEventToCall),
Calls: slices.Map(filterEvents[*dal.CallEvent](events), callEventToCall),
}), nil
}

Expand All @@ -118,16 +119,31 @@ func (c *ConsoleService) GetRequestCalls(ctx context.Context, req *connect.Reque
return nil, errors.WithStack(err)
}

events, err := c.dal.QueryEvents(ctx, time.Time{}, time.Now(), dal.FilterRequests(requestKey))
events, err := c.dal.QueryEvents(ctx, dal.FilterRequests(requestKey))
if err != nil {
return nil, errors.WithStack(err)
}

return connect.NewResponse(&pbconsole.GetRequestCallsResponse{
Calls: slices.Map(filterCallEvents(events), callEventToCall),
Calls: slices.Map(filterEvents[*dal.CallEvent](events), callEventToCall),
}), nil
}

func (c *ConsoleService) GetTimeline(ctx context.Context, req *connect.Request[pbconsole.TimelineQuery]) (*connect.Response[pbconsole.GetTimelineResponse], error) {
query, err := timelineQueryProtoToDAL(req.Msg)
if err != nil {
return nil, errors.WithStack(err)
}
results, err := c.dal.QueryEvents(ctx, query...)
if err != nil {
return nil, errors.WithStack(err)
}
response := &pbconsole.GetTimelineResponse{
Events: slices.Map(results, eventDALToProto),
}
return connect.NewResponse(response), nil
}

func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Request[pbconsole.StreamTimelineRequest], stream *connect.ServerStream[pbconsole.StreamTimelineResponse]) error {
// Default to 1 second interval if not specified.
updateInterval := 1 * time.Second
Expand All @@ -143,50 +159,26 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
}
query = append(query, dal.FilterDeployments(deploymentName))
}
var lastEventTime time.Time
if req.Msg.AfterTime != nil {
lastEventTime = req.Msg.AfterTime.AsTime()
} else {
lastEventTime = time.Now()
}

lastEventTime := req.Msg.AfterTime.AsTime()
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, lastEventTime, thisRequestTime, query...)
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastEventTime))...)
if err != nil {
return errors.WithStack(err)
}

timelineEvents := filterTimelineEvents(events)
for index, timelineEvent := range timelineEvents {
for index, timelineEvent := range events {
more := len(events) > index+1
var err error

switch event := timelineEvent.(type) {
case *dal.CallEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Call{
Call: callEventToCall(event),
},
More: more,
})
case *dal.LogEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Log{
Log: logEventToLogEntry(event),
},
More: more,
})
case *dal.DeploymentEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Deployment{
Deployment: deploymentEventToDeployment(event),
},
More: more,
})
}

err := stream.Send(&pbconsole.StreamTimelineResponse{
Event: eventDALToProto(timelineEvent),
More: more,
})
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -203,7 +195,7 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
func (c *ConsoleService) StreamLogs(ctx context.Context, req *connect.Request[pbconsole.StreamLogsRequest], stream *connect.ServerStream[pbconsole.StreamLogsResponse]) error {
// Default to 1 second interval if not specified.
updateInterval := 1 * time.Second
if req.Msg.UpdateInterval != nil && req.Msg.UpdateInterval.AsDuration() > time.Second { // Minimum 1s interval.
if interval := req.Msg.UpdateInterval; interval != nil && interval.AsDuration() > time.Second { // Minimum 1s interval.
updateInterval = req.Msg.UpdateInterval.AsDuration()
}

Expand All @@ -215,16 +207,15 @@ func (c *ConsoleService) StreamLogs(ctx context.Context, req *connect.Request[pb
}
query = append(query, dal.FilterDeployments(deploymentName))
}

lastLogTime := req.Msg.AfterTime.AsTime()
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, lastLogTime, thisRequestTime, query...)
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastLogTime))...)
if err != nil {
return errors.WithStack(err)
}

logEvents := filterLogEvents(events)
logEvents := filterEvents[*dal.LogEvent](events)
for index, log := range logEvents {
var requestKey *string
if r, ok := log.RequestKey.Get(); ok {
Expand Down Expand Up @@ -326,38 +317,121 @@ func deploymentEventToDeployment(event *dal.DeploymentEvent) *pbconsole.Deployme
}
}

func filterCallEvents(events []dal.Event) []*dal.CallEvent {
var filtered []*dal.CallEvent
func filterEvents[E dal.Event](events []dal.Event) []E {
var filtered []E
for _, event := range events {
if call, ok := event.(*dal.CallEvent); ok {
filtered = append(filtered, call)
if e, ok := event.(E); ok {
filtered = append(filtered, e)
}
}
return filtered
}

func filterLogEvents(events []dal.Event) []*dal.LogEvent {
var filtered []*dal.LogEvent
for _, event := range events {
if log, ok := event.(*dal.LogEvent); ok {
filtered = append(filtered, log)
func timelineQueryProtoToDAL(pb *pbconsole.TimelineQuery) ([]dal.EventFilter, error) {
var query []dal.EventFilter
for _, filter := range pb.Filters {
switch filter := filter.Filter.(type) {
case *pbconsole.TimelineQuery_Filter_Deployments:
deploymentNames := make([]model.DeploymentName, 0, len(filter.Deployments.Deployments))
for _, deployment := range filter.Deployments.Deployments {
deploymentName, err := model.ParseDeploymentName(deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
}
deploymentNames = append(deploymentNames, deploymentName)
}
query = append(query, dal.FilterDeployments(deploymentNames...))

case *pbconsole.TimelineQuery_Filter_Requests:
requestKeys := make([]model.IngressRequestKey, 0, len(filter.Requests.Requests))
for _, request := range filter.Requests.Requests {
requestKey, err := model.ParseIngressRequestKey(request)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
}
requestKeys = append(requestKeys, requestKey)
}
query = append(query, dal.FilterRequests(requestKeys...))

case *pbconsole.TimelineQuery_Filter_EventTypes:
eventTypes := make([]dal.EventType, 0, len(filter.EventTypes.EventTypes))
for _, eventType := range filter.EventTypes.EventTypes {
switch eventType {
case pbconsole.EventType_EVENT_TYPE_CALL:
eventTypes = append(eventTypes, dal.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, dal.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT:
eventTypes = append(eventTypes, dal.EventTypeDeployment)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown event type %v", eventType))
}
}

case *pbconsole.TimelineQuery_Filter_LogLevel:
level := log.Level(filter.LogLevel.LogLevel)
if level < log.Trace || level > log.Error {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown log level %v", filter.LogLevel.LogLevel))
}
query = append(query, dal.FilterLogLevel(level))

case *pbconsole.TimelineQuery_Filter_Time:
var newerThan, olderThan time.Time
if filter.Time.NewerThan != nil {
newerThan = filter.Time.NewerThan.AsTime()
}
if filter.Time.OlderThan != nil {
olderThan = filter.Time.OlderThan.AsTime()
}
query = append(query, dal.FilterTimeRange(newerThan, olderThan))

case *pbconsole.TimelineQuery_Filter_Id:
var lowerThan, higherThan int64
if filter.Id.LowerThan != nil {
lowerThan = *filter.Id.LowerThan
}
if filter.Id.HigherThan != nil {
higherThan = *filter.Id.HigherThan
}
query = append(query, dal.FilterIDRange(lowerThan, higherThan))

default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown filter %T", filter))
}
}
return filtered
return query, nil
}

func filterTimelineEvents(events []dal.Event) []dal.Event {
var filtered []dal.Event
for _, event := range events {
if _, ok := event.(*dal.LogEvent); ok {
filtered = append(filtered, event)
func eventDALToProto(event dal.Event) *pbconsole.TimelineEvent {
switch event := event.(type) {
case *dal.CallEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Call{
Call: callEventToCall(event),
},
}
if _, ok := event.(*dal.CallEvent); ok {
filtered = append(filtered, event)

case *dal.LogEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Log{
Log: logEventToLogEntry(event),
},
}
if _, ok := event.(*dal.DeploymentEvent); ok {
filtered = append(filtered, event)

case *dal.DeploymentEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Deployment{
Deployment: deploymentEventToDeployment(event),
},
}

default:
panic(errors.Errorf("unknown event type %T", event))
}
return filtered
}
10 changes: 5 additions & 5 deletions backend/controller/internal/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestDAL(t *testing.T) {
})

t.Run("ExpireRunnerClaims", func(t *testing.T) {
time.Sleep(time.Millisecond * 200)
time.Sleep(time.Millisecond * 500)
count, err := dal.ExpireRunnerClaims(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, count)
Expand Down Expand Up @@ -255,25 +255,25 @@ func TestDAL(t *testing.T) {

t.Run("QueryEvents", func(t *testing.T) {
t.Run("NoFilters", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, time.Time{}, time.Now())
events, err := dal.QueryEvents(ctx)
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
})

t.Run("ByDeployment", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, time.Time{}, time.Now(), FilterDeployments(deploymentName))
events, err := dal.QueryEvents(ctx, FilterDeployments(deploymentName))
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
})

t.Run("ByCall", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, time.Time{}, time.Now(), FilterTypes(EventTypeCall), FilterCall(types.None[string](), "time", types.None[string]()))
events, err := dal.QueryEvents(ctx, FilterTypes(EventTypeCall), FilterCall(types.None[string](), "time", types.None[string]()))
assert.NoError(t, err)
assertEventsEqual(t, []Event{callEvent}, events)
})

t.Run("ByLogLevel", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, time.Time{}, time.Now(), FilterTypes(EventTypeLog), FilterLogs(log.Trace))
events, err := dal.QueryEvents(ctx, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace))
assert.NoError(t, err)
assertEventsEqual(t, []Event{logEvent}, events)
})
Expand Down
Loading

0 comments on commit 1c5daa2

Please sign in to comment.