Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ConsoleService.GetTimeline #374

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions Bitfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SCHEMA_IN = backend/schema/schema.go backend/schema/protobuf.go \
cmd/ftl/cmd_schema.go
SCHEMA_OUT = protos/xyz/block/ftl/v1/schema/schema.proto

PROTO_IN = **/*.proto **/buf.* protos/xyz/block/ftl/v1/schema/schema.proto
PROTO_IN = **/*.proto **/buf.*
PROTO_OUT = protos/xyz/block/ftl/v1/ftlv1connect/ftl.connect.go \
protos/xyz/block/ftl/v1/schema/schema.pb.go \
protos/xyz/block/ftl/v1/console/console.pb.go \
Expand All @@ -28,7 +28,9 @@ KT_MVN_OUT = kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT-jar-with
KT_RUNTIME_OUT = build/template/ftl/jars/ftl-runtime.jar

CLIENT_OUT = console/client/dist/index.html
CLIENT_IN = console/client/**/*
CLIENT_IN = console/client/src/**/*
NODE_MODULES_OUT = console/client/node_modules
NODE_MODULES_IN = console/client/package{,-lock}.json

#virtual release:
# inputs: %{RELEASE}/ftl %{RELEASE}/ftl-controller %{RELEASE}/ftl-runner
Expand All @@ -38,10 +40,10 @@ CLIENT_IN = console/client/**/*

