From ec342eae95a8c740e424ca0dac41b171e52d72b3 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 18 Sep 2023 21:10:28 +0530 Subject: [PATCH] Metrics, timing for Eventhub CDC (#390) --- flow/activities/flowable.go | 19 +++++++++------- flow/connectors/eventhub/eventhub.go | 34 +++++++++++++++++++++------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 625cad29c9..8bbee11bcf 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -221,11 +221,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } } - // log the number of records + pullDuration := time.Since(startTime) numRecords := len(recordBatch.Records) log.WithFields(log.Fields{ "flowName": input.FlowConnectionConfigs.FlowJobName, - }).Printf("pulled %d records", numRecords) + }).Infof("pulled %d records in %d seconds\n", numRecords, int(pullDuration.Seconds())) activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords)) if numRecords == 0 { @@ -238,6 +238,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }, nil } + syncStartTime := time.Now() res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{ Records: recordBatch, FlowJobName: input.FlowConnectionConfigs.FlowJobName, @@ -250,9 +251,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.Warnf("failed to push records: %v", err) return nil, fmt.Errorf("failed to push records: %w", err) } + + syncDuration := time.Since(syncStartTime) log.WithFields(log.Fields{ "flowName": input.FlowConnectionConfigs.FlowJobName, - }).Infof("pushed %d records", res.NumRecordsSynced) + }).Infof("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds())) err = a.CatalogMirrorMonitor. UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, @@ -333,7 +336,7 @@ func (a *FlowableActivity) StartNormalize( // log the number of batches normalized if res != nil { - log.Printf("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID) + log.Infof("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID) } return res, nil @@ -444,7 +447,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(dstConn) - log.Printf("replicating partition %s\n", partition.PartitionId) + log.Infof("replicating partition %s\n", partition.PartitionId) var stream *model.QRecordStream bufferSize := shared.FetchAndChannelSize @@ -487,7 +490,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, numRecords = int64(recordBatch.NumRecords) log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Printf("pulled %d records\n", len(recordBatch.Records)) + }).Infof("pulled %d records\n", len(recordBatch.Records)) err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) if err != nil { @@ -516,7 +519,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if res == 0 { log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Printf("no records to push for partition %s\n", partition.PartitionId) + }).Infof("no records to push for partition %s\n", partition.PartitionId) return nil } @@ -526,7 +529,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } log.WithFields(log.Fields{ "flowName": config.FlowJobName, - }).Printf("pushed %d records\n", res) + }).Infof("pushed %d records\n", res) err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition) if err != nil { return err diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4376036cba..c2f3263097 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" log "github.com/sirupsen/logrus" @@ -103,13 +104,20 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S defer func() { shutdown <- true }() - + tableNameRowsMapping := make(map[string]uint32) batch := req.Records eventsPerHeartBeat := 1000 eventsPerBatch := int(req.PushBatchSize) + if eventsPerBatch <= 0 { + eventsPerBatch = 10000 + } maxParallelism := req.PushParallelism + if maxParallelism <= 0 { + maxParallelism = 10 + } batchPerTopic := make(map[string][]*eventhub.Event) + startTime := time.Now() for i, record := range batch.Records { json, err := record.GetItems().ToJSON() if err != nil { @@ -134,7 +142,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if (i+1)%eventsPerBatch == 0 { err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName) + req.FlowJobName, tableNameRowsMapping) if err != nil { return nil, err } @@ -146,15 +154,15 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // send the remaining events. if len(batchPerTopic) > 0 { err := c.sendEventBatch(batchPerTopic, maxParallelism, - req.FlowJobName) + req.FlowJobName, tableNameRowsMapping) if err != nil { return nil, err } } - + rowsSynced := len(batch.Records) log.WithFields(log.Fields{ "flowName": req.FlowJobName, - }).Infof("[total] successfully sent %d records to event hub", len(batch.Records)) + }).Infof("[total] successfully sent %d records to event hub", rowsSynced) err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID) if err != nil { @@ -167,6 +175,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S return nil, err } + metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime)) + metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), + time.Since(startTime), int64(rowsSynced)) return &model.SyncResponse{ FirstSyncedCheckPointID: batch.FirstCheckPointID, LastSyncedCheckPointID: batch.LastCheckPointID, @@ -176,7 +187,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64, - flowName string) error { + flowName string, + tableNameRowsMapping map[string]uint32) error { if len(events) == 0 { log.WithFields(log.Fields{ "flowName": flowName, @@ -191,6 +203,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, var wg sync.WaitGroup var once sync.Once var firstErr error + var mapLock sync.Mutex // Limiting concurrent sends guard := make(chan struct{}, maxParallelism) @@ -223,6 +236,9 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, "flowName": flowName, }).Infof("pushed %d events to event hub: %s", numEventsPushed, tblName) + mapLock.Lock() + defer mapLock.Unlock() + tableNameRowsMapping[tblName] += uint32(len(eventBatch)) }(tblName, eventBatch) } @@ -327,8 +343,10 @@ func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClien func (c *EventHubConnector) SetupNormalizedTables( req *protos.SetupNormalizedTableBatchInput) ( *protos.SetupNormalizedTableBatchOutput, error) { - log.Infof("setting up tables for Eventhub is a no-op") - return nil, nil + log.Infof("normalization for event hub is a no-op") + return &protos.SetupNormalizedTableBatchOutput{ + TableExistsMapping: nil, + }, nil } func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {