Skip to content

Commit

Permalink
locks for tablemapping
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 14, 2023
1 parent 0e0296c commit 3d416d0
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
var wg sync.WaitGroup
var once sync.Once
var firstErr error
var mapLock sync.Mutex
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

Expand Down Expand Up @@ -239,7 +240,9 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
mapLock.Lock()
tableNameRowsMapping[tblName] += uint32(len(eventBatch))
mapLock.Unlock()
}(tblName, eventBatch)
}

Expand Down

0 comments on commit 3d416d0

Please sign in to comment.