From 04a26ece823ec0d8757cf3979e446e8e1bfaa185 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 22 Dec 2023 15:02:22 +0530 Subject: [PATCH] remove SendWALHeartbeat from the CDCPullConnector interface --- flow/connectors/core.go | 3 --- flow/connectors/postgres/postgres.go | 16 ---------------- 2 files changed, 19 deletions(-) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e5efec63ca..2b5555c256 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -43,9 +43,6 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error - // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. - SendWALHeartbeat() error - // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index a46005fe01..190e8fd049 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -880,22 +880,6 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { return nil } -func (c *PostgresConnector) SendWALHeartbeat() error { - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - _, err := c.pool.Exec(c.ctx, command) - if err != nil { - return fmt.Errorf("error bumping wal position: %w", err) - } - - return nil -} - // GetLastOffset returns the last synced offset for a job. func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) { row := c.pool.