Skip to content

Commit

Permalink
Fix an issue where we were trying to log metrics for full table (#473)
Browse files Browse the repository at this point in the history
For full table partitions skip logging metrics.
  • Loading branch information
iskakaushik authored Oct 3, 2023
1 parent 8a434aa commit af126b5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
EnableMetrics: opts.EnableMetrics,
CatalogMirrorMonitor: &catalogMirrorMonitor,
CatalogMirrorMonitor: catalogMirrorMonitor,
})

err = w.Run(worker.InterruptCh())
Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type CDCBatchInfo struct {
StartTime time.Time
}

func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) CatalogMirrorMonitor {
return CatalogMirrorMonitor{
func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor {
return &CatalogMirrorMonitor{
catalogConn: catalogConn,
}
}
Expand Down Expand Up @@ -189,6 +189,11 @@ func (c *CatalogMirrorMonitor) AddPartitionToQRepRun(ctx context.Context, flowJo
return nil
}

if partition.Range == nil && partition.FullTablePartition {
log.Infof("partition %s is a full table partition. Metrics logging is skipped.", partition.PartitionId)
return nil
}

var rangeStart, rangeEnd string
switch x := partition.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
Expand Down

0 comments on commit af126b5

Please sign in to comment.