fix(subscription): never try to send on blocked channel when subscription was completed #1100

Merged
merged 10 commits into from
Mar 3, 2025
76 changes: 46 additions & 30 deletions v2/pkg/engine/resolve/resolve.go
@@ -42,6 +42,11 @@ type AsyncErrorWriter interface {
WriteError(ctx *Context, err error, res *GraphQLResponse, w io.Writer)
}

// Resolver is a single-threaded event loop that processes all events on a single goroutine.
// It is absolutely critical to ensure that all events are processed quickly to prevent blocking
// and that resolver modifications are done on the event loop goroutine. Long-running operations
// should be offloaded to the subscription worker goroutine. If a different goroutine needs to emit
// an event, it should be done through the events channel to avoid race conditions.
type Resolver struct {
ctx context.Context
options ResolverOptions
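
As a side note, the event-loop model described in the comment above can be sketched in a few lines. The names below (`event`, `eventLoop`, `emit`) are illustrative only and not part of this package; the point is that other goroutines never mutate resolver state directly, they only send events into a channel drained by one goroutine:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for the resolver's internal subscriptionEvent.
type event struct {
	kind string
	id   uint64
}

// eventLoop drains all events on a single goroutine, so the map below
// needs no mutex: only this goroutine ever reads or writes it.
func eventLoop(ctx context.Context, events <-chan event) {
	subscriptions := map[uint64]string{}
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-events:
			switch ev.kind {
			case "add":
				subscriptions[ev.id] = "active"
			case "done":
				delete(subscriptions, ev.id)
			}
			fmt.Println("processed", ev.kind, ev.id, "active:", len(subscriptions))
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	events := make(chan event, 16)
	go eventLoop(ctx, events)

	// Other goroutines emit events instead of mutating state directly.
	events <- event{kind: "add", id: 1}
	events <- event{kind: "done", id: 1}
	time.Sleep(100 * time.Millisecond)
}
```

Because a single goroutine owns the map, no locking is needed, which is the same property the Resolver's comment relies on when it says modifications must happen on the event-loop goroutine.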
@@ -283,6 +288,7 @@ type sub struct {

// startWorker runs in its own goroutine to process fetches and write data to the client synchronously
// it also takes care of sending heartbeats to the client but only if the subscription supports it
// TODO implement a goroutine pool that is sharded by the subscription id to avoid creating a new goroutine for each subscription
func (s *sub) startWorker() {
if s.heartbeat {
s.startWorkerWithHeartbeat()
@@ -291,6 +297,9 @@ func (s *sub) startWorker() {
s.startWorkerWithoutHeartbeat()
}

// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when
// subscription over multipart is used. It sends a heartbeat to the client every heartbeatInterval.
// TODO: Implement a shared timer implementation to avoid creating a new ticker for each subscription.
func (s *sub) startWorkerWithHeartbeat() {
heartbeatTicker := time.NewTicker(s.resolver.heartbeatInterval)
defer heartbeatTicker.Stop()
@@ -301,12 +310,14 @@ func (s *sub) startWorkerWithHeartbeat() {
return
case <-heartbeatTicker.C:
s.resolver.handleHeartbeat(s, multipartHeartbeat)
case fn := <-s.workChan:
case fn, ok := <-s.workChan:
if !ok {
s.complete()
return
}
fn()
// Reset the heartbeat ticker after each write to avoid sending unnecessary heartbeats
heartbeatTicker.Reset(s.resolver.heartbeatInterval)
case <-s.completed: // Shutdown the writer when the subscription is completed
return
}
}
}
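
The `fn, ok := <-s.workChan` receive above is the heart of this fix: a closed work channel, rather than a separate `completed` case, now tells the worker to run its completion path. A stripped-down sketch of that pattern, with illustrative names (`worker`, `work`, `done`) that are not part of the package:

```go
package main

import (
	"fmt"
	"time"
)

// worker drains work until the channel is closed, sending a heartbeat
// whenever no work arrived within the interval. Resetting the ticker
// after each job avoids redundant heartbeats right after real data.
func worker(work <-chan func(), interval time.Duration, done chan<- struct{}) {
	defer close(done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("heartbeat")
		case fn, ok := <-work:
			if !ok {
				// Channel closed: all queued work has been drained,
				// so this is the safe place to finish up.
				fmt.Println("complete")
				return
			}
			fn()
			ticker.Reset(interval)
		}
	}
}

func main() {
	work := make(chan func(), 8)
	done := make(chan struct{})
	go worker(work, 50*time.Millisecond, done)

	work <- func() { fmt.Println("data 1") }
	work <- func() { fmt.Println("data 2") }
	close(work) // queued work is still delivered before ok becomes false
	<-done
}
```

Note that `close(work)` does not discard queued items; the worker still executes `data 1` and `data 2` before `ok` turns false, which is what guarantees completion is the last thing sent.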
@@ -317,14 +328,29 @@ func (s *sub) startWorkerWithoutHeartbeat() {
select {
case <-s.resolver.ctx.Done(): // Skip sending events if the resolver is shutting down
return
case fn := <-s.workChan:
case fn, ok := <-s.workChan:
if !ok {
s.complete()
return
}
fn()
case <-s.completed: // Shutdown the writer when the subscription is completed
return
}
}
}

func (s *sub) complete() {
// The completed channel signals that the subscription is done.
// It is used only in the synchronous subscription case and to avoid sending events
// to a subscription that has already finished.
defer close(s.completed)

// Send the complete handshake to the client so that it is the last message the client receives.
// By the time complete() runs, the worker has already drained the work channel.
if s.ctx.Context().Err() == nil {
s.writer.Complete()
}
}
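
Closing `completed` here acts as a broadcast: every goroutine selecting on it, such as the subscription updater further down, unblocks immediately and stops emitting. A minimal sketch of that fail-fast pattern, assuming a hypothetical `producer` that would otherwise block on an unbuffered channel:

```go
package main

import (
	"fmt"
	"time"
)

// producer emits updates until the completed channel is closed. Because a
// closed channel is always ready to receive from, the select can never
// block forever on `out` once completion has been signalled.
func producer(out chan<- string, completed <-chan struct{}) {
	for i := 0; ; i++ {
		select {
		case <-completed:
			fmt.Println("producer: completed, stop emitting")
			return
		case out <- fmt.Sprintf("update %d", i):
		}
	}
}

func main() {
	out := make(chan string) // unbuffered: sends block until someone receives
	completed := make(chan struct{})

	go producer(out, completed)

	fmt.Println(<-out) // consume one update
	close(completed)   // broadcast completion to the producer

	time.Sleep(50 * time.Millisecond) // give the producer time to observe the close
}
```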

func (r *Resolver) executeSubscriptionUpdate(resolveCtx *Context, sub *sub, sharedInput []byte) {
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID)
@@ -411,6 +437,7 @@ func (r *Resolver) processEvents() {
// All events are processed in the order they are received and need to be processed quickly
// to prevent blocking the event loop and any other events from being processed.
// TODO: consider using a worker pool that distributes events from different triggers to different workers
// to avoid blocking the event loop and improve performance.
func (r *Resolver) handleEvent(event subscriptionEvent) {
switch event.kind {
case subscriptionEventKindAddSubscription:
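
The TODO above suggests sharding events across a small worker pool keyed by trigger or subscription id. Purely as an illustration of what that could look like (none of these names exist in the package, and this is not what the PR implements), routing the same id to the same worker preserves per-trigger ordering while spreading load:

```go
package main

import (
	"fmt"
	"sync"
)

// shardedPool is a hypothetical sketch: events for the same id always land on
// the same worker, which keeps per-trigger ordering while spreading load.
type shardedPool struct {
	shards []chan func()
	wg     sync.WaitGroup
}

func newShardedPool(n int) *shardedPool {
	p := &shardedPool{shards: make([]chan func(), n)}
	for i := range p.shards {
		ch := make(chan func(), 64)
		p.shards[i] = ch
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for fn := range ch { // exits when the shard channel is closed
				fn()
			}
		}()
	}
	return p
}

// Submit routes work by id so that events of one trigger stay in order.
func (p *shardedPool) Submit(id uint64, fn func()) {
	p.shards[id%uint64(len(p.shards))] <- fn
}

func (p *shardedPool) Close() {
	for _, ch := range p.shards {
		close(ch)
	}
	p.wg.Wait()
}

func main() {
	pool := newShardedPool(4)
	for i := uint64(0); i < 8; i++ {
		id := i
		pool.Submit(id, func() { fmt.Println("handled trigger", id) })
	}
	pool.Close()
}
```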
@@ -543,6 +570,7 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
triggerID: triggerID,
ch: r.events,
ctx: ctx,
completed: s.completed,
}
cloneCtx := add.ctx.clone(ctx)
trig = &trigger{
@@ -699,13 +727,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
case <-c.ctx.Done():
// Skip sending the event if the client disconnected
case s.workChan <- fn:
// Send the work to the subscription worker
case <-s.completed:
// Stop sending if the subscription is completed. Otherwise, this could block the event loop forever
// when the subscription worker was shutdown after channel close but the event was still scheduled.
if s.resolver.options.Debug {
fmt.Printf("resolver:trigger:subscription:completed:%d:%d\n", s.id.ConnectionID, s.id.SubscriptionID)
}
// Send the event to the subscription worker
}
}
}
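
The send into `s.workChan` above is guarded by both the resolver context and the client context, so a shutting-down server or a disconnected client no longer wedges the event loop. A generic sketch of that guarded-send pattern, with hypothetical names (`dispatch`, `work`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// dispatch hands an update to a subscription worker but never blocks forever:
// it gives up when either the server or the client context is cancelled.
func dispatch(serverCtx, clientCtx context.Context, work chan<- func(), fn func()) bool {
	select {
	case <-serverCtx.Done():
		return false // server shutting down, drop the update
	case <-clientCtx.Done():
		return false // client disconnected, drop the update
	case work <- fn:
		return true // handed off to the subscription worker
	}
}

func main() {
	serverCtx := context.Background()
	clientCtx, cancelClient := context.WithCancel(context.Background())

	work := make(chan func(), 1)
	if dispatch(serverCtx, clientCtx, work, func() { fmt.Println("update delivered") }) {
		(<-work)()
	}

	// Simulate a stalled worker: the buffer is full and nobody drains it.
	work <- func() {}
	cancelClient()

	start := time.Now()
	delivered := dispatch(serverCtx, clientCtx, work, func() {})
	fmt.Printf("delivered=%v, returned after %s\n", delivered, time.Since(start).Round(time.Millisecond))
}
```

The second call returns immediately with `delivered=false` because the client context is already cancelled, mirroring how the event loop drops updates for gone clients instead of blocking on their workers.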
@@ -748,24 +770,13 @@ func (r *Resolver) shutdownTriggerSubscriptions(id uint64, shutdownMatcher func(
if shutdownMatcher != nil && !shutdownMatcher(s.id) {
continue
}
// We close the completed channel via a closure queued on the subscription's work channel
// to ensure that all pending jobs are processed before the channel is closed.
select {
case <-r.ctx.Done():
// Skip sending the event if the resolver is shutting down
case <-c.ctx.Done():
// Skip sending the event if the client disconnected
case s.workChan <- func() {
// We put the complete handshake to the work channel of the subscription
// to ensure that it is the last message that is sent to the client.
if c.Context().Err() == nil {
s.writer.Complete()
}
// This will shutdown the subscription worker
close(s.completed)
}:
}

// Because the event loop is single-threaded, we can safely close the work channel from this sender.
// The subscription worker will still drain and process all events queued before the close.
close(s.workChan)

// This matters because we remove the subscription from the trigger on the same goroutine
// that sends work to the subscription worker, so no new work can be sent to the worker after this point.
delete(trig.subscriptions, c)

if r.options.Debug {
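
The `close(s.workChan)` call above relies on a standard channel property: receivers keep draining whatever was queued before the close and only then observe the closed state. A short, self-contained sketch of that behaviour:

```go
package main

import "fmt"

func main() {
	work := make(chan int, 4)

	// Queue some work, then close immediately: the close does not discard
	// anything that was already buffered.
	for i := 1; i <= 3; i++ {
		work <- i
	}
	close(work)

	// The receiver drains all buffered items first; only then does the
	// second return value report that the channel is closed.
	for {
		v, ok := <-work
		if !ok {
			fmt.Println("channel drained and closed")
			return
		}
		fmt.Println("processed", v)
	}
}
```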
@@ -1015,6 +1026,7 @@ type subscriptionUpdater struct {
triggerID uint64
ch chan subscriptionEvent
ctx context.Context
completed chan struct{}
}

func (s *subscriptionUpdater) Update(data []byte) {
Expand All @@ -1025,6 +1037,8 @@ func (s *subscriptionUpdater) Update(data []byte) {
select {
case <-s.ctx.Done():
return
case <-s.completed: // Fail fast if already completed
return
case s.ch <- subscriptionEvent{
triggerID: s.triggerID,
kind: subscriptionEventKindTriggerUpdate,
@@ -1041,6 +1055,8 @@ func (s *subscriptionUpdater) Done() {
select {
case <-s.ctx.Done():
return
case <-s.completed: // Fail fast if already completed
return
case s.ch <- subscriptionEvent{
triggerID: s.triggerID,
kind: subscriptionEventKindTriggerDone,