diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 193c9751e5..11c1fe3638 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -625,3 +625,18 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown } return nil } + +func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.FlowConnectionConfigs) error { + srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + return fmt.Errorf("failed to get destination connector: %w", err) + } + defer connectors.CloseConnector(srcConn) + + err = srcConn.SendWALHeartbeat() + if err != nil { + return fmt.Errorf("failed to send WAL heartbeat: %w", err) + } + + return nil +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 48c3e7e612..dbc040a5b2 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -40,6 +40,9 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error + + // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. + SendWALHeartbeat() error } type CDCSyncConnector interface { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f37fc88bdf..066098405d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -858,6 +858,22 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { return nil } +func (c *PostgresConnector) SendWALHeartbeat() error { + command := ` + BEGIN; + DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); + CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); + DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); + END; + ` + _, err := c.pool.Exec(c.ctx, command) + if err != nil { + return fmt.Errorf("error bumping wal position: %w", err) + } + + return nil +} + // parseSchemaTable parses a table name into schema and table name. func parseSchemaTable(tableName string) (*SchemaTable, error) { parts := strings.Split(tableName, ".") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 46d44ad51b..f9209820d4 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -104,6 +104,18 @@ func (s *CDCFlowState) TruncateProgress() { } } +func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error { + walHeartbeatCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + if err := workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to send WAL heartbeat: %w", err) + } + + return nil +} + // CDCFlowWorkflowExecution represents the state for execution of a peer flow. type CDCFlowWorkflowExecution struct { flowExecutionID string @@ -336,6 +348,11 @@ func CDCFlowWorkflowWithConfig( } } + // send WAL heartbeat + if err := state.SendWALHeartbeat(ctx, cfg); err != nil { + return state, err + } + state.TruncateProgress() return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }