Skip to content

Commit

Permalink
switched to raw throughput, fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 27, 2023
1 parent 64da63e commit 650c988
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ services:
context: .
dockerfile: stacks/ui.Dockerfile
ports:
- 3001:3001
- 3001:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112
Expand Down
18 changes: 4 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta,
Expand Down Expand Up @@ -297,6 +298,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)

metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName,
float64(numRecords)/(pullDuration.Seconds()+syncDuration.Seconds()))

return res, nil
}

Expand Down Expand Up @@ -325,13 +329,6 @@ func (a *FlowableActivity) StartNormalize(
if err != nil {
return nil, err
}

throughput, err := a.CatalogMirrorMonitor.GetThroughputForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
if err != nil {
return nil, err
}
metrics.LogCDCOverallMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, throughput)
} else if err != nil {
return nil, err
}
Expand Down Expand Up @@ -364,13 +361,6 @@ func (a *FlowableActivity) StartNormalize(
return nil, err
}

throughput, err := a.CatalogMirrorMonitor.GetThroughputForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
if err != nil {
return nil, err
}
metrics.LogCDCOverallMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, throughput)

// log the number of batches normalized
if res != nil {
log.Infof("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func LogQRepNormalizeMetrics(ctx context.Context, flowJobName string,
totalRecordsAtTargetGauge.Update(float64(totalRecordsAtTarget))
}

func LogCDCOverallMetrics(ctx context.Context, flowJobName string, throughput float64) {
func LogCDCRawThroughputMetrics(ctx context.Context, flowJobName string, throughput float64) {
if ctx.Value(shared.EnableMetricsKey) != true {
return
}
Expand Down
16 changes: 0 additions & 16 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,6 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForCDCBatch(ctx context.Context, flo
return nil
}

func (c *CatalogMirrorMonitor) GetThroughputForCDCBatch(ctx context.Context,
flowJobName string, batchID int64) (float64, error) {
if c == nil || c.catalogConn == nil {
return 0, nil
}

row := c.catalogConn.QueryRow(ctx, `SELECT rows_in_batch/EXTRACT(EPOCH FROM end_time - start_time)
FROM peerdb_stats.cdc_batches WHERE flow_name=$1 AND batch_id=$2`, flowJobName, batchID)
var result float64
err := row.Scan(&result)
if err != nil {
return 0, fmt.Errorf("error reading throughput from cdc_batch: %w", err)
}
return result, nil
}

func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flowJobName string,
batchID int64, tableNameRowsMapping map[string]uint32) error {
if c == nil || c.catalogConn == nil {
Expand Down

0 comments on commit 650c988

Please sign in to comment.