Skip to content

Commit

Permalink
feat: Add limit to GetTimeline query (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman authored Sep 12, 2023
1 parent 42c1517 commit deddbd8
Show file tree
Hide file tree
Showing 8 changed files with 601 additions and 644 deletions.
88 changes: 28 additions & 60 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}

func (c *ConsoleService) GetCalls(ctx context.Context, req *connect.Request[pbconsole.GetCallsRequest]) (*connect.Response[pbconsole.GetCallsResponse], error) {
events, err := c.dal.QueryEvents(ctx, dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
events, err := c.dal.QueryEvents(ctx, 100, dal.FilterCall(types.None[string](), req.Msg.Module, types.Some(req.Msg.Verb)))
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -119,7 +119,7 @@ func (c *ConsoleService) GetRequestCalls(ctx context.Context, req *connect.Reque
return nil, errors.WithStack(err)
}

events, err := c.dal.QueryEvents(ctx, dal.FilterRequests(requestKey))
events, err := c.dal.QueryEvents(ctx, 100, dal.FilterRequests(requestKey))
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -134,12 +134,31 @@ func (c *ConsoleService) GetTimeline(ctx context.Context, req *connect.Request[p
if err != nil {
return nil, errors.WithStack(err)
}
results, err := c.dal.QueryEvents(ctx, query...)

if req.Msg.Limit == 0 {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("limit must be > 0"))
}
limit := int(req.Msg.Limit)

// Get 1 more than the requested limit to determine if there are more results.
limitPlusOne := limit + 1

results, err := c.dal.QueryEvents(ctx, limitPlusOne, query...)
if err != nil {
return nil, errors.WithStack(err)
}

var cursor *int64
// Return only the requested number of results.
if len(results) > limit {
results = results[:limit]
id := results[len(results)-1].GetID()
cursor = &id
}

response := &pbconsole.GetTimelineResponse{
Events: slices.Map(results, eventDALToProto),
Cursor: cursor,
}
return connect.NewResponse(response), nil
}
Expand Down Expand Up @@ -168,7 +187,7 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques

for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastEventTime))...)
events, err := c.dal.QueryEvents(ctx, 1000, append(query, dal.FilterTimeRange(thisRequestTime, lastEventTime))...)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -192,62 +211,6 @@ func (c *ConsoleService) StreamTimeline(ctx context.Context, req *connect.Reques
}
}

func (c *ConsoleService) StreamLogs(ctx context.Context, req *connect.Request[pbconsole.StreamLogsRequest], stream *connect.ServerStream[pbconsole.StreamLogsResponse]) error {
// Default to 1 second interval if not specified.
updateInterval := 1 * time.Second
if interval := req.Msg.UpdateInterval; interval != nil && interval.AsDuration() > time.Second { // Minimum 1s interval.
updateInterval = req.Msg.UpdateInterval.AsDuration()
}

var query []dal.EventFilter
if req.Msg.DeploymentName != "" {
deploymentName, err := model.ParseDeploymentName(req.Msg.DeploymentName)
if err != nil {
return errors.WithStack(err)
}
query = append(query, dal.FilterDeployments(deploymentName))
}
lastLogTime := req.Msg.AfterTime.AsTime()
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, append(query, dal.FilterTimeRange(thisRequestTime, lastLogTime))...)
if err != nil {
return errors.WithStack(err)
}

logEvents := filterEvents[*dal.LogEvent](events)
for index, log := range logEvents {
var requestKey *string
if r, ok := log.RequestKey.Get(); ok {
rstr := r.String()
requestKey = &rstr
}

err := stream.Send(&pbconsole.StreamLogsResponse{
Log: &pbconsole.LogEntry{
DeploymentName: log.DeploymentName.String(),
RequestKey: requestKey,
TimeStamp: timestamppb.New(log.Time),
LogLevel: log.Level,
Attributes: log.Attributes,
Message: log.Message,
Error: log.Error.Ptr(),
},
More: len(logEvents) > index+1,
})
if err != nil {
return errors.WithStack(err)
}
}
lastLogTime = thisRequestTime
select {
case <-time.After(updateInterval):
case <-ctx.Done():
return nil
}
}
}

func callEventToCall(event *dal.CallEvent) *pbconsole.Call {
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
Expand Down Expand Up @@ -331,6 +294,11 @@ func filterEvents[E dal.Event](events []dal.Event) []E {

func timelineQueryProtoToDAL(pb *pbconsole.TimelineQuery) ([]dal.EventFilter, error) {
var query []dal.EventFilter

if pb.Order == pbconsole.TimelineQuery_DESC {
query = append(query, dal.FilterDescending())
}

for _, filter := range pb.Filters {
switch filter := filter.Filter.(type) {
case *pbconsole.TimelineQuery_Filter_Deployments:
Expand Down
14 changes: 10 additions & 4 deletions backend/controller/internal/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,26 +254,32 @@ func TestDAL(t *testing.T) {
}

t.Run("QueryEvents", func(t *testing.T) {
t.Run("Limit", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
})

t.Run("NoFilters", func(t *testing.T) {
events, err := dal.QueryEvents(ctx)
events, err := dal.QueryEvents(ctx, 1000)
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
})

t.Run("ByDeployment", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, FilterDeployments(deploymentName))
events, err := dal.QueryEvents(ctx, 1000, FilterDeployments(deploymentName))
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentEvent, callEvent, logEvent}, events)
})

t.Run("ByCall", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, FilterTypes(EventTypeCall), FilterCall(types.None[string](), "time", types.None[string]()))
events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(types.None[string](), "time", types.None[string]()))
assert.NoError(t, err)
assertEventsEqual(t, []Event{callEvent}, events)
})

t.Run("ByLogLevel", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace))
events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace))
assert.NoError(t, err)
assertEventsEqual(t, []Event{logEvent}, events)
})
Expand Down
36 changes: 30 additions & 6 deletions backend/controller/internal/dal/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ const (
// Event types.
//
//sumtype:decl
type Event interface{ event() }
type Event interface {
GetID() int64
event()
}

type LogEvent struct {
ID int64
Expand All @@ -50,7 +53,8 @@ type LogEvent struct {
Error types.Option[string]
}

func (e *LogEvent) event() {}
func (e *LogEvent) GetID() int64 { return e.ID }
func (e *LogEvent) event() {}

type CallEvent struct {
ID int64
Expand All @@ -65,7 +69,8 @@ type CallEvent struct {
Error types.Option[string]
}

func (e *CallEvent) event() {}
func (e *CallEvent) GetID() int64 { return e.ID }
func (e *CallEvent) event() {}

type DeploymentEvent struct {
ID int64
Expand All @@ -78,7 +83,8 @@ type DeploymentEvent struct {
ReplacedDeployment types.Option[model.DeploymentName]
}

func (e *DeploymentEvent) event() {}
func (e *DeploymentEvent) GetID() int64 { return e.ID }
func (e *DeploymentEvent) event() {}

type eventFilterCall struct {
sourceModule types.Option[string]
Expand All @@ -96,6 +102,7 @@ type eventFilter struct {
olderThan time.Time
idHigherThan int64
idLowerThan int64
descending bool
}

type EventFilter func(query *eventFilter)
Expand Down Expand Up @@ -155,6 +162,13 @@ func FilterIDRange(higherThan, lowerThan int64) EventFilter {
}
}

// FilterDescending returns events in descending order.
func FilterDescending() EventFilter {
return func(query *eventFilter) {
query.descending = true
}
}

// The internal JSON payload of a call event.
type eventCallJSON struct {
DurationMS int64 `json:"duration_ms"`
Expand All @@ -180,7 +194,11 @@ type eventRow struct {
RequestKey types.Option[model.IngressRequestKey]
}

func (d *DAL) QueryEvents(ctx context.Context, filters ...EventFilter) ([]Event, error) {
func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error) {
if limit < 1 {
return nil, errors.Errorf("limit must be >= 1, got %d", limit)
}

// Build query.
q := `SELECT e.id AS id,
d.name AS deployment_name,
Expand Down Expand Up @@ -252,7 +270,13 @@ func (d *DAL) QueryEvents(ctx context.Context, filters ...EventFilter) ([]Event,
q += ")\n"
}

q += " ORDER BY time_stamp ASC"
if filter.descending {
q += " ORDER BY time_stamp DESC"
} else {
q += " ORDER BY time_stamp ASC"
}

q += fmt.Sprintf(" LIMIT %d", limit)

// Issue query.
rows, err := d.db.Conn().Query(ctx, q, args...)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit deddbd8

Please sign in to comment.