Skip to content

Commit

Permalink
added overall throughput metric and fixed records pulled (#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Sep 27, 2023
1 parent bb69450 commit 412b526
Show file tree
Hide file tree
Showing 6 changed files with 1,100 additions and 539 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ services:
context: .
dockerfile: stacks/ui.Dockerfile
ports:
- 3000:3000
- 3001:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112
Expand Down
12 changes: 11 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -238,6 +239,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta,
Expand Down Expand Up @@ -294,6 +298,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)

metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName,
float64(numRecords)/(pullDuration.Seconds()+syncDuration.Seconds()))

return res, nil
}

Expand All @@ -317,8 +324,11 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

return nil, a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
Expand Down
13 changes: 8 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,19 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
return nil, fmt.Errorf("failed to create cdc source: %w", err)
}

startTime := time.Now()
recordsWithSchemaDelta, err := cdc.PullRecords(req)
if err != nil {
return nil, err
}

totalRecordsAtSource, err := c.getApproxTableCounts(maps.Keys(req.TableNameMapping))
if err != nil {
return nil, err
}
metrics.LogPullMetrics(c.ctx, req.FlowJobName, recordsWithSchemaDelta.RecordBatch,
totalRecordsAtSource, time.Since(startTime))
if len(recordsWithSchemaDelta.RecordBatch.Records) > 0 {
totalRecordsAtSource, err := c.getApproxTableCounts(maps.Keys(req.TableNameMapping))
if err != nil {
return nil, err
}
metrics.LogPullMetrics(c.ctx, req.FlowJobName, recordsWithSchemaDelta.RecordBatch, totalRecordsAtSource)
cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor)
if ok {
latestLSN, err := c.getCurrentLSN()
Expand Down
20 changes: 16 additions & 4 deletions flow/connectors/utils/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func LogPullMetrics(
flowJobName string,
recordBatch *model.RecordBatch,
totalRecordsAtSource int64,
duration time.Duration,
) {
if ctx.Value(shared.EnableMetricsKey) != true {
return
Expand All @@ -39,10 +40,10 @@ func LogPullMetrics(
}
}

insertRecordsPulledGauge.Update(float64(insertRecords))
updateRecordsPulledGauge.Update(float64(updateRecords))
deleteRecordsPulledGauge.Update(float64(deleteRecords))
totalRecordsPulledGauge.Update(float64(len(recordBatch.Records)))
insertRecordsPulledGauge.Update(float64(insertRecords) / duration.Seconds())
updateRecordsPulledGauge.Update(float64(updateRecords) / duration.Seconds())
deleteRecordsPulledGauge.Update(float64(deleteRecords) / duration.Seconds())
totalRecordsPulledGauge.Update(float64(len(recordBatch.Records)) / duration.Seconds())
totalRecordsAtSourceGauge.Update(float64(totalRecordsAtSource))
}

Expand Down Expand Up @@ -118,3 +119,14 @@ func LogQRepNormalizeMetrics(ctx context.Context, flowJobName string,
recordsSyncedPerSecondGauge.Update(float64(normalizedRecordsCount) / duration.Seconds())
totalRecordsAtTargetGauge.Update(float64(totalRecordsAtTarget))
}

// LogCDCRawThroughputMetrics reports the overall raw CDC throughput
// (records per second) for the given flow job as a Temporal gauge
// metric named "cdcflow.<flowJobName>.records_throughput".
// It is a no-op when metrics are not enabled on the context.
func LogCDCRawThroughputMetrics(ctx context.Context, flowJobName string, throughput float64) {
	// Metrics emission is opt-in: proceed only when the context carries
	// an explicit bool true under EnableMetricsKey.
	if enabled, ok := ctx.Value(shared.EnableMetricsKey).(bool); !ok || !enabled {
		return
	}

	handler := activity.GetMetricsHandler(ctx)
	gaugeName := fmt.Sprintf("cdcflow.%s.records_throughput", flowJobName)
	handler.Gauge(gaugeName).Update(throughput)
}
Loading

0 comments on commit 412b526

Please sign in to comment.