From dbdb9b0ca89c9e1ab0e46ffd54e5b25b038d5189 Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Thu, 7 Dec 2023 21:25:41 +0530
Subject: [PATCH] suggested change

---
 flow/activities/flowable.go                   | 50 +++++++++++++++----
 flow/connectors/postgres/client.go            |  2 +
 .../connectors/utils/monitoring/monitoring.go |  2 +-
 nexus/catalog/migrations/V12__slot_size.sql   |  3 ++
 4 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index b8f289a755..bd6428da0f 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -160,10 +160,39 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	return conn.SetupNormalizedTables(config)
 }
 
+// recordSlotSizePeriodically appends slotInfo[0] to the catalog every 10
+// minutes, and once more when done is signalled or closed, then returns.
+func (a *FlowableActivity) recordSlotSizePeriodically(
+	ctx context.Context,
+	done <-chan struct{},
+	peerName string,
+	slotInfo []*protos.SlotInfo,
+) {
+	if len(slotInfo) == 0 {
+		// Nothing to record, but still consume the signal so a sender never blocks.
+		<-done
+		return
+	}
+	ticker := time.NewTicker(10 * time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
+		case <-done:
+			// Final sample; returning keeps a closed done from spinning the loop.
+			a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
+			return
+		}
+	}
+}
+
 // StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") + done := make(chan struct{}) + defer close(done) conn := input.FlowConnectionConfigs ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) @@ -205,6 +234,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } defer connectors.CloseConnector(srcConn) + slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) + if input.FlowConnectionConfigs.ReplicationSlotName != "" { + slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName + } + slotInfo, err := srcConn.GetSlotInfo(slotNameForMetrics) + if err != nil { + return nil, fmt.Errorf("failed to get slot info: %w", err) + } + + go a.recordSlotSizePeriodically(ctx, done, input.FlowConnectionConfigs.Source.Name, slotInfo) // start a goroutine to pull records from the source errGroup.Go(func() error { return srcConn.PullRecords(&model.PullRecordsRequest{ @@ -222,16 +261,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }) }) - slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) - slotInfo, err := srcConn.GetSlotInfo(slotNameForMetrics) - if err != nil { - return nil, fmt.Errorf("failed to get slot info: %w", err) - } - - if len(slotInfo) == 1 { - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0]) - } - hasRecords := !recordBatch.WaitAndCheckEmpty() log.WithFields(log.Fields{ "flowName": input.FlowConnectionConfigs.FlowJobName, @@ -339,6 +368,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) + done <- struct{}{} return res, nil } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 
7e21c6a1fe..92a1b181c9 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -198,6 +198,8 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str } // GetSlotInfo gets the information about the replication slot size and LSNs +// If slotName input is empty, all slot info rows are returned - this is for UI. +// Else, only the row pertaining to that slotName will be returned. func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) { specificSlotClause := "" if slotName != "" { diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 8df32e384a..936aaa909c 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -258,7 +258,7 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo( _, err := c.catalogConn.Exec(ctx, "INSERT INTO peerdb_stats.peer_slot_size"+ "(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+ - "VALUES($1,$2,$3,$4,$5,$6);", + "VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;", peerName, slotInfo.SlotName, slotInfo.RestartLSN, diff --git a/nexus/catalog/migrations/V12__slot_size.sql b/nexus/catalog/migrations/V12__slot_size.sql index b6125c8761..426bdb8bc2 100644 --- a/nexus/catalog/migrations/V12__slot_size.sql +++ b/nexus/catalog/migrations/V12__slot_size.sql @@ -1,4 +1,5 @@ CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size ( + id SERIAL PRIMARY KEY, slot_name TEXT NOT NULL, peer_name TEXT NOT NULL, redo_lsn TEXT, @@ -7,3 +8,5 @@ CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size ( slot_size BIGINT, updated_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +CREATE INDEX index_slot_name ON peerdb_stats.peer_slot_size (slot_name);