Skip to content

Commit

Permalink
Add SendWALHeartbeat Method (#462)
Browse files Browse the repository at this point in the history
Implemented `SendWALHeartbeat()` method to maintain PostgreSQL WAL
activity.

To ensure consistent WAL activity in idle PostgreSQL databases and
prevent indefinite growth of replication slots, a heartbeat mechanism is
introduced. This method generates WAL activity by creating and
immediately dropping an aggregate function, ensuring a guaranteed and
lightweight WAL entry without leaving persistent changes in the database
schema. Using an aggregate was chosen for its guaranteed WAL generation
and minimal impact.

fixes: #429
  • Loading branch information
iskakaushik authored Oct 2, 2023
1 parent 43b4a16 commit bb9deb7
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
15 changes: 15 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,18 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
}
return nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.FlowConnectionConfigs) error {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

err = srcConn.SendWALHeartbeat()
if err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type CDCPullConnector interface {

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// SendWALHeartbeat allows for activity to progress restart_lsn on postgres.
SendWALHeartbeat() error
}

type CDCSyncConnector interface {
Expand Down
16 changes: 16 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,22 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
return nil
}

func (c *PostgresConnector) SendWALHeartbeat() error {
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
_, err := c.pool.Exec(c.ctx, command)
if err != nil {
return fmt.Errorf("error bumping wal position: %w", err)
}

return nil
}

// parseSchemaTable parses a table name into schema and table name.
func parseSchemaTable(tableName string) (*SchemaTable, error) {
parts := strings.Split(tableName, ".")
Expand Down
17 changes: 17 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ func (s *CDCFlowState) TruncateProgress() {
}
}

func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error {
walHeartbeatCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})

if err := workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg).Get(ctx, nil); err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
}

return nil
}

// CDCFlowWorkflowExecution represents the state for execution of a peer flow.
type CDCFlowWorkflowExecution struct {
flowExecutionID string
Expand Down Expand Up @@ -336,6 +348,11 @@ func CDCFlowWorkflowWithConfig(
}
}

// send WAL heartbeat
if err := state.SendWALHeartbeat(ctx, cfg); err != nil {
return state, err
}

state.TruncateProgress()
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}

0 comments on commit bb9deb7

Please sign in to comment.