diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index ce4d53d96f..662f7ce2c7 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -135,17 +135,12 @@ func (h *HubBatches) flushAllBatches( log.WithFields(log.Fields{ "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEvents, tblName) - rowCount, ok := tableNameRowsMapping.Get(tblName.ToString()) - if !ok { - rowCount = uint32(0) - } - rowCount += uint32(numEvents) - tableNameRowsMapping.Upsert(tblName.ToString(), rowCount, - func(exist bool, valueInMap, newValue uint32) uint32 { - if exist { - return valueInMap + tableNameRowsMapping.Upsert(tblName.ToString(), uint32(numEvents), + func(exist bool, rowCount uint32, newRows uint32) uint32 { + if !exist { + return newRows } - return newValue + return rowCount + newRows }) return nil })