diff --git a/cmd/redshiftbatcher/main.go b/cmd/redshiftbatcher/main.go index 83ee55f2b..577f52bb7 100644 --- a/cmd/redshiftbatcher/main.go +++ b/cmd/redshiftbatcher/main.go @@ -73,6 +73,7 @@ func run(cmd *cobra.Command, args []string) { } ctx, cancel := context.WithCancel(context.Background()) + defer cancel() consumerGroups := make(map[string]kafka.ConsumerGroupInterface) var consumersReady []chan bool @@ -105,43 +106,39 @@ func run(cmd *cobra.Command, args []string) { groupID, groupConfig.TopicRegexes, ) + wg.Add(1) go manager.SyncTopics(ctx, wg) + wg.Add(1) go manager.Consume(ctx, wg) } + klog.V(2).Infof("consumerGroups: %v", len(consumersReady)) - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - ready := 0 - - klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady)) - for ready >= 0 { + go func() { + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { - default: case <-sigterm: klog.V(2).Info("SIGTERM signal received") - ready = -1 + cancel() + klog.V(2).Info("Cancelled main context") } + }() - if ready == -1 || ready == len(consumersReady) { - time.Sleep(3 * time.Second) - continue - } - - for _, channel := range consumersReady { + go func() { + for i, c := range consumersReady { select { - case <-channel: - ready += 1 - klog.V(2).Infof("ConsumerGroup #%d is up and running", ready) + case <-c: + klog.V(2).Infof( + "#%d consumerGroup is up and running", + i, + ) } } - } - - klog.V(2).Info("Cancelled main context") - cancel() + }() - klog.V(2).Info("Waiting for all goroutines to shutdown...") + klog.V(2).Info("wg wait()") wg.Wait() var closeErr error diff --git a/cmd/redshiftloader/main.go b/cmd/redshiftloader/main.go index e7e4182f3..f15a305cb 100644 --- a/cmd/redshiftloader/main.go +++ b/cmd/redshiftloader/main.go @@ -125,43 +125,39 @@ func run(cmd *cobra.Command, args []string) { groupConfig.TopicRegexes, // cancel, ) + wg.Add(1) go manager.SyncTopics(ctx, wg) + wg.Add(1) go manager.Consume(ctx, wg) } + klog.V(2).Infof("consumerGroups: %v", len(consumersReady)) - sigterm := make(chan os.Signal, 1) - signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) - ready := 0 - - klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady)) - for ready >= 0 { + go func() { + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { - default: case <-sigterm: klog.V(2).Info("SIGTERM signal received") - ready = -1 + cancel() + klog.V(2).Info("Cancelled main context") } + }() - if ready == -1 || ready == len(consumersReady) { - time.Sleep(3 * time.Second) - continue - } - - for _, channel := range consumersReady { + go func() { + for i, c := range consumersReady { select { - case <-channel: - ready += 1 - klog.V(2).Infof("ConsumerGroup #%d is up and running", ready) + case <-c: + klog.V(2).Infof( + "#%d consumerGroup is up and running", + i, + ) } } - } - - klog.V(2).Info("Cancelling context to trigger graceful shutdown...") - cancel() + }() - klog.V(2).Info("Waiting for waitgroups to shutdown...") + klog.V(2).Info("wg wait()") wg.Wait() var closeErr error diff --git a/pkg/kafka/manager.go b/pkg/kafka/manager.go index b03f0831a..ee994aba3 100644 --- a/pkg/kafka/manager.go +++ b/pkg/kafka/manager.go @@ -192,6 +192,7 @@ func (c *Manager) SyncTopics( select { case <-ctx.Done(): + klog.V(2).Info("ctx cancelled bye") return case <-ticker.C: continue diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index c1df0121a..cd0aadd04 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -456,8 +456,8 @@ func (b *batchProcessor) Process( bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)), maskSchema: make(map[string]serializer.MaskInfo), } - go b.processBatch(wg, session, msgBuf, resp) wg.Add(1) + go b.processBatch(wg, session, msgBuf, resp) responses = append(responses, resp) } if len(responses) == 0 { @@ -501,6 +501,15 @@ func (b *batchProcessor) Process( // failure in between signal and marking the offset can lead to // duplicates in the loader topic, but it's ok as loader is idempotent for _, resp := range responses { + select { + default: + case <-session.Context().Done(): + klog.V(2).Infof( + "%s: processor returning, session ctx done", + b.topic, + ) + return + } err := b.signalLoad(resp) if err != nil { errChan <- err diff --git a/pkg/redshiftbatcher/batcher_handler.go b/pkg/redshiftbatcher/batcher_handler.go index 96b8b12ac..c1f797cf7 100644 --- a/pkg/redshiftbatcher/batcher_handler.go +++ b/pkg/redshiftbatcher/batcher_handler.go @@ -158,9 +158,14 @@ func (h *batcherHandler) ConsumeClaim( ) wg := &sync.WaitGroup{} - go processor.Process(wg, session, processChan, errChan) wg.Add(1) - defer wg.Wait() + go processor.Process(wg, session, processChan, errChan) + + defer func() { + klog.V(2).Infof("%s: wg wait() for processing to return", claim.Topic()) + wg.Wait() + klog.V(2).Infof("%s: wg done. processing returned", claim.Topic()) + }() klog.V(4).Infof("%s: read msgs", claim.Topic()) // NOTE: @@ -204,10 +209,10 @@ func (h *batcherHandler) ConsumeClaim( // Deserialize the message msg, err := h.serializer.Deserialize(message) if err != nil { - return fmt.Errorf("error deserializing binary, err: %s\n", err) + return fmt.Errorf("%s: consumeClaim returning, error deserializing binary, err: %s\n", claim.Topic(), err) } if msg == nil || msg.Value == nil { - return fmt.Errorf("got message as nil, message: %+v\n", msg) + return fmt.Errorf("%s: consumeClaim returning, error, got message as nil, message: %+v\n", claim.Topic(), msg) } if lastSchemaId == nil { @@ -220,10 +225,10 @@ func (h *batcherHandler) ConsumeClaim( msg.SchemaId, ) // Flush the batch due to schema change - msgBatch.Flush() + msgBatch.Flush(session.Context()) } // Flush the batch by size or insert in batch - msgBatch.Insert(msg) + msgBatch.Insert(session.Context(), msg) *lastSchemaId = msg.SchemaId case <-maxWaitTicker.C: // Flush the batch by time @@ -231,11 +236,11 @@ func (h *batcherHandler) ConsumeClaim( "%s: maxWaitSeconds hit", claim.Topic(), ) - msgBatch.Flush() + msgBatch.Flush(session.Context()) case err := <-errChan: syscall.Kill(syscall.Getpid(), syscall.SIGINT) klog.Errorf( - "%s: error occured in processing, err: %v, triggered shutdown", + "consumeClaim returning, %s: error occured in processing, err: %v, triggered shutdown", claim.Topic(), err, ) diff --git a/pkg/serializer/message.go b/pkg/serializer/message.go index 31ca90acd..e629d8032 100644 --- a/pkg/serializer/message.go +++ b/pkg/serializer/message.go @@ -1,6 +1,7 @@ package serializer import ( + "context" "github.com/Shopify/sarama" "github.com/practo/klog/v2" "sync" @@ -54,10 +55,16 @@ func NewMessageAsyncBatch( } } -func (b *MessageAsyncBatch) Flush() { +func (b *MessageAsyncBatch) Flush(ctx context.Context) { size := len(b.msgBuf) if size > 0 { - b.processChan <- b.msgBuf + // write to channel with context check, fixes #170 + select { + case <-ctx.Done(): + klog.V(2).Infof("%s: flush cancelled, ctx done, return", b.topic) + return + case b.processChan <- b.msgBuf: + } b.msgBuf = make([]*Message, 0, b.maxSize) klog.V(4).Infof( "%s: flushed:%d, processChan:%v", @@ -75,14 +82,14 @@ func (b *MessageAsyncBatch) Flush() { // insert makes the batch and also and flushes to the processor // if batchSize >= maxSize -func (b *MessageAsyncBatch) Insert(msg *Message) { +func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message) { b.msgBuf = append(b.msgBuf, msg) if len(b.msgBuf) >= b.maxSize { klog.V(2).Infof( "%s: maxSize hit", msg.Topic, ) - b.Flush() + b.Flush(ctx) } }