From af126b54a9cf4ad75c73ee906b2ffed8ddad22d1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 3 Oct 2023 13:59:12 -0400 Subject: [PATCH] Fix an issue where we were trying to log metrics for full table (#473) For full table partitions skip logging metrics. --- flow/cmd/worker.go | 2 +- flow/connectors/utils/monitoring/monitoring.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index b1631e68d4..129d7717ba 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -137,7 +137,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterActivity(&activities.FlowableActivity{ EnableMetrics: opts.EnableMetrics, - CatalogMirrorMonitor: &catalogMirrorMonitor, + CatalogMirrorMonitor: catalogMirrorMonitor, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index f297e7cee0..a30ccd9ee5 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -25,8 +25,8 @@ type CDCBatchInfo struct { StartTime time.Time } -func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) CatalogMirrorMonitor { - return CatalogMirrorMonitor{ +func NewCatalogMirrorMonitor(catalogConn *pgxpool.Pool) *CatalogMirrorMonitor { + return &CatalogMirrorMonitor{ catalogConn: catalogConn, } } @@ -189,6 +189,11 @@ func (c *CatalogMirrorMonitor) AddPartitionToQRepRun(ctx context.Context, flowJo return nil } + if partition.Range == nil && partition.FullTablePartition { + log.Infof("partition %s is a full table partition. Metrics logging is skipped.", partition.PartitionId) + return nil + } + var rangeStart, rangeEnd string switch x := partition.Range.Range.(type) { case *protos.PartitionRange_IntRange: