diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 454dceb20e..34f31219ed 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo( return &protos.PeerSlotResponse{SlotData: nil}, err } - pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false) if err != nil { slog.Error("Failed to create postgres connector", slog.Any("error", err)) return &protos.PeerSlotResponse{SlotData: nil}, err @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo( return &protos.PeerStatResponse{StatData: nil}, err } - pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false) if err != nil { slog.Error("Failed to create postgres connector", slog.Any("error", err)) return &protos.PeerStatResponse{StatData: nil}, err diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e5efec63ca..477b7cf46b 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -43,9 +43,6 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error - // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. - SendWALHeartbeat() error - // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) @@ -140,7 +137,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true) default: return nil, ErrUnsupportedFunctionality } @@ -150,7 +147,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -172,7 +169,7 @@ func GetCDCNormalizeConnector(ctx context.Context, inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -186,7 +183,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_SqlserverConfig: return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) default: @@ -198,7 +195,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -219,7 +216,10 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { if pgConfig == nil { return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name) } - return connpostgres.NewPostgresConnector(ctx, pgConfig) + // we can't decide if a PG peer should have replication permissions on it because we don't know + // what the user wants to do with it, so defaulting to being permissive. + // can be revisited in the future or we can use some UI wizardry. + return connpostgres.NewPostgresConnector(ctx, pgConfig, false) case protos.DBType_BIGQUERY: bqConfig := peer.GetBigqueryConfig() if bqConfig == nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index a46005fe01..1152493b01 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -34,7 +34,7 @@ type PostgresConnector struct { } // NewPostgresConnector creates a new instance of PostgresConnector. -func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { +func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) // create a separate connection pool for non-replication queries as replication connections cannot @@ -62,21 +62,23 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) return nil, fmt.Errorf("failed to get custom type map: %w", err) } - // ensure that replication is set to database - replConnConfig, err := pgxpool.ParseConfig(connectionString) - if err != nil { - return nil, fmt.Errorf("failed to parse connection string: %w", err) - } + // only initialize for CDCPullConnector to reduce number of idle connections + var replPool *SSHWrappedPostgresPool + if initializeReplPool { + // ensure that replication is set to database + replConnConfig, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } - replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" - replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" - replConnConfig.MaxConns = 1 + replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" + replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" + replConnConfig.MaxConns = 1 - // TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors - // and the user doesn't have the REPLICATION permission - replPool, err := NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig) - if err != nil { - return nil, fmt.Errorf("failed to create connection pool: %w", err) + replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig) + if err != nil { + return nil, fmt.Errorf("failed to create replication connection pool: %w", err) + } } metadataSchema := "_peerdb_internal" @@ -880,22 +882,6 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { return nil } -func (c *PostgresConnector) SendWALHeartbeat() error { - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - _, err := c.pool.Exec(c.ctx, command) - if err != nil { - return fmt.Errorf("error bumping wal position: %w", err) - } - - return nil -} - // GetLastOffset returns the last synced offset for a job. func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) { row := c.pool. diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index b50a1f89fc..df3a7de13f 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, true) require.NoError(suite.T(), err) setupTx, err := suite.connector.pool.Begin(context.Background()) diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 98a6a47b99..8a919eb214 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -32,7 +32,7 @@ func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, false) suite.failTestError(err) setupTx, err := suite.connector.pool.Begin(context.Background()) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1c86c973b9..0bb886f9a3 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -55,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, false) s.NoError(err) }