Skip to content

Commit

Permalink
added overall throughput metric and fixed records pulled
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 26, 2023
1 parent bb69450 commit 64da63e
Show file tree
Hide file tree
Showing 7 changed files with 1,126 additions and 539 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ services:
context: .
dockerfile: stacks/ui.Dockerfile
ports:
- 3000:3000
- 3001:3001
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112
Expand Down
22 changes: 21 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -238,6 +239,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta,
Expand Down Expand Up @@ -317,8 +320,18 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

return nil, a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
if err != nil {
return nil, err
}

throughput, err := a.CatalogMirrorMonitor.GetThroughputForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
if err != nil {
return nil, err
}
metrics.LogCDCOverallMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, throughput)
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -351,6 +364,13 @@ func (a *FlowableActivity) StartNormalize(
return nil, err
}

throughput, err := a.CatalogMirrorMonitor.GetThroughputForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
}
metrics.LogCDCOverallMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, throughput)

// log the number of batches normalized
if res != nil {
log.Infof("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
Expand Down
13 changes: 8 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,19 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
return nil, fmt.Errorf("failed to create cdc source: %w", err)
}

startTime := time.Now()
recordsWithSchemaDelta, err := cdc.PullRecords(req)
if err != nil {
return nil, err
}

totalRecordsAtSource, err := c.getApproxTableCounts(maps.Keys(req.TableNameMapping))
if err != nil {
return nil, err
}
metrics.LogPullMetrics(c.ctx, req.FlowJobName, recordsWithSchemaDelta.RecordBatch,
totalRecordsAtSource, time.Since(startTime))
if len(recordsWithSchemaDelta.RecordBatch.Records) > 0 {
totalRecordsAtSource, err := c.getApproxTableCounts(maps.Keys(req.TableNameMapping))
if err != nil {
return nil, err
}
metrics.LogPullMetrics(c.ctx, req.FlowJobName, recordsWithSchemaDelta.RecordBatch, totalRecordsAtSource)
cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor)
if ok {
latestLSN, err := c.getCurrentLSN()
Expand Down
20 changes: 16 additions & 4 deletions flow/connectors/utils/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func LogPullMetrics(
flowJobName string,
recordBatch *model.RecordBatch,
totalRecordsAtSource int64,
duration time.Duration,
) {
if ctx.Value(shared.EnableMetricsKey) != true {
return
Expand All @@ -39,10 +40,10 @@ func LogPullMetrics(
}
}

insertRecordsPulledGauge.Update(float64(insertRecords))
updateRecordsPulledGauge.Update(float64(updateRecords))
deleteRecordsPulledGauge.Update(float64(deleteRecords))
totalRecordsPulledGauge.Update(float64(len(recordBatch.Records)))
insertRecordsPulledGauge.Update(float64(insertRecords) / duration.Seconds())
updateRecordsPulledGauge.Update(float64(updateRecords) / duration.Seconds())
deleteRecordsPulledGauge.Update(float64(deleteRecords) / duration.Seconds())
totalRecordsPulledGauge.Update(float64(len(recordBatch.Records)) / duration.Seconds())
totalRecordsAtSourceGauge.Update(float64(totalRecordsAtSource))
}

Expand Down Expand Up @@ -118,3 +119,14 @@ func LogQRepNormalizeMetrics(ctx context.Context, flowJobName string,
recordsSyncedPerSecondGauge.Update(float64(normalizedRecordsCount) / duration.Seconds())
totalRecordsAtTargetGauge.Update(float64(totalRecordsAtTarget))
}

// LogCDCOverallMetrics publishes the overall records-throughput gauge for a
// CDC flow. It is a no-op unless metrics are enabled on the context.
func LogCDCOverallMetrics(ctx context.Context, flowJobName string, throughput float64) {
	enabled, _ := ctx.Value(shared.EnableMetricsKey).(bool)
	if !enabled {
		return
	}

	gaugeName := fmt.Sprintf("cdcflow.%s.records_throughput", flowJobName)
	activity.GetMetricsHandler(ctx).Gauge(gaugeName).Update(throughput)
}
16 changes: 16 additions & 0 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch(ctx context.Context, flo
return nil
}

// GetThroughputForCDCBatch returns the records-per-second throughput for a
// CDC batch, computed from the row count and wall-clock duration recorded in
// peerdb_stats.cdc_batches.
//
// It returns (0, nil) when the monitor is disabled (nil receiver or no
// catalog connection), and also reports 0 when the batch duration is zero or
// the batch's end_time has not been recorded yet; a missing row or any other
// query failure is returned as an error.
func (c *CatalogMirrorMonitor) GetThroughputForCDCBatch(ctx context.Context,
	flowJobName string, batchID int64) (float64, error) {
	if c == nil || c.catalogConn == nil {
		return 0, nil
	}

	// NULLIF guards against a Postgres division-by-zero error for
	// instantaneous batches (end_time == start_time), and COALESCE maps the
	// resulting NULL quotient (zero duration, or end_time not yet set) to 0
	// so Scan into a float64 cannot fail on a NULL.
	row := c.catalogConn.QueryRow(ctx, `SELECT COALESCE(
		rows_in_batch / NULLIF(EXTRACT(EPOCH FROM end_time - start_time), 0), 0)
	FROM peerdb_stats.cdc_batches WHERE flow_name=$1 AND batch_id=$2`, flowJobName, batchID)
	var result float64
	if err := row.Scan(&result); err != nil {
		return 0, fmt.Errorf("error reading throughput from cdc_batch: %w", err)
	}
	return result, nil
}

func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flowJobName string,
batchID int64, tableNameRowsMapping map[string]uint32) error {
if c == nil || c.catalogConn == nil {
Expand Down
Loading

0 comments on commit 64da63e

Please sign in to comment.