# Build all binaries
implicit %{RELEASE}/%{1}: cmd/*
inputs: %{RELEASE} %{GO_SOURCES}
inputs: %{RELEASE} %{GO_SOURCES} %{CLIENT_OUT}
build: go build -o %{OUT} -tags release -ldflags "-X main.version=%{VERSION}" ./cmd/%{1}

#%{RELEASE}/ftl-controller: %{RELEASE} %{GO_SOURCES} #%{CLIENT_OUT}
#%{RELEASE}/ftl-controller: %{RELEASE} %{GO_SOURCES} %{CLIENT_OUT}
# build: go build -o %{OUT} -tags release -ldflags "-X main.version=%{VERSION}" ./cmd/ftl-controller

%{SCHEMA_OUT}: %{SCHEMA_IN}
Expand Down Expand Up @@ -70,8 +72,12 @@ implicit %{RELEASE}/%{1}: cmd/*
%{COMMON_LOG_OUT}: %{COMMON_LOG_IN}
build: go generate %{IN}

#%{CLIENT_OUT}: %{CLIENT_IN}
# cd console/client
# build:
# npm install
# npm run build
%{NODE_MODULES_OUT}: %{NODE_MODULES_IN}
cd console/client
build: npm --no-color --no-progress install
-clean # Don't clean node_modules

%{CLIENT_OUT}: %{CLIENT_IN} %{NODE_MODULES_OUT}
cd console/client
build: npm --no-color run build
clean: rm -rf dist .parcel-cache
7 changes: 7 additions & 0 deletions backend/common/log/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func (l Level) Severity() int {
return int(l)
}

// ParseLevel parses a log level from text.
func ParseLevel(input string) (Level, error) {
var level Level
err := level.UnmarshalText([]byte(input))
return level, err
}

type contextKey struct{}

// FromContext retrieves the current logger from the context or panics
Expand Down
204 changes: 140 additions & 64 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/internal/dal"
Expand Down Expand Up @@ -102,13 +103,13 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}

func (c *ConsoleService) GetCalls(ctx context.Context, req *connect.Request[pbconsole.GetCallsRequest]) (*connect.Response[pbconsole.GetCallsResponse], error) {
events, err := c.dal.QueryEvents(ctx, time.Time{}, time.Now(), dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
events, err := c.dal.QueryEvents(ctx, dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
if err != nil {
return nil, errors.WithStack(err)
}

return connect.NewResponse(&pbconsole.GetCallsResponse{
Calls: slices.Map(filterCallEvents(events), callEventToCall),
Calls: slices.Map(filterEvents[*dal.CallEvent](events), callEventToCall),
}), nil
}

Expand All @@ -118,16 +119,31 @@ func (c *ConsoleService) GetRequestCalls(ctx context.Context, req *connect.Reque
return nil, errors.WithStack(err)
}

events, err := c.dal.QueryEvents(ctx, time.Time{}, time.Now(), dal.FilterRequests(requestKey))
events, err := c.dal.QueryEvents(ctx, dal.FilterRequests(requestKey))
if err != nil {
return nil, errors.WithStack(err)
}

return connect.NewResponse(&pbconsole.GetRequestCallsResponse{
Calls: slices.Map(filterCallEvents(events), callEventToCall),
Calls: slices.Map(filterEvents[*dal.CallEvent](events), callEventToCall),
}), nil
}

func (c *ConsoleService) GetTimeline(ctx context.Context, req *connect.Request[pbconsole.TimelineQuery]) (*connect.Response[pbconsole.GetTimelineResponse], error) {
query, err := timelineQueryProtoToDAL(req.Msg)
if err != nil {
return nil, errors.WithStack(err)
}
results, err := c.dal.QueryEvents(ctx, query...)
if err != nil {
return nil, errors.WithStack(err)
}
response := &pbconsole.GetTimelineResponse{
Events: slices.Map(results, eventDALToProto),
}
return connect.NewResponse(response), nil
}

func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Request[pbconsole.StreamTimelineRequest], stream *connect.ServerStream[pbconsole.StreamTimelineResponse]) error {
// Default to 1 second interval if not specified.
updateInterval := 1 * time.Second
Expand All @@ -143,50 +159,26 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
}
query = append(query, dal.FilterDeployments(deploymentName))
}
var lastEventTime time.Time
if req.Msg.AfterTime != nil {
lastEventTime = req.Msg.AfterTime.AsTime()
} else {
lastEventTime = time.Now()
}

lastEventTime := req.Msg.AfterTime.AsTime()
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, lastEventTime, thisRequestTime, query...)
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastEventTime))...)
if err != nil {
return errors.WithStack(err)
}

timelineEvents := filterTimelineEvents(events)
for index, timelineEvent := range timelineEvents {
for index, timelineEvent := range events {
more := len(events) > index+1
var err error

switch event := timelineEvent.(type) {
case *dal.CallEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Call{
Call: callEventToCall(event),
},
More: more,
})
case *dal.LogEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Log{
Log: logEventToLogEntry(event),
},
More: more,
})
case *dal.DeploymentEvent:
err = stream.Send(&pbconsole.StreamTimelineResponse{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.StreamTimelineResponse_Deployment{
Deployment: deploymentEventToDeployment(event),
},
More: more,
})
}

err := stream.Send(&pbconsole.StreamTimelineResponse{
Event: eventDALToProto(timelineEvent),
More: more,
})
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -203,7 +195,7 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
func (c *ConsoleService) StreamLogs(ctx context.Context, req *connect.Request[pbconsole.StreamLogsRequest], stream *connect.ServerStream[pbconsole.StreamLogsResponse]) error {
// Default to 1 second interval if not specified.
updateInterval := 1 * time.Second
if req.Msg.UpdateInterval != nil && req.Msg.UpdateInterval.AsDuration() > time.Second { // Minimum 1s interval.
if interval := req.Msg.UpdateInterval; interval != nil && interval.AsDuration() > time.Second { // Minimum 1s interval.
updateInterval = req.Msg.UpdateInterval.AsDuration()
}

Expand All @@ -215,16 +207,15 @@ func (c *ConsoleService) StreamLogs(ctx context.Context, req *connect.Request[pb
}
query = append(query, dal.FilterDeployments(deploymentName))
}

lastLogTime := req.Msg.AfterTime.AsTime()
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, lastLogTime, thisRequestTime, query...)
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastLogTime))...)
if err != nil {
return errors.WithStack(err)
}

logEvents := filterLogEvents(events)
logEvents := filterEvents[*dal.LogEvent](events)
for index, log := range logEvents {
var requestKey *string
if r, ok := log.RequestKey.Get(); ok {
Expand Down Expand Up @@ -309,6 +300,8 @@ func deploymentEventToDeployment(event *dal.DeploymentEvent) *pbconsole.Deployme
eventType = pbconsole.DeploymentEventType_DEPLOYMENT_UPDATED
case dal.DeploymentReplaced:
eventType = pbconsole.DeploymentEventType_DEPLOYMENT_REPLACED
default:
panic(errors.Errorf("unknown deployment event type %v", event.Type))
}

var replaced *string
Expand All @@ -326,38 +319,121 @@ func deploymentEventToDeployment(event *dal.DeploymentEvent) *pbconsole.Deployme
}
}

func filterCallEvents(events []dal.Event) []*dal.CallEvent {
var filtered []*dal.CallEvent
func filterEvents[E dal.Event](events []dal.Event) []E {
var filtered []E
for _, event := range events {
if call, ok := event.(*dal.CallEvent); ok {
filtered = append(filtered, call)
if e, ok := event.(E); ok {
filtered = append(filtered, e)
}
}
return filtered
}

func filterLogEvents(events []dal.Event) []*dal.LogEvent {
var filtered []*dal.LogEvent
for _, event := range events {
if log, ok := event.(*dal.LogEvent); ok {
filtered = append(filtered, log)
func timelineQueryProtoToDAL(pb *pbconsole.TimelineQuery) ([]dal.EventFilter, error) {
var query []dal.EventFilter
for _, filter := range pb.Filters {
switch filter := filter.Filter.(type) {
case *pbconsole.TimelineQuery_Filter_Deployments:
deploymentNames := make([]model.DeploymentName, 0, len(filter.Deployments.Deployments))
for _, deployment := range filter.Deployments.Deployments {
deploymentName, err := model.ParseDeploymentName(deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
}
deploymentNames = append(deploymentNames, deploymentName)
}
query = append(query, dal.FilterDeployments(deploymentNames...))

case *pbconsole.TimelineQuery_Filter_Requests:
requestKeys := make([]model.IngressRequestKey, 0, len(filter.Requests.Requests))
for _, request := range filter.Requests.Requests {
requestKey, err := model.ParseIngressRequestKey(request)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.WithStack(err))
}
requestKeys = append(requestKeys, requestKey)
}
query = append(query, dal.FilterRequests(requestKeys...))

case *pbconsole.TimelineQuery_Filter_EventTypes:
eventTypes := make([]dal.EventType, 0, len(filter.EventTypes.EventTypes))
for _, eventType := range filter.EventTypes.EventTypes {
switch eventType {
case pbconsole.EventType_EVENT_TYPE_CALL:
eventTypes = append(eventTypes, dal.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, dal.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT:
eventTypes = append(eventTypes, dal.EventTypeDeployment)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown event type %v", eventType))
}
}

case *pbconsole.TimelineQuery_Filter_LogLevel:
level := log.Level(filter.LogLevel.LogLevel)
if level < log.Trace || level > log.Error {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown log level %v", filter.LogLevel.LogLevel))
}
query = append(query, dal.FilterLogLevel(level))

case *pbconsole.TimelineQuery_Filter_Time:
var newerThan, olderThan time.Time
if filter.Time.NewerThan != nil {
newerThan = filter.Time.NewerThan.AsTime()
}
if filter.Time.OlderThan != nil {
olderThan = filter.Time.OlderThan.AsTime()
}
query = append(query, dal.FilterTimeRange(newerThan, olderThan))

case *pbconsole.TimelineQuery_Filter_Id:
var lowerThan, higherThan int64
if filter.Id.LowerThan != nil {
lowerThan = *filter.Id.LowerThan
}
if filter.Id.HigherThan != nil {
higherThan = *filter.Id.HigherThan
}
query = append(query, dal.FilterIDRange(lowerThan, higherThan))

default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Errorf("unknown filter %T", filter))
}
}
return filtered
return query, nil
}

func filterTimelineEvents(events []dal.Event) []dal.Event {
var filtered []dal.Event
for _, event := range events {
if _, ok := event.(*dal.LogEvent); ok {
filtered = append(filtered, event)
func eventDALToProto(event dal.Event) *pbconsole.TimelineEvent {
switch event := event.(type) {
case *dal.CallEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Call{
Call: callEventToCall(event),
},
}
if _, ok := event.(*dal.CallEvent); ok {
filtered = append(filtered, event)

case *dal.LogEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Log{
Log: logEventToLogEntry(event),
},
}
if _, ok := event.(*dal.DeploymentEvent); ok {
filtered = append(filtered, event)

case *dal.DeploymentEvent:
return &pbconsole.TimelineEvent{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.TimelineEvent_Deployment{
Deployment: deploymentEventToDeployment(event),
},
}

default:
panic(errors.Errorf("unknown event type %T", event))
}
return filtered
}
Loading