Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix concurrency issue in eventhub #410

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
cmap "github.com/orcaman/concurrent-map/v2"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
)
Expand All @@ -28,7 +29,7 @@ type EventHubConnector struct {
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
tokenProvider auth.TokenProvider
hubs map[string]*eventhub.Hub
hubs cmap.ConcurrentMap[string, *eventhub.Hub]
}

// NewEventHubConnector creates a new EventHubConnector.
Expand Down Expand Up @@ -60,15 +61,22 @@ func NewEventHubConnector(
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
tokenProvider: jwtTokenProvider,
hubs: make(map[string]*eventhub.Hub),
hubs: cmap.New[*eventhub.Hub](),
}, nil
}

func (c *EventHubConnector) Close() error {
var allErrors error

// close all the event hub connections.
for _, hub := range c.hubs {
for _, hubName := range c.hubs.Keys() {
hub, ok := c.hubs.Get(hubName)
if !ok {
log.Errorf("failed to get event hub connection: %v", hubName)
allErrors = errors.Join(allErrors, fmt.Errorf("failed to get event hub connection: %v", hubName))
continue
}

err := hub.Close(c.ctx)
if err != nil {
log.Errorf("failed to close event hub connection: %v", err)
Expand Down Expand Up @@ -104,7 +112,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
defer func() {
shutdown <- true
}()
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := cmap.New[uint32]()
batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
Expand Down Expand Up @@ -182,14 +190,14 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
NumRecordsSynced: int64(len(batch.Records)),
TableNameRowsMapping: tableNameRowsMapping,
TableNameRowsMapping: tableNameRowsMapping.Items(),
}, nil
}

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
maxParallelism int64,
flowName string,
tableNameRowsMapping map[string]uint32) error {
tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
if len(events) == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
Expand All @@ -204,7 +212,6 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
var wg sync.WaitGroup
var once sync.Once
var firstErr error
var mapLock sync.Mutex
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

Expand Down Expand Up @@ -237,9 +244,12 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
mapLock.Lock()
defer mapLock.Unlock()
tableNameRowsMapping[tblName] += uint32(len(eventBatch))
rowCount, ok := tableNameRowsMapping.Get(tblName)
if !ok {
rowCount = uint32(0)
}
rowCount += uint32(len(eventBatch))
tableNameRowsMapping.Set(tblName, rowCount)
}(tblName, eventBatch)
}

Expand All @@ -255,14 +265,14 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
}

func (c *EventHubConnector) getOrCreateHubConnection(name string) (*eventhub.Hub, error) {
hub, ok := c.hubs[name]
hub, ok := c.hubs.Get(name)
if !ok {
hub, err := eventhub.NewHub(c.config.GetNamespace(), name, c.tokenProvider)
if err != nil {
log.Errorf("failed to create event hub connection: %v", err)
return nil, err
}
c.hubs[name] = hub
c.hubs.Set(name, hub)
return hub, nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
log.Infof("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
log.Debugf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
Expand Down
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.3 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
Expand Down
Loading