From 157a84354134db2b1d068592c1771e15eb5378fd Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 21 Sep 2023 14:24:13 -0700 Subject: [PATCH] fix concurrency issue in eventhub --- flow/connectors/eventhub/eventhub.go | 34 ++++++++++++++++++---------- flow/connectors/postgres/cdc.go | 2 +- flow/go.mod | 1 + flow/go.sum | 2 ++ 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b4d61f4275..ef0f658242 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -17,6 +17,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + cmap "github.com/orcaman/concurrent-map/v2" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" ) @@ -28,7 +29,7 @@ type EventHubConnector struct { tableSchemas map[string]*protos.TableSchema creds *azidentity.DefaultAzureCredential tokenProvider auth.TokenProvider - hubs map[string]*eventhub.Hub + hubs cmap.ConcurrentMap[string, *eventhub.Hub] } // NewEventHubConnector creates a new EventHubConnector. @@ -60,7 +61,7 @@ func NewEventHubConnector( pgMetadata: pgMetadata, creds: defaultAzureCreds, tokenProvider: jwtTokenProvider, - hubs: make(map[string]*eventhub.Hub), + hubs: cmap.New[*eventhub.Hub](), }, nil } @@ -68,7 +69,14 @@ func (c *EventHubConnector) Close() error { var allErrors error // close all the event hub connections. - for _, hub := range c.hubs { + for _, hubName := range c.hubs.Keys() { + hub, ok := c.hubs.Get(hubName) + if !ok { + log.Errorf("failed to get event hub connection: %v", hubName) + allErrors = errors.Join(allErrors, fmt.Errorf("failed to get event hub connection: %v", hubName)) + continue + } + err := hub.Close(c.ctx) if err != nil { log.Errorf("failed to close event hub connection: %v", err) @@ -104,7 +112,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S defer func() { shutdown <- true }() - tableNameRowsMapping := make(map[string]uint32) + tableNameRowsMapping := cmap.New[uint32]() batch := req.Records eventsPerHeartBeat := 1000 eventsPerBatch := int(req.PushBatchSize) @@ -182,14 +190,14 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S FirstSyncedCheckPointID: batch.FirstCheckPointID, LastSyncedCheckPointID: batch.LastCheckPointID, NumRecordsSynced: int64(len(batch.Records)), - TableNameRowsMapping: tableNameRowsMapping, + TableNameRowsMapping: tableNameRowsMapping.Items(), }, nil } func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64, flowName string, - tableNameRowsMapping map[string]uint32) error { + tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error { if len(events) == 0 { log.WithFields(log.Fields{ "flowName": flowName, @@ -204,7 +212,6 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, var wg sync.WaitGroup var once sync.Once var firstErr error - var mapLock sync.Mutex // Limiting concurrent sends guard := make(chan struct{}, maxParallelism) @@ -237,9 +244,12 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEventsPushed, tblName) - mapLock.Lock() - defer mapLock.Unlock() - tableNameRowsMapping[tblName] += uint32(len(eventBatch)) + rowCount, ok := tableNameRowsMapping.Get(tblName) + if !ok { + rowCount = uint32(0) + } + rowCount += uint32(len(eventBatch)) + tableNameRowsMapping.Set(tblName, rowCount) }(tblName, eventBatch) } @@ -255,14 +265,14 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, } func (c *EventHubConnector) getOrCreateHubConnection(name string) (*eventhub.Hub, error) { - hub, ok := c.hubs[name] + hub, ok := c.hubs.Get(name) if !ok { hub, err := eventhub.NewHub(c.config.GetNamespace(), name, c.tokenProvider) if err != nil { log.Errorf("failed to create event hub connection: %v", err) return nil, err } - c.hubs[name] = hub + c.hubs.Set(name, hub) return hub, nil } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b32efbbc9b..641d5f8543 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -317,7 +317,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist // the state of the relation message somewhere - log.Infof("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", + log.Debugf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns) if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) diff --git a/flow/go.mod b/flow/go.mod index db395f335c..0ea1b9d71e 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -38,6 +38,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.3 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/twmb/murmur3 v1.1.8 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect diff --git a/flow/go.sum b/flow/go.sum index a446aeb974..8cf261b361 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -1329,6 +1329,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=