diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 5b8b75122..aecfdf97c 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -42,6 +42,11 @@ type AsyncErrorWriter interface { WriteError(ctx *Context, err error, res *GraphQLResponse, w io.Writer) } +// Resolver is a single threaded event loop that processes all events on a single goroutine. +// It is absolutely critical to ensure that all events are processed quickly to prevent blocking +// and that resolver modifications are done on the event loop goroutine. Long-running operations +// should be offloaded to the subscription worker goroutine. If a different goroutine needs to emit +// an event, it should be done through the events channel to avoid race conditions. type Resolver struct { ctx context.Context options ResolverOptions @@ -283,6 +288,7 @@ type sub struct { // startWorker runs in its own goroutine to process fetches and write data to the client synchronously // it also takes care of sending heartbeats to the client but only if the subscription supports it +// TODO implement a goroutine pool that is sharded by the subscription id to avoid creating a new goroutine for each subscription func (s *sub) startWorker() { if s.heartbeat { s.startWorkerWithHeartbeat() @@ -291,6 +297,9 @@ func (s *sub) startWorker() { s.startWorkerWithoutHeartbeat() } +// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when +// subscription over multipart is used. It sends a heartbeat to the client every heartbeatInterval. +// TODO: Implement a shared timer implementation to avoid creating a new ticker for each subscription. 
func (s *sub) startWorkerWithHeartbeat() { heartbeatTicker := time.NewTicker(s.resolver.heartbeatInterval) defer heartbeatTicker.Stop() @@ -301,12 +310,14 @@ func (s *sub) startWorkerWithHeartbeat() { return case <-heartbeatTicker.C: s.resolver.handleHeartbeat(s, multipartHeartbeat) - case fn := <-s.workChan: + case fn, ok := <-s.workChan: + if !ok { + s.complete() + return + } fn() // Reset the heartbeat ticker after each write to avoid sending unnecessary heartbeats heartbeatTicker.Reset(s.resolver.heartbeatInterval) - case <-s.completed: // Shutdown the writer when the subscription is completed - return } } } @@ -317,14 +328,29 @@ func (s *sub) startWorkerWithoutHeartbeat() { select { case <-s.resolver.ctx.Done(): // Skip sending events if the resolver is shutting down return - case fn := <-s.workChan: + case fn, ok := <-s.workChan: + if !ok { + s.complete() + return + } fn() - case <-s.completed: // Shutdown the writer when the subscription is completed - return } } } +func (s *sub) complete() { + // The completed channel is used to communicate that the subscription is done. + // It is used only in the synchronous subscription case and to avoid sending events + // to a subscription that is already done. + defer close(s.completed) + + // complete is called by the worker only after the work channel has been closed and all previously sent work + // has been received, which ensures that the complete handshake is the last message that is sent to the client. + if s.ctx.Context().Err() == nil { + s.writer.Complete() + } +} + func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) { if r.options.Debug { fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID) @@ -411,6 +437,7 @@ func (r *Resolver) processEvents() { // All events are processed in the order they are received and need to be processed quickly // to prevent blocking the event loop and any other events from being processed. 
// TODO: consider using a worker pool that distributes events from different triggers to different workers +// to avoid blocking the event loop and improve performance. func (r *Resolver) handleEvent(event subscriptionEvent) { switch event.kind { case subscriptionEventKindAddSubscription: @@ -543,6 +570,7 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) triggerID: triggerID, ch: r.events, ctx: ctx, + completed: s.completed, } cloneCtx := add.ctx.clone(ctx) trig = &trigger{ @@ -699,13 +727,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) { case <-c.ctx.Done(): // Skip sending the event if the client disconnected case s.workChan <- fn: - // Send the work to the subscription worker - case <-s.completed: - // Stop sending if the subscription is completed. Otherwise, this could block the event loop forever - // when the subscription worker was shutdown after channel close but the event was still scheduled. - if s.resolver.options.Debug { - fmt.Printf("resolver:trigger:subscription:completed:%d:%d\n", s.id.ConnectionID, s.id.SubscriptionID) - } + // Send the event to the subscription worker } } } @@ -748,24 +770,13 @@ func (r *Resolver) shutdownTriggerSubscriptions(id uint64, shutdownMatcher func( if shutdownMatcher != nil && !shutdownMatcher(s.id) { continue } - // We close the completed channel on the work channel of the subscription - // to ensure that all jobs are processed before the channel is closed. - select { - case <-r.ctx.Done(): - // Skip sending the event if the resolver is shutting down - case <-c.ctx.Done(): - // Skip sending the event if the client disconnected - case s.workChan <- func() { - // We put the complete handshake to the work channel of the subscription - // to ensure that it is the last message that is sent to the client. 
- if c.Context().Err() == nil { - s.writer.Complete() - } - // This will shutdown the subscription worker - close(s.completed) - }: - } + // Because the event loop is single threaded, we can safely close the channel from this sender. + // The subscription worker will finish processing all previously sent work before it observes the close. + close(s.workChan) + + // Important: we remove the subscription from the trigger on the same goroutine + // that sends work to the subscription worker, so no new work is sent to the worker after this point. delete(trig.subscriptions, c) if r.options.Debug { @@ -1015,6 +1026,7 @@ type subscriptionUpdater struct { triggerID uint64 ch chan subscriptionEvent ctx context.Context + completed chan struct{} } func (s *subscriptionUpdater) Update(data []byte) { @@ -1025,6 +1037,8 @@ func (s *subscriptionUpdater) Update(data []byte) { select { case <-s.ctx.Done(): return + case <-s.completed: // Fail fast if already completed + return case s.ch <- subscriptionEvent{ triggerID: s.triggerID, kind: subscriptionEventKindTriggerUpdate, @@ -1041,6 +1055,8 @@ func (s *subscriptionUpdater) Done() { select { case <-s.ctx.Done(): return + case <-s.completed: // Fail fast if already completed + return case s.ch <- subscriptionEvent{ triggerID: s.triggerID, kind: subscriptionEventKindTriggerDone,