diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index b1631e68d4..129d7717ba 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -137,7 +137,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ EnableMetrics: opts.EnableMetrics, - CatalogMirrorMonitor: &catalogMirrorMonitor, + CatalogMirrorMonitor: catalogMirrorMonitor, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index f297e7cee0..a30ccd9ee5 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -25,8 +25,8 @@ type CDCBatchInfo struct { StartTime time.Time } -func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) CatalogMirrorMonitor { - return CatalogMirrorMonitor{ +func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor { + return &CatalogMirrorMonitor{ catalogConn: catalogConn, } } @@ -189,6 +189,11 @@ func (c *CatalogMirrorMonitor) AddPartitionToQRepRun(ctx context.Context, flowJo return nil } + if partition.Range == nil && partition.FullTablePartition { + log.Infof("partition %s is a full table partition. Metrics logging is skipped.", partition.PartitionId) + return nil + } + var rangeStart, rangeEnd string switch x := partition.Range.Range.(type) { case *protos.PartitionRange_IntRange: