Skip to content

Commit

Permalink
fix use of atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 11, 2024
1 parent 7b8bc54 commit 7741945
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
6 changes: 4 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

c.logger.Info("processBatch", slog.Int("Total records sent to event hub", int(numRecords.Load())))
return numRecords.Load(), nil
currNumRecords := numRecords.Load()

c.logger.Info("processBatch", slog.Int("Total records sent to event hub", int(currNumRecords)))
return currNumRecords, nil
}

numRecords.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (h *HubBatches) flushAllBatches(
return nil
}

var numEventsPushed int32
var numEventsPushed atomic.Int32
err := h.ForEach(
func(
destination ScopedEventhub,
Expand All @@ -130,7 +130,7 @@ func (h *HubBatches) flushAllBatches(
return err
}

atomic.AddInt32(&numEventsPushed, numEvents)
numEventsPushed.Add(numEvents)
slog.Info("flushAllBatches",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEvents)),
Expand All @@ -145,7 +145,7 @@ func (h *HubBatches) flushAllBatches(
}
slog.Info("hub batches flush",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEventsPushed)))
slog.Int("events sent", int(numEventsPushed.Load())))

// clear the batches after flushing them.
return err
Expand Down

0 comments on commit 7741945

Please sign in to comment.