From a9bdf2f5d5283b21f89762e73e8828e4a6efe979 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Mon, 2 Oct 2023 10:00:30 -0400 Subject: [PATCH] Add SendWALHeartbeat Method Implemented `SendWALHeartbeat()` method to maintain PostgreSQL WAL activity. To ensure consistent WAL activity in idle PostgreSQL databases and prevent indefinite growth of replication slots, a heartbeat mechanism is introduced. This method generates WAL activity by creating and immediately dropping an aggregate function, ensuring a guaranteed and lightweight WAL entry without leaving persistent changes in the database schema. Using an aggregate was chosen for its guaranteed WAL generation and minimal impact. fixes: https://github.com/PeerDB-io/peerdb/issues/429 --- flow/activities/flowable.go | 15 +++++++++++++++ flow/connectors/core.go | 3 +++ flow/connectors/postgres/postgres.go | 16 ++++++++++++++++ flow/workflows/cdc_flow.go | 17 +++++++++++++++++ 4 files changed, 51 insertions(+) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 193c9751e5..11c1fe3638 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -625,3 +625,18 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown } return nil } + +func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.FlowConnectionConfigs) error { + srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + return fmt.Errorf("failed to get source connector: %w", err) + } + defer connectors.CloseConnector(srcConn) + + err = srcConn.SendWALHeartbeat() + if err != nil { + return fmt.Errorf("failed to send WAL heartbeat: %w", err) + } + + return nil +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 48c3e7e612..dbc040a5b2 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -40,6 +40,9 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication
slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error + + // SendWALHeartbeat generates WAL activity on the source so that restart_lsn can progress on postgres. + SendWALHeartbeat() error } type CDCSyncConnector interface { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f37fc88bdf..066098405d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -858,6 +858,22 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { return nil } +func (c *PostgresConnector) SendWALHeartbeat() error { + command := ` + BEGIN; + DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); + CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); + DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); + END; + ` + _, err := c.pool.Exec(c.ctx, command) + if err != nil { + return fmt.Errorf("error bumping wal position: %w", err) + } + + return nil +} + // parseSchemaTable parses a table name into schema and table name. func parseSchemaTable(tableName string) (*SchemaTable, error) { parts := strings.Split(tableName, ".") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 46d44ad51b..f9209820d4 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -104,6 +104,18 @@ func (s *CDCFlowState) TruncateProgress() { } } +func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error { + walHeartbeatCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + if err := workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to send WAL heartbeat: %w", err) + } + + return nil +} + // CDCFlowWorkflowExecution represents the state for execution of a peer flow.
type CDCFlowWorkflowExecution struct { flowExecutionID string @@ -336,6 +348,11 @@ func CDCFlowWorkflowWithConfig( } } + // Send a WAL heartbeat before continuing as new; on failure, return the state and the error. + if err := state.SendWALHeartbeat(ctx, cfg); err != nil { + return state, err + } + state.TruncateProgress() return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }