From 1ff80fee90d54b4666d350084696aef4f78a0624 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 7 Dec 2023 19:37:57 +0530 Subject: [PATCH] fix: append instead of upsert, updated_at timestamp --- flow/activities/flowable.go | 4 +++- flow/connectors/utils/monitoring/monitoring.go | 8 ++++---- nexus/catalog/migrations/V12__slot_size.sql | 3 +-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 0c6fe3f3b1..b8f289a755 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -228,7 +228,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to get slot info: %w", err) } - a.CatalogMirrorMonitor.UpsertSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0]) + if len(slotInfo) == 1 { + a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0]) + } hasRecords := !recordBatch.WaitAndCheckEmpty() log.WithFields(log.Fields{ diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index b698d26cf7..8df32e384a 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -246,7 +246,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU return nil } -func (c *CatalogMirrorMonitor) UpsertSlotSizeInfo( +func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( ctx context.Context, peerName string, slotInfo *protos.SlotInfo, @@ -256,9 +256,9 @@ func (c *CatalogMirrorMonitor) UpsertSlotSizeInfo( } _, err := c.catalogConn.Exec(ctx, - "INSERT INTO peerdb_stats.peer_slot_size(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+ - "VALUES($1,$2,$3,$4,$5,$6) "+ - "ON CONFLICT (slot_name, peer_name) DO UPDATE SET restart_lsn = $3, redo_lsn = $4, confirmed_flush_lsn = $5, slot_size = $6", + "INSERT INTO peerdb_stats.peer_slot_size"+ + "(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+ + "VALUES($1,$2,$3,$4,$5,$6);", peerName, slotInfo.SlotName, slotInfo.RestartLSN, diff --git a/nexus/catalog/migrations/V12__slot_size.sql b/nexus/catalog/migrations/V12__slot_size.sql index fe56cba983..b6125c8761 100644 --- a/nexus/catalog/migrations/V12__slot_size.sql +++ b/nexus/catalog/migrations/V12__slot_size.sql @@ -5,6 +5,5 @@ CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size ( restart_lsn TEXT, confirmed_flush_lsn TEXT, slot_size BIGINT, - updated_at - PRIMARY KEY (slot_name, peer_name) + updated_at TIMESTAMP NOT NULL DEFAULT NOW() );