Metrics, timing for Eventhub CDC (#390)
Amogh-Bharadwaj authored Sep 18, 2023
1 parent 9768efe commit ec342ea
Showing 2 changed files with 37 additions and 16 deletions.
19 changes: 11 additions & 8 deletions flow/activities/flowable.go
@@ -221,11 +221,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
}

// log the number of records
pullDuration := time.Since(startTime)
numRecords := len(recordBatch.Records)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Printf("pulled %d records", numRecords)
}).Infof("pulled %d records in %d seconds\n", numRecords, int(pullDuration.Seconds()))
activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords))

if numRecords == 0 {
@@ -238,6 +238,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}, nil
}

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
@@ -250,9 +251,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.Warnf("failed to push records: %v", err)
return nil, fmt.Errorf("failed to push records: %w", err)
}

syncDuration := time.Since(syncStartTime)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pushed %d records", res.NumRecordsSynced)
}).Infof("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds()))

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
@@ -333,7 +336,7 @@ func (a *FlowableActivity) StartNormalize(

// log the number of batches normalized
if res != nil {
log.Printf("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
log.Infof("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
}

return res, nil
@@ -444,7 +447,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

log.Printf("replicating partition %s\n", partition.PartitionId)
log.Infof("replicating partition %s\n", partition.PartitionId)

var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
@@ -487,7 +490,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
numRecords = int64(recordBatch.NumRecords)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("pulled %d records\n", len(recordBatch.Records))
}).Infof("pulled %d records\n", len(recordBatch.Records))

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
if err != nil {
@@ -516,7 +519,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if res == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("no records to push for partition %s\n", partition.PartitionId)
}).Infof("no records to push for partition %s\n", partition.PartitionId)
return nil
}

@@ -526,7 +529,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Printf("pushed %d records\n", res)
}).Infof("pushed %d records\n", res)
err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
if err != nil {
return err
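
The timing added to StartFlow above is the usual time.Now/time.Since pattern: capture a start time, do the work, then log the elapsed seconds alongside the record count. A minimal, self-contained sketch of that pattern (illustrative only; pullRecords and pushRecords are hypothetical stand-ins for the connector calls):

package main

import (
	"log"
	"time"
)

// Hypothetical stand-ins for the source pull and destination sync in StartFlow.
func pullRecords() []string     { return []string{"r1", "r2", "r3"} }
func pushRecords(recs []string) {}

func main() {
	pullStart := time.Now()
	records := pullRecords()
	log.Printf("pulled %d records in %d seconds", len(records), int(time.Since(pullStart).Seconds()))

	syncStart := time.Now()
	pushRecords(records)
	log.Printf("pushed %d records in %d seconds", len(records), int(time.Since(syncStart).Seconds()))
}
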
34 changes: 26 additions & 8 deletions flow/connectors/eventhub/eventhub.go
@@ -14,6 +14,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
@@ -103,13 +104,20 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
defer func() {
shutdown <- true
}()

tableNameRowsMapping := make(map[string]uint32)
batch := req.Records
eventsPerHeartBeat := 1000
eventsPerBatch := int(req.PushBatchSize)
if eventsPerBatch <= 0 {
eventsPerBatch = 10000
}
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
}

batchPerTopic := make(map[string][]*eventhub.Event)
startTime := time.Now()
for i, record := range batch.Records {
json, err := record.GetItems().ToJSON()
if err != nil {
@@ -134,7 +142,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

if (i+1)%eventsPerBatch == 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
}
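
The loop above accumulates events per topic and flushes every eventsPerBatch records; the next hunk sends whatever remains once the loop ends. A simplified sketch of that flush-every-N pattern (flush is a hypothetical stand-in for sendEventBatch):

package main

import "fmt"

// flush is a hypothetical stand-in for sendEventBatch.
func flush(batch []int) {
	fmt.Printf("flushing %d events\n", len(batch))
}

func main() {
	records := make([]int, 25)
	eventsPerBatch := 10 // the connector defaults this to 10000; 10 keeps the example small

	batch := make([]int, 0, eventsPerBatch)
	for i, r := range records {
		batch = append(batch, r)
		if (i+1)%eventsPerBatch == 0 {
			flush(batch)
			batch = batch[:0] // start a fresh batch
		}
	}
	// send the remaining events
	if len(batch) > 0 {
		flush(batch)
	}
}
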
@@ -146,15 +154,15 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
// send the remaining events.
if len(batchPerTopic) > 0 {
err := c.sendEventBatch(batchPerTopic, maxParallelism,
req.FlowJobName)
req.FlowJobName, tableNameRowsMapping)
if err != nil {
return nil, err
}
}

rowsSynced := len(batch.Records)
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("[total] successfully sent %d records to event hub", len(batch.Records))
}).Infof("[total] successfully sent %d records to event hub", rowsSynced)

err := c.updateLastOffset(req.FlowJobName, batch.LastCheckPointID)
if err != nil {
@@ -167,6 +175,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(rowsSynced), time.Since(startTime))
metrics.LogNormalizeMetrics(c.ctx, req.FlowJobName, int64(rowsSynced),
time.Since(startTime), int64(rowsSynced))
return &model.SyncResponse{
FirstSyncedCheckPointID: batch.FirstCheckPointID,
LastSyncedCheckPointID: batch.LastCheckPointID,
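
Since the Event Hub connector has no separate normalize step, sync and normalize metrics are logged here with the same row count and elapsed time. A trivial sketch of turning those two values into a rows-per-second figure (the actual metrics package may compute and emit something different):

package main

import (
	"fmt"
	"time"
)

func main() {
	rowsSynced := int64(10000)
	elapsed := 4 * time.Second

	// rows per second, guarding against a zero duration
	throughput := 0.0
	if elapsed > 0 {
		throughput = float64(rowsSynced) / elapsed.Seconds()
	}
	fmt.Printf("synced %d rows in %.1fs (%.0f rows/s)\n", rowsSynced, elapsed.Seconds(), throughput)
}
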
@@ -176,7 +187,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
maxParallelism int64,
flowName string) error {
flowName string,
tableNameRowsMapping map[string]uint32) error {
if len(events) == 0 {
log.WithFields(log.Fields{
"flowName": flowName,
@@ -191,6 +203,7 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
var wg sync.WaitGroup
var once sync.Once
var firstErr error
var mapLock sync.Mutex
// Limiting concurrent sends
guard := make(chan struct{}, maxParallelism)

@@ -223,6 +236,9 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event,
"flowName": flowName,
}).Infof("pushed %d events to event hub: %s",
numEventsPushed, tblName)
mapLock.Lock()
defer mapLock.Unlock()
tableNameRowsMapping[tblName] += uint32(len(eventBatch))
}(tblName, eventBatch)
}
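
tableNameRowsMapping is updated from the per-topic goroutines that sendEventBatch spawns, so the write sits behind mapLock, while the guard channel caps how many sends run at once. A simplified, runnable sketch of that bounded-parallelism pattern (illustrative, not the connector's actual code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	events := map[string][]string{"table1": {"a", "b"}, "table2": {"c"}}
	maxParallelism := 2

	var wg sync.WaitGroup
	var mapLock sync.Mutex
	rowsPerTable := make(map[string]uint32)
	guard := make(chan struct{}, maxParallelism) // semaphore capping concurrent sends

	for tbl, batch := range events {
		guard <- struct{}{} // acquire a slot before spawning
		wg.Add(1)
		go func(tbl string, batch []string) {
			defer wg.Done()
			defer func() { <-guard }() // release the slot
			// ... send the batch to the event hub here ...
			mapLock.Lock()
			defer mapLock.Unlock()
			rowsPerTable[tbl] += uint32(len(batch))
		}(tbl, batch)
	}

	wg.Wait()
	fmt.Println("rows per table:", rowsPerTable)
}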

@@ -327,8 +343,10 @@ func (c *EventHubConnector) getEventHubMgmtClient() (*armeventhub.EventHubsClien
func (c *EventHubConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput) (
*protos.SetupNormalizedTableBatchOutput, error) {
log.Infof("setting up tables for Eventhub is a no-op")
return nil, nil
log.Infof("normalization for event hub is a no-op")
return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: nil,
}, nil
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {