Skip to content

Commit

Permalink
use context.Cause (#1894)
Browse files Browse the repository at this point in the history
WithCancelCause ctx.Err() returns context.ErrCancelled,
need to call context.Cause to return cause,
context.Cause returns ctx.Err when cause function not cause of context done

This was preventing scripting errors from bubbling into mirror logs
  • Loading branch information
serprex authored Jun 29, 2024
1 parent 3ff842e commit 87e8f0d
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 12 deletions.
6 changes: 1 addition & 5 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
numRecords := atomic.Int64{}
lastSeenLSN := atomic.Int64{}

queueCtx, queueCtxErr := context.WithCancelCause(ctx)
queueErr := func(err error) {
c.logger.Error("[kafka] queue error", slog.Any("error", err))
queueCtxErr(err)
}
queueCtx, queueErr := context.WithCancelCause(ctx)

pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, &lastSeenLSN, queueErr)
if err != nil {
Expand Down
8 changes: 2 additions & 6 deletions flow/connectors/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,7 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord
publish := make(chan publishResult, 32)
waitChan := make(chan struct{})

queueCtx, queueCtxErr := context.WithCancelCause(ctx)
queueErr := func(err error) {
c.logger.Error("[pubsub] queue error", slog.Any("error", err))
queueCtxErr(err)
}
queueCtx, queueErr := context.WithCancelCause(ctx)

pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, &topiccache, publish, queueErr)
if err != nil {
Expand Down Expand Up @@ -369,7 +365,7 @@ Loop:
topiccache.Stop(queueCtx)
select {
case <-queueCtx.Done():
return nil, fmt.Errorf("[pubsub] queueCtx.Done: %w", queueCtx.Err())
return nil, fmt.Errorf("[pubsub] queueCtx.Done: %w", context.Cause(queueCtx))
case <-waitChan:
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ func (pool *LPool[T]) Wait(ctx context.Context) error {
case <-pool.wait:
return nil
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
}
}

0 comments on commit 87e8f0d

Please sign in to comment.