Merge pull request #171 from practo/fix-162
Applying the learnings of ctx handling and fixing #170
alok87 authored Mar 20, 2021
2 parents 0326795 + 2a4f7a4 commit d7384a5
Showing 6 changed files with 94 additions and 59 deletions.
41 changes: 19 additions & 22 deletions cmd/redshiftbatcher/main.go
@@ -73,6 +73,7 @@ func run(cmd *cobra.Command, args []string) {
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

consumerGroups := make(map[string]kafka.ConsumerGroupInterface)
var consumersReady []chan bool
@@ -105,43 +106,39 @@ func run(cmd *cobra.Command, args []string) {
groupID,
groupConfig.TopicRegexes,
)

wg.Add(1)
go manager.SyncTopics(ctx, wg)

wg.Add(1)
go manager.Consume(ctx, wg)
}
klog.V(2).Infof("consumerGroups: %v", len(consumersReady))

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
ready := 0

klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady))
for ready >= 0 {
go func() {
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
default:
case <-sigterm:
klog.V(2).Info("SIGTERM signal received")
ready = -1
cancel()
klog.V(2).Info("Cancelled main context")
}
}()

if ready == -1 || ready == len(consumersReady) {
time.Sleep(3 * time.Second)
continue
}

for _, channel := range consumersReady {
go func() {
for i, c := range consumersReady {
select {
case <-channel:
ready += 1
klog.V(2).Infof("ConsumerGroup #%d is up and running", ready)
case <-c:
klog.V(2).Infof(
"#%d consumerGroup is up and running",
i,
)
}
}
}

klog.V(2).Info("Cancelled main context")
cancel()
}()

klog.V(2).Info("Waiting for all goroutines to shutdown...")
klog.V(2).Info("wg wait()")
wg.Wait()

var closeErr error
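Read as a whole, this hunk (and the matching one in cmd/redshiftloader/main.go below) replaces the polling loop over a shared ready counter with a cancellable context and dedicated goroutines. A simplified sketch of that shutdown pattern, reusing the names from the diff (consumersReady, wg, klog) but not claiming to be the literal committed code:

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Turn SIGINT/SIGTERM into a context cancellation instead of polling a flag.
    go func() {
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
        <-sigterm
        klog.V(2).Info("SIGTERM signal received")
        cancel()
    }()

    // Log readiness as each consumer group comes up; no shared counter required.
    go func() {
        for i, c := range consumersReady {
            <-c
            klog.V(2).Infof("#%d consumerGroup is up and running", i)
        }
    }()

    // Park until every consumer goroutine has returned after cancellation.
    wg.Wait()

The important property is that nothing busy-waits anymore: the main goroutine blocks in wg.Wait(), and every worker exits once ctx is cancelled.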
40 changes: 18 additions & 22 deletions cmd/redshiftloader/main.go
@@ -125,43 +125,39 @@ func run(cmd *cobra.Command, args []string) {
groupConfig.TopicRegexes,
// cancel,
)

wg.Add(1)
go manager.SyncTopics(ctx, wg)

wg.Add(1)
go manager.Consume(ctx, wg)
}
klog.V(2).Infof("consumerGroups: %v", len(consumersReady))

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
ready := 0

klog.V(2).Infof("ConsumerGroups: %v", len(consumersReady))
for ready >= 0 {
go func() {
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
default:
case <-sigterm:
klog.V(2).Info("SIGTERM signal received")
ready = -1
cancel()
klog.V(2).Info("Cancelled main context")
}
}()

if ready == -1 || ready == len(consumersReady) {
time.Sleep(3 * time.Second)
continue
}

for _, channel := range consumersReady {
go func() {
for i, c := range consumersReady {
select {
case <-channel:
ready += 1
klog.V(2).Infof("ConsumerGroup #%d is up and running", ready)
case <-c:
klog.V(2).Infof(
"#%d consumerGroup is up and running",
i,
)
}
}
}

klog.V(2).Info("Cancelling context to trigger graceful shutdown...")
cancel()
}()

klog.V(2).Info("Waiting for waitgroups to shutdown...")
klog.V(2).Info("wg wait()")
wg.Wait()

var closeErr error
1 change: 1 addition & 0 deletions pkg/kafka/manager.go
@@ -192,6 +192,7 @@ func (c *Manager) SyncTopics(

select {
case <-ctx.Done():
klog.V(2).Info("ctx cancelled bye")
return
case <-ticker.C:
continue
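The only change here is a log line, but it sits inside the select that lets SyncTopics return as soon as the main context is cancelled. The enclosing loop presumably has roughly this shape (a sketch: only the select is visible in the hunk, the ticker name and the sync step are assumed):

    for {
        // ... refresh the topic list from Kafka ...

        select {
        case <-ctx.Done():
            klog.V(2).Info("ctx cancelled bye")
            return
        case <-ticker.C:
            continue
        }
    }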
35 changes: 32 additions & 3 deletions pkg/redshiftbatcher/batch_processor.go
@@ -456,8 +456,8 @@ func (b *batchProcessor) Process(
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
maskSchema: make(map[string]serializer.MaskInfo),
}
go b.processBatch(wg, session, msgBuf, resp)
wg.Add(1)
go b.processBatch(wg, session, msgBuf, resp)
responses = append(responses, resp)
}
if len(responses) == 0 {
@@ -488,7 +488,18 @@ func (b *batchProcessor) Process(
"%s, error(s) occured in processing (sending err)", b.topic,
)
b.handleShutdown()
errChan <- errors

// send to channel with context check, fix #170
select {
case <-session.Context().Done():
klog.V(2).Infof(
"%s: processor returning, session ctx done",
b.topic,
)
return
case errChan <- errors:
}

klog.Errorf(
"%s, error(s) occured: %+v, processor shutdown.",
b.topic,
@@ -501,9 +512,27 @@
// failure in between signal and marking the offset can lead to
// duplicates in the loader topic, but it's ok as loader is idempotent
for _, resp := range responses {
select {
default:
case <-session.Context().Done():
klog.V(2).Infof(
"%s: processor returning, session ctx done",
b.topic,
)
return
}
err := b.signalLoad(resp)
if err != nil {
errChan <- err
// send to channel with context check, fix #170
select {
case <-session.Context().Done():
klog.V(2).Infof(
"%s: processor returning, session ctx done",
b.topic,
)
return
case errChan <- err:
}
klog.Errorf(
"%s, error signalling: %v, processor shutdown.",
b.topic,
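The repeated select around errChan is the actual fix for #170: a bare errChan <- errors blocks forever if the reader on the other side has already returned after the session context was cancelled, leaving the processor goroutine wedged and wg.Wait() hanging. Guarding every send with session.Context().Done() makes the send abandonable. The same guard could be factored into a tiny helper, for example (hypothetical helper, not part of this commit):

    // trySendErr sends err on errChan unless ctx is already cancelled.
    // It reports whether the send actually happened.
    func trySendErr(ctx context.Context, errChan chan<- error, err error) bool {
        select {
        case <-ctx.Done():
            return false
        case errChan <- err:
            return true
        }
    }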
21 changes: 13 additions & 8 deletions pkg/redshiftbatcher/batcher_handler.go
@@ -158,9 +158,14 @@ func (h *batcherHandler) ConsumeClaim(
)

wg := &sync.WaitGroup{}
go processor.Process(wg, session, processChan, errChan)
wg.Add(1)
defer wg.Wait()
go processor.Process(wg, session, processChan, errChan)

defer func() {
klog.V(2).Infof("%s: wg wait() for processing to return", claim.Topic())
wg.Wait()
klog.V(2).Infof("%s: wg done. processing returned", claim.Topic())
}()

klog.V(4).Infof("%s: read msgs", claim.Topic())
// NOTE:
@@ -204,10 +209,10 @@
// Deserialize the message
msg, err := h.serializer.Deserialize(message)
if err != nil {
return fmt.Errorf("error deserializing binary, err: %s\n", err)
return fmt.Errorf("%s: consumeClaim returning, error deserializing binary, err: %s\n", claim.Topic(), err)
}
if msg == nil || msg.Value == nil {
return fmt.Errorf("got message as nil, message: %+v\n", msg)
return fmt.Errorf("%s: consumeClaim returning, error, got message as nil, message: %+v\n", claim.Topic(), msg)
}

if lastSchemaId == nil {
@@ -220,22 +225,22 @@
msg.SchemaId,
)
// Flush the batch due to schema change
msgBatch.Flush()
msgBatch.Flush(session.Context())
}
// Flush the batch by size or insert in batch
msgBatch.Insert(msg)
msgBatch.Insert(session.Context(), msg)
*lastSchemaId = msg.SchemaId
case <-maxWaitTicker.C:
// Flush the batch by time
klog.V(2).Infof(
"%s: maxWaitSeconds hit",
claim.Topic(),
)
msgBatch.Flush()
msgBatch.Flush(session.Context())
case err := <-errChan:
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
klog.Errorf(
"%s: error occured in processing, err: %v, triggered shutdown",
"consumeClaim returning, %s: error occured in processing, err: %v, triggered shutdown",
claim.Topic(),
err,
)
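Two details in the handler change are easy to miss: wg.Add(1) now runs before the go statement, so the goroutine's eventual wg.Done() can never fire against a zero counter and panic the WaitGroup, and the deferred wait is wrapped in logging so a stuck processor shows up in the logs. In miniature, the intended ordering looks like this (a sketch of the ordering only, not the committed code):

    wg := &sync.WaitGroup{}
    wg.Add(1) // register before the goroutine starts; Done before Add would panic
    go processor.Process(wg, session, processChan, errChan)
    defer func() {
        klog.V(2).Infof("%s: wg wait() for processing to return", claim.Topic())
        wg.Wait()
    }()

The batch calls are also rewired to carry session.Context(), which is what allows Flush and Insert to abort mid-send during a rebalance or shutdown.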
15 changes: 11 additions & 4 deletions pkg/serializer/message.go
@@ -1,6 +1,7 @@
package serializer

import (
"context"
"github.com/Shopify/sarama"
"github.com/practo/klog/v2"
"sync"
@@ -54,10 +55,16 @@ func NewMessageAsyncBatch(
}
}

func (b *MessageAsyncBatch) Flush() {
func (b *MessageAsyncBatch) Flush(ctx context.Context) {
size := len(b.msgBuf)
if size > 0 {
b.processChan <- b.msgBuf
// write to channel with context check, fixes #170
select {
case <-ctx.Done():
klog.V(2).Infof("%s: flush cancelled, ctx done, return", b.topic)
return
case b.processChan <- b.msgBuf:
}
b.msgBuf = make([]*Message, 0, b.maxSize)
klog.V(4).Infof(
"%s: flushed:%d, processChan:%v",
@@ -75,14 +82,14 @@

// insert makes the batch and also and flushes to the processor
// if batchSize >= maxSize
func (b *MessageAsyncBatch) Insert(msg *Message) {
func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message) {
b.msgBuf = append(b.msgBuf, msg)
if len(b.msgBuf) >= b.maxSize {
klog.V(2).Infof(
"%s: maxSize hit",
msg.Topic,
)
b.Flush()
b.Flush(ctx)
}
}

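With the context parameter threaded through, MessageAsyncBatch follows the same guarded-send rule as the processor. A self-contained sketch of the resulting shape (the struct fields are inferred from their use in the hunk and may not match the real definition exactly):

    type MessageAsyncBatch struct {
        topic       string
        maxSize     int
        msgBuf      []*Message
        processChan chan []*Message
    }

    // Flush hands the buffered messages to the processor unless the
    // consumer session's context has already been cancelled.
    func (b *MessageAsyncBatch) Flush(ctx context.Context) {
        if len(b.msgBuf) == 0 {
            return
        }
        select {
        case <-ctx.Done():
            klog.V(2).Infof("%s: flush cancelled, ctx done, return", b.topic)
            return
        case b.processChan <- b.msgBuf:
        }
        b.msgBuf = make([]*Message, 0, b.maxSize)
    }

    // Insert buffers a message and flushes once the batch reaches maxSize.
    func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message) {
        b.msgBuf = append(b.msgBuf, msg)
        if len(b.msgBuf) >= b.maxSize {
            klog.V(2).Infof("%s: maxSize hit", msg.Topic)
            b.Flush(ctx)
        }
    }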
