diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index cd0aadd04..b1143be14 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -488,7 +488,18 @@ func (b *batchProcessor) Process( "%s, error(s) occured in processing (sending err)", b.topic, ) b.handleShutdown() - errChan <- errors + + // send to channel with context check, fix #170 + select { + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + case errChan <- errors: + } + klog.Errorf( "%s, error(s) occured: %+v, processor shutdown.", b.topic, @@ -512,7 +523,16 @@ func (b *batchProcessor) Process( } err := b.signalLoad(resp) if err != nil { - errChan <- err + // send to channel with context check, fix #170 + select { + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + case errChan <- err: + } klog.Errorf( "%s, error signalling: %v, processor shutdown.", b.topic,