diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 47e86ad340..edc510dffe 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -207,6 +207,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, var wg sync.WaitGroup var once sync.Once var firstErr error + var mapLock sync.Mutex // Limiting concurrent sends guard := make(chan struct{}, maxParallelism) @@ -239,7 +240,9 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEventsPushed, tblName) + mapLock.Lock() tableNameRowsMapping[tblName] += uint32(len(eventBatch)) + mapLock.Unlock() }(tblName, eventBatch) }