diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index ebb7abe030..f811e51ee0 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -156,8 +156,10 @@ func (c *EventHubConnector) processBatch( return 0, err } - c.logger.Info("processBatch", slog.Int("Total records sent to event hub", int(numRecords.Load()))) - return numRecords.Load(), nil + currNumRecords := numRecords.Load() + + c.logger.Info("processBatch", slog.Int("Total records sent to event hub", int(currNumRecords))) + return currNumRecords, nil } numRecords.Add(1) diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index aa70df75f9..5634173faf 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -118,7 +118,7 @@ func (h *HubBatches) flushAllBatches( return nil } - var numEventsPushed int32 + var numEventsPushed atomic.Int32 err := h.ForEach( func( destination ScopedEventhub, @@ -130,7 +130,7 @@ func (h *HubBatches) flushAllBatches( return err } - atomic.AddInt32(&numEventsPushed, numEvents) + numEventsPushed.Add(numEvents) slog.Info("flushAllBatches", slog.String(string(shared.FlowNameKey), flowName), slog.Int("events sent", int(numEvents)), @@ -145,7 +145,7 @@ func (h *HubBatches) flushAllBatches( } slog.Info("hub batches flush", slog.String(string(shared.FlowNameKey), flowName), - slog.Int("events sent", int(numEventsPushed))) + slog.Int("events sent", int(numEventsPushed.Load()))) // clear the batches after flushing them. return err