diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 6834402b3b..cdb9dbce74 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -512,13 +512,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return err } - command := ` - BEGIN; - DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` + command := peerdbenv.PeerDBWALHeartbeatQuery() // run above command for each Postgres peer for _, pgPeer := range pgPeers { activity.RecordHeartbeat(ctx, pgPeer.Name) diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index aadc38c36a..895b9376a6 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -106,6 +106,15 @@ func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) } +// PEERDB_WAL_HEARTBEAT_QUERY +func PeerDBWALHeartbeatQuery() string { + return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN; +DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); +CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); +DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); +END;`) +} + // PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)