From ff56abdeca23926b9d9b928be899041c3da3645b Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sat, 20 Mar 2021 13:08:31 +0530 Subject: [PATCH 1/2] Apply learnings of ctx, wg learned and fix grace shutdown bug Fixes #170 https://stackoverflow.com/a/66708290/4106031 learned from --- cmd/redshiftbatcher/main.go | 41 ++++++++++++-------------- cmd/redshiftloader/main.go | 40 +++++++++++-------------- pkg/kafka/manager.go | 1 + pkg/redshiftbatcher/batch_processor.go | 11 ++++++- pkg/redshiftbatcher/batcher_handler.go | 21 ++++++++----- pkg/serializer/message.go | 15 +++++++--- 6 files changed, 72 insertions(+), 57 deletions(-) diff --git a/cmd/redshiftbatcher/main.go b/cmd/redshiftbatcher/main.go index 83ee55f2b..577f52bb7 100644 --- a/cmd/redshiftbatcher/main.go +++ b/cmd/redshiftbatcher/main.go @@ -73,6 +73,7 @@ func run(cmd *cobra.Command, args []string) { } ctx, cancel := context.WithCancel(context.Background()) + defer cancel() consumerGroups := make(map[string]kafka.ConsumerGroupInterface) var consumersReady []chan bool @@ -105,43 +106,39 @@ func run(cmd *cobra.Command, args []string) { groupID, groupConfig.TopicRegexes, ) + wg.Add(1) go manager.SyncTopics(ctx, wg) + wg.Add(1) go manager.Consume(ctx, wg) } + klog.V(2).Infof("consumerGroups: %v", len(consumersReady)) - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - ready := 0 - - klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady)) - for ready >= 0 { + go func() { + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { - default: case <-sigterm: klog.V(2).Info("SIGTERM signal received") - ready = -1 + cancel() + klog.V(2).Info("Cancelled main context") } + }() - if ready == -1 || ready == len(consumersReady) { - time.Sleep(3 * time.Second) - continue - } - - for _, channel := range consumersReady { + go func() { + for i, c := range consumersReady { select { - case <-channel: - ready += 1 - klog.V(2).Infof("ConsumerGroup #%d is up and running", ready) + case <-c: + klog.V(2).Infof( + "#%d consumerGroup is up and running", + i, + ) } } - } - - klog.V(2).Info("Cancelled main context") - cancel() + }() - klog.V(2).Info("Waiting for all goroutines to shutdown...") + klog.V(2).Info("wg wait()") wg.Wait() var closeErr error diff --git a/cmd/redshiftloader/main.go b/cmd/redshiftloader/main.go index e7e4182f3..f15a305cb 100644 --- a/cmd/redshiftloader/main.go +++ b/cmd/redshiftloader/main.go @@ -125,43 +125,39 @@ func run(cmd *cobra.Command, args []string) { groupConfig.TopicRegexes, // cancel, ) + wg.Add(1) go manager.SyncTopics(ctx, wg) + wg.Add(1) go manager.Consume(ctx, wg) } + klog.V(2).Infof("consumerGroups: %v", len(consumersReady)) - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - ready := 0 - - klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady)) - for ready >= 0 { + go func() { + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { - default: case <-sigterm: klog.V(2).Info("SIGTERM signal received") - ready = -1 + cancel() + klog.V(2).Info("Cancelled main context") } + }() - if ready == -1 || ready == len(consumersReady) { - time.Sleep(3 * time.Second) - continue - } - - for _, channel := range consumersReady { + go func() { + for i, c := range consumersReady { select { - case <-channel: - ready += 1 - klog.V(2).Infof("ConsumerGroup #%d is up and running", ready) + case <-c: + klog.V(2).Infof( + "#%d consumerGroup is up and running", + i, + ) } } - } - - klog.V(2).Info("Cancelling context to trigger graceful shutdown...") - cancel() + }() - klog.V(2).Info("Waiting for waitgroups to shutdown...") + klog.V(2).Info("wg wait()") wg.Wait() var closeErr error diff --git a/pkg/kafka/manager.go b/pkg/kafka/manager.go index b03f0831a..ee994aba3 100644 --- a/pkg/kafka/manager.go +++ b/pkg/kafka/manager.go @@ -192,6 +192,7 @@ func (c *Manager) SyncTopics( select { case <-ctx.Done(): + klog.V(2).Info("ctx cancelled bye") return case <-ticker.C: continue diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index c1df0121a..cd0aadd04 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -456,8 +456,8 @@ func (b *batchProcessor) Process( bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)), maskSchema: make(map[string]serializer.MaskInfo), } - go b.processBatch(wg, session, msgBuf, resp) wg.Add(1) + go b.processBatch(wg, session, msgBuf, resp) responses = append(responses, resp) } if len(responses) == 0 { @@ -501,6 +501,15 @@ func (b *batchProcessor) Process( // failure in between signal and marking the offset can lead to // duplicates in the loader topic, but it's ok as loader is idempotent for _, resp := range responses { + select { + default: + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + } err := b.signalLoad(resp) if err != nil { errChan <- err diff --git a/pkg/redshiftbatcher/batcher_handler.go b/pkg/redshiftbatcher/batcher_handler.go index 96b8b12ac..c1f797cf7 100644 --- a/pkg/redshiftbatcher/batcher_handler.go +++ b/pkg/redshiftbatcher/batcher_handler.go @@ -158,9 +158,14 @@ func (h *batcherHandler) ConsumeClaim( ) wg := &sync.WaitGroup{} - go processor.Process(wg, session, processChan, errChan) wg.Add(1) - defer wg.Wait() + go processor.Process(wg, session, processChan, errChan) + + defer func() { + klog.V(2).Infof("%s: wg wait() for processing to return", claim.Topic()) + wg.Wait() + klog.V(2).Infof("%s: wg done. processing returned", claim.Topic()) + }() klog.V(4).Infof("%s: read msgs", claim.Topic()) // NOTE: @@ -204,10 +209,10 @@ func (h *batcherHandler) ConsumeClaim( // Deserialize the message msg, err := h.serializer.Deserialize(message) if err != nil { - return fmt.Errorf("error deserializing binary, err: %s\n", err) + return fmt.Errorf("%s: consumeClaim returning, error deserializing binary, err: %s\n", claim.Topic(), err) } if msg == nil || msg.Value == nil { - return fmt.Errorf("got message as nil, message: %+v\n", msg) + return fmt.Errorf("%s: consumeClaim returning, error, got message as nil, message: %+v\n", claim.Topic(), msg) } if lastSchemaId == nil { @@ -220,10 +225,10 @@ func (h *batcherHandler) ConsumeClaim( msg.SchemaId, ) // Flush the batch due to schema change - msgBatch.Flush() + msgBatch.Flush(session.Context()) } // Flush the batch by size or insert in batch - msgBatch.Insert(msg) + msgBatch.Insert(session.Context(), msg) *lastSchemaId = msg.SchemaId case <-maxWaitTicker.C: // Flush the batch by time @@ -231,11 +236,11 @@ func (h *batcherHandler) ConsumeClaim( "%s: maxWaitSeconds hit", claim.Topic(), ) - msgBatch.Flush() + msgBatch.Flush(session.Context()) case err := <-errChan: syscall.Kill(syscall.Getpid(), syscall.SIGINT) klog.Errorf( - "%s: error occured in processing, err: %v, triggered shutdown", + "consumeClaim returning, %s: error occured in processing, err: %v, triggered shutdown", claim.Topic(), err, ) diff --git a/pkg/serializer/message.go b/pkg/serializer/message.go index 31ca90acd..e629d8032 100644 --- a/pkg/serializer/message.go +++ b/pkg/serializer/message.go @@ -1,6 +1,7 @@ package serializer import ( + "context" "github.com/Shopify/sarama" "github.com/practo/klog/v2" "sync" @@ -54,10 +55,16 @@ func NewMessageAsyncBatch( } } -func (b *MessageAsyncBatch) Flush() { +func (b *MessageAsyncBatch) Flush(ctx context.Context) { size := len(b.msgBuf) if size > 0 { - b.processChan <- b.msgBuf + // write to channel with context check, fixes #170 + select { + case <-ctx.Done(): + klog.V(2).Infof("%s: flush cancelled, ctx done, return", b.topic) + return + case b.processChan <- b.msgBuf: + } b.msgBuf = make([]*Message, 0, b.maxSize) klog.V(4).Infof( "%s: flushed:%d, processChan:%v", @@ -75,14 +82,14 @@ func (b *MessageAsyncBatch) Flush() { // insert makes the batch and also and flushes to the processor // if batchSize >= maxSize -func (b *MessageAsyncBatch) Insert(msg *Message) { +func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message) { b.msgBuf = append(b.msgBuf, msg) if len(b.msgBuf) >= b.maxSize { klog.V(2).Infof( "%s: maxSize hit", msg.Topic, ) - b.Flush() + b.Flush(ctx) } } From 2a4f7a42b77cf873689272b1908c16aa1dced70f Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sat, 20 Mar 2021 14:09:03 +0530 Subject: [PATCH 2/2] Send to chan with ctx check Fixes https://github.com/practo/tipoca-stream/pull/171#issuecomment-803272510 --- pkg/redshiftbatcher/batch_processor.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index cd0aadd04..b1143be14 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -488,7 +488,18 @@ func (b *batchProcessor) Process( "%s, error(s) occured in processing (sending err)", b.topic, ) b.handleShutdown() - errChan <- errors + + // send to channel with context check, fix #170 + select { + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + case errChan <- errors: + } + klog.Errorf( "%s, error(s) occured: %+v, processor shutdown.", b.topic, @@ -512,7 +523,16 @@ func (b *batchProcessor) Process( } err := b.signalLoad(resp) if err != nil { - errChan <- err + // send to channel with context check, fix #170 + select { + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + case errChan <- err: + } klog.Errorf( "%s, error signalling: %v, processor shutdown.", b.topic,