Skip to content

Commit

Permalink
feat: hook up filters to steaming endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Sep 18, 2023
1 parent aaef01c commit 2f87874
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 131 deletions.
28 changes: 15 additions & 13 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,26 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
updateInterval = req.Msg.UpdateInterval.AsDuration()
}

var query []dal.EventFilter
if req.Msg.DeploymentName != "" {
deploymentName, err := model.ParseDeploymentName(req.Msg.DeploymentName)
if err != nil {
return errors.WithStack(err)
}
query = append(query, dal.FilterDeployments(deploymentName))
if req.Msg.Query.Limit == 0 {
return connect.NewError(connect.CodeInvalidArgument, errors.New("limit must be > 0"))
}
var lastEventTime time.Time
if req.Msg.AfterTime != nil {
lastEventTime = req.Msg.AfterTime.AsTime()
} else {
lastEventTime = time.Now()

query, err := eventsQueryProtoToDAL(req.Msg.Query)
if err != nil {
return errors.WithStack(err)
}

// Default to last 1 day of events
var lastEventTime time.Time
for {
thisRequestTime := time.Now()
events, err := c.dal.QueryEvents(ctx, 1000, append(query, dal.FilterTimeRange(thisRequestTime, lastEventTime))...)
newQuery := query

if !lastEventTime.IsZero() {
newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime))
}

events, err := c.dal.QueryEvents(ctx, int(req.Msg.Query.Limit), newQuery...)
if err != nil {
return errors.WithStack(err)
}
Expand Down
30 changes: 26 additions & 4 deletions console/client/src/features/timeline/Timeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Timestamp } from '@bufbuild/protobuf'
import React from 'react'
import { Event, EventsQuery_Filter } from '../../protos/xyz/block/ftl/v1/console/console_pb.ts'
import { SidePanelContext } from '../../providers/side-panel-provider.tsx'
import { getEvents, timeFilter } from '../../services/console.service.ts'
import { getEvents, streamEvents, timeFilter } from '../../services/console.service.ts'
import { formatTimestampShort } from '../../utils/date.utils.ts'
import { panelColor } from '../../utils/style.utils.ts'
import { TimelineCall } from './TimelineCall.tsx'
Expand All @@ -19,6 +19,8 @@ interface Props {
filters: EventsQuery_Filter[]
}

const maxTimelineEntries = 1000

export const Timeline = ({ timeSettings, filters }: Props) => {
const { openPanel, closePanel, isOpen } = React.useContext(SidePanelContext)
const [entries, setEntries] = React.useState<Event[]>([])
Expand All @@ -27,7 +29,10 @@ export const Timeline = ({ timeSettings, filters }: Props) => {
const [selectedLogLevels] = React.useState<number[]>([1, 5, 9, 13, 17])

React.useEffect(() => {
const abortController = new AbortController()
abortController.signal
const fetchEvents = async () => {
console.log('fetching events')
let eventFilters = filters
if (timeSettings.newerThan || timeSettings.olderThan) {
eventFilters = [timeFilter(timeSettings.olderThan, timeSettings.newerThan), ...filters]
Expand All @@ -36,7 +41,24 @@ export const Timeline = ({ timeSettings, filters }: Props) => {
setEntries(events)
}

fetchEvents()
if (timeSettings.isTailing && !timeSettings.isPaused) {
console.log('streaming events')
setEntries([])
streamEvents({
abortControllerSignal: abortController.signal,
filters,
onEventReceived: (event) => {
if (!timeSettings.isPaused) {
setEntries((prev) => [event, ...prev].slice(0, maxTimelineEntries))
}
},
})
} else {
fetchEvents()
}
return () => {
abortController.abort()
}
}, [filters, timeSettings])

React.useEffect(() => {
Expand Down Expand Up @@ -95,8 +117,8 @@ export const Timeline = ({ timeSettings, filters }: Props) => {
<tr
key={entry.id.toString()}
className={`flex border-b border-gray-100 dark:border-slate-700 text-xs font-roboto-mono ${
selectedEntry?.id === entry.id ? 'bg-indigo-50 dark:bg-slate-800' : panelColor
} relative flex cursor-pointer hover:bg-indigo-50 dark:hover:bg-slate-800`}
selectedEntry?.id === entry.id ? 'bg-indigo-50 dark:bg-slate-700' : panelColor
} relative flex cursor-pointer hover:bg-indigo-50 dark:hover:bg-slate-700`}
onClick={() => handleEntryClicked(entry)}
>
<td className='w-8 flex-none flex items-center justify-center'>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export const TimelineTimeControls = ({ onTimeSettingsChange }: Props) => {
}
if (range.value === TIME_RANGES['tail'].value) {
setNewerThan(undefined)
setIsPaused(false)
}
}

Expand Down
12 changes: 3 additions & 9 deletions console/client/src/protos/xyz/block/ftl/v1/console/console_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions console/client/src/services/console.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,24 @@ export const getEvents = async (

return response.events
}

export interface StreamEventsParams {
abortControllerSignal: AbortSignal
filters: EventsQuery_Filter[]
onEventReceived: (event: Event) => void
}

export const streamEvents = async ({ abortControllerSignal, filters, onEventReceived }: StreamEventsParams) => {
try {
for await (const response of client.streamEvents(
{ updateInterval: { seconds: BigInt(1) }, query: { limit: 1000, filters } },
{ signal: abortControllerSignal },
)) {
if (response.event != null) {
onEventReceived(response.event)
}
}
} catch (error) {
console.error('Streaming error:', error)
}
}
Loading

0 comments on commit 2f87874

Please sign in to comment.