Skip to content

Commit

Permalink
removes tablerows map
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 17, 2023
1 parent a0b3292 commit c88d960
Showing 1 changed file with 2 additions and 10 deletions.
12 changes: 2 additions & 10 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -122,7 +121,6 @@ func (h *HubBatches) flushAllBatches(
var numEventsPushed int32
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(maxParallelism))
tableNameRowsMapping := cmap.New[uint32]()
h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
g.Go(func() error {
numEvents := eventBatch.NumEvents()
Expand All @@ -135,18 +133,12 @@ func (h *HubBatches) flushAllBatches(
log.WithFields(log.Fields{
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s", numEvents, tblName)
tableNameRowsMapping.Upsert(tblName.ToString(), uint32(numEvents),
func(exist bool, rowCount uint32, newRows uint32) uint32 {
if !exist {
return newRows
}
return rowCount + newRows
})
return nil
})
})

log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
log.Infof("[sendEventBatch] successfully sent %d events in total to event hub",
numEventsPushed)
return g.Wait()
}

Expand Down

0 comments on commit c88d960

Please sign in to comment.