Skip to content

Commit

Permalink
fix: Fix the exit control flow
Browse files Browse the repository at this point in the history
* after context is cancelled, close the consumer group
* wait till consumer has pushed everything into producer batch
* close producer channel
* stop producer client
  • Loading branch information
joeirimpan committed Jul 2, 2024
1 parent 0348f71 commit 0f15bfa
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
31 changes: 16 additions & 15 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filt
// Start starts the consumer loop on kafka (A), fetch messages and relays over to kafka (B) using an async
func (re *Relay) Start(globalCtx context.Context) error {
wg := &sync.WaitGroup{}
defer wg.Wait()

// Derive a cancellable context from the global context (which captures kill signals) to use
// for subsequent connections/health tracking/retries etc.
Expand All @@ -96,41 +95,43 @@ func (re *Relay) Start(globalCtx context.Context) error {
re.log.Info("starting producer worker")
go func() {
defer wg.Done()
if err := re.target.Start(ctx); err != nil {
if err := re.target.Start(); err != nil {
re.log.Error("error starting producer worker", "err", err)
}

if ctx.Err() != context.Canceled {
cancel()
}
}()

// Start the consumer group worker by triggering a signal to the relay loop to
// fetch a consumer worker for the initial healthy node.
re.log.Info("starting consumer worker")
re.signalCh <- struct{}{}

wg.Add(1)
go func() {
defer wg.Done()
// wait till main ctx is cancelled
<-globalCtx.Done()

// stop consumer group
re.source.Close()
}()

// Start the indefinite poll that asks for new connections
// and then consumes messages from them.
if err := re.startPoll(ctx); err != nil {
re.log.Error("error starting consumer worker", "err", err)
}

// Close the target/producer on exit.
re.target.CloseBatchCh()
// close the producer inlet channel
close(re.target.inletCh)

// close producer
re.target.Close()

wg.Wait()

return nil
}

// Close closes the relay's underlying Kafka clients: the source (consumer)
// pool first, then the target (producer) client.
func (re *Relay) Close() {
	re.log.Debug("closing relay consumer, producer...")
	// Stop the consumer side before the producer so no new messages are
	// fetched while the producer is being torn down.
	re.source.Close()
	re.target.Close()
}

// startPoll starts the consumer worker which polls the kafka cluster for messages.
func (re *Relay) startPoll(ctx context.Context) error {
var (
Expand Down
2 changes: 2 additions & 0 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
// indefinitely long to return based on the config.
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
retries := 0
loop:
for {
select {
case <-globalCtx.Done():
Expand All @@ -155,6 +156,7 @@ func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
retries++
sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
continue loop
}

// Cache the current live connection internally.
Expand Down
19 changes: 5 additions & 14 deletions internal/relay/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,24 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
return p, nil
}

// Close removes the producer topics from the underlying *kgo.Client so a
// subsequent shutdown does not block on in-flight produce requests.
func (tg *Target) Close() {
	if tg.client == nil {
		return
	}

	// Purging the topics prevents blocking on close.
	tg.client.PurgeTopicsFromProducing()
}

// CloseBatchCh closes the producer batch (inlet) channel, signalling the
// producer loop reading from it that no more records will be queued.
func (tg *Target) CloseBatchCh() {
	close(tg.inletCh)
}

// GetBatchCh returns the producer batch (inlet) channel into which records
// are queued for flushing to the target Kafka.
func (tg *Target) GetBatchCh() chan *kgo.Record {
	return tg.inletCh
}

// Start starts the blocking producer which flushes messages to the target Kafka.
func (tg *Target) Start(ctx context.Context) error {
func (tg *Target) Start() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tick := time.NewTicker(tg.pCfg.FlushFrequency)
defer tick.Stop()

Expand All @@ -95,13 +93,6 @@ func (tg *Target) Start(ctx context.Context) error {

for {
select {
case <-ctx.Done():
if err := tg.drain(); err != nil {
return err
}

return ctx.Err()

// Queue the message to and flush if the batch size is reached.
case msg, ok := <-tg.inletCh:
if !ok {
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,5 @@ func main() {
if metrSrv != nil {
metrSrv.Shutdown(globalCtx)
}
relay.Close()
lo.Info("bye")
}

0 comments on commit 0f15bfa

Please sign in to comment.