Skip to content

Commit

Permalink
fix concurrency issue in eventhub (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Sep 21, 2023
1 parent 39857d7 commit 07e36c5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
34 changes: 22 additions & 12 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)
Expand All @@ -28,7 +29,7 @@ type EventHubConnector struct {
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs map[string]*eventhub.Hub
hubs cmap.ConcurrentMap[string, *eventhub.Hub]
}

// NewEventHubConnector creates a new EventHubConnector.
Expand Down Expand Up @@ -60,15 +61,22 @@ func NewEventHubConnector(
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: make(map[string]*eventhub.Hub),
hubs: cmap.New[*eventhub.Hub](),
}, nil
}

func (c *EventHubConnector) Close() error {
var allErrors error

// close all the event hub connections.
for _, hub := range c.hubs {
for _, hubName := range c.hubs.Keys() {
hub, ok := c.hubs.Get(hubName)
if !ok {
log.Errorf("failed to get event hub connection: %v", hubName)
allErrors = errors.Join(allErrors, fmt.Errorf("failed to get event hub connection: %v", hubName))
continue
}

err := hub.Close(c.ctx)
if err != nil {
log.Errorf("failed to close event hub connection: %v", err)
Expand Down Expand Up @@ -104,7 +112,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
defer func() {
shutdown <- true
}()
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := cmap.New[uint32]()
batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
Expand Down Expand Up @@ -182,14 +190,14 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
NumRecordsSynced: int64(len(batch.Records)),
TableNameRowsMapping: tableNameRowsMapping,
TableNameRowsMapping: tableNameRowsMapping.Items(),
}, nil
}

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
maxParallelism int64,
flowName string,
tableNameRowsMapping map[string]uint32) error {
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
if len(events) == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
Expand All @@ -204,7 +212,6 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
var wg sync.WaitGroup
var once sync.Once
var firstErr error
var mapLock sync.Mutex
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

Expand Down Expand Up @@ -237,9 +244,12 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
mapLock.Lock()
defer mapLock.Unlock()
tableNameRowsMapping[tblName] += uint32(len(eventBatch))
rowCount, ok := tableNameRowsMapping.Get(tblName)
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(len(eventBatch))
tableNameRowsMapping.Set(tblName, rowCount)
}(tblName, eventBatch)
}

Expand All @@ -255,14 +265,14 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
}

func (c *EventHubConnector) getOrCreateHubConnection(name string) (*eventhub.Hub, error) {
hub, ok := c.hubs[name]
hub, ok := c.hubs.Get(name)
if !ok {
hub, err := eventhub.NewHub(c.config.GetNamespace(), name, c.tokenProvider)
if err != nil {
log.Errorf("failed to create event hub connection: %v", err)
return nil, err
}
c.hubs[name] = hub
c.hubs.Set(name, hub)
return hub, nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
log.Infof("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
log.Debugf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
Expand Down
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.3 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
Expand Down

0 comments on commit 07e36c5

Please sign in to comment.