diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 238baf67fe..d0a012e2d3 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -283,6 +283,7 @@ func (a *FlowableActivity) MaintainPull( config *protos.FlowConnectionConfigs, sessionID string, ) error { + ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { return err @@ -670,7 +671,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { func() { pgConfig := pgPeer.GetPostgresConfig() - pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConn, peerErr := connpostgres.NewPostgresConnector(ctx, nil, pgConfig) if peerErr != nil { logger.Error(fmt.Sprintf("error creating connector for postgres peer %s with host %s: %v", pgPeer.Name, pgConfig.Host, peerErr)) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index bb36e7aa4a..c41ed0cab6 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -337,7 +337,7 @@ func (h *FlowRequestHandler) GetSlotInfo( return nil, err } - pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConnector, err := connpostgres.NewPostgresConnector(ctx, nil, pgConfig) if err != nil { slog.Error("Failed to create postgres connector", slog.Any("error", err)) return nil, err diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 99c9e9837b..3e870aa667 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -55,7 +55,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror( return nil, errors.New("source peer config is not postgres") } - pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) + pgPeer, err := connpostgres.NewPostgresConnector(ctx, nil, sourcePeerConfig) if err != nil { displayErr := fmt.Errorf("failed to create postgres connector: %v", err) h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, displayErr.Error()) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index ba2524bd3b..75d52506b5 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -382,7 +382,7 @@ func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) ( func GetConnector(ctx context.Context, env map[string]string, config *protos.Peer) (Connector, error) { switch inner := config.Config.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig) + return connpostgres.NewPostgresConnector(ctx, env, inner.PostgresConfig) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, inner.BigqueryConfig) case *protos.Peer_SnowflakeConfig: diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ac59e6b3c3..45730a87de 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -37,7 +37,6 @@ type PostgresConnector struct { config *protos.PostgresConfig ssh *SSHTunnel conn *pgx.Conn - replConfig *pgx.ConnConfig replConn *pgx.Conn replState *ReplState customTypesMapping map[uint32]string @@ -55,18 +54,23 @@ type ReplState struct { LastOffset atomic.Int64 } -func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { +func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { logger := logger.LoggerFromCtx(ctx) - connectionString := shared.GetPGConnectionString(pgConfig) + flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil) + if err != nil { + logger.Error("Failed to get flow name from application name", slog.Any("error", err)) + } + var flowName string + if flowNameInApplicationName { + flowName, _ = ctx.Value(shared.FlowNameKey).(string) + } + connectionString := shared.GetPGConnectionString(pgConfig, flowName) - // create a separate connection pool for non-replication queries as replication connections cannot - // be used for extended query protocol, i.e. prepared statements connConfig, err := pgx.ParseConfig(connectionString) if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } - replConfig := connConfig.Copy() runtimeParams := connConfig.Config.RuntimeParams runtimeParams["idle_in_transaction_session_timeout"] = "0" runtimeParams["statement_timeout"] = "0" @@ -83,11 +87,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) return nil, fmt.Errorf("failed to create connection: %w", err) } - // ensure that replication is set to database - replConfig.Config.RuntimeParams["replication"] = "database" - replConfig.Config.RuntimeParams["bytea_output"] = "hex" - replConfig.Config.RuntimeParams["intervalstyle"] = "postgres" - customTypeMap, err := shared.GetCustomDataTypes(ctx, conn) if err != nil { logger.Error("failed to get custom type map", slog.Any("error", err)) @@ -104,7 +103,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) config: pgConfig, ssh: tunnel, conn: conn, - replConfig: replConfig, + replConn: nil, replState: nil, replLock: sync.Mutex{}, customTypesMapping: customTypeMap, @@ -116,7 +115,22 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) } func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) { - conn, err := c.ssh.NewPostgresConnFromConfig(ctx, c.replConfig) + // create a separate connection pool for non-replication queries as replication connections cannot + // be used for extended query protocol, i.e. prepared statements + replConfig, err := pgx.ParseConfig(c.connStr) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } + + runtimeParams := replConfig.Config.RuntimeParams + runtimeParams["idle_in_transaction_session_timeout"] = "0" + runtimeParams["statement_timeout"] = "0" + // ensure that replication is set to database + replConfig.Config.RuntimeParams["replication"] = "database" + replConfig.Config.RuntimeParams["bytea_output"] = "hex" + replConfig.Config.RuntimeParams["intervalstyle"] = "postgres" + + conn, err := c.ssh.NewPostgresConnFromConfig(ctx, replConfig) if err != nil { logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err) return nil, fmt.Errorf("failed to create replication connection: %w", err) diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 3ac0d39229..3ed3e6244a 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -25,7 +25,7 @@ type PostgresSchemaDeltaTestSuite struct { func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite { t.Helper() - connector, err := NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv()) + connector, err := NewPostgresConnector(context.Background(), nil, peerdbenv.GetCatalogPostgresConfigFromEnv()) require.NoError(t, err) setupTx, err := connector.conn.Begin(context.Background()) diff --git a/flow/connectors/postgres/qrep_bench_test.go b/flow/connectors/postgres/qrep_bench_test.go index 252b7520eb..f7364681bd 100644 --- a/flow/connectors/postgres/qrep_bench_test.go +++ b/flow/connectors/postgres/qrep_bench_test.go @@ -11,7 +11,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) { query := "SELECT * FROM bench.large_table" ctx := context.Background() - connector, err := NewPostgresConnector(ctx, peerdbenv.GetCatalogPostgresConfigFromEnv()) + connector, err := NewPostgresConnector(ctx, nil, peerdbenv.GetCatalogPostgresConfigFromEnv()) if err != nil { b.Fatalf("failed to create connection: %v", err) } diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index 1870749070..69c0bc9acd 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -17,7 +17,8 @@ import ( func setupDB(t *testing.T) (*PostgresConnector, string) { t.Helper() - connector, err := NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv()) + connector, err := NewPostgresConnector(context.Background(), + nil, peerdbenv.GetCatalogPostgresConfigFromEnv()) if err != nil { t.Fatalf("unable to create connector: %v", err) } diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go index 15274272e9..4e5868ccd4 100644 --- a/flow/connectors/postgres/ssh_wrapped_pool.go +++ b/flow/connectors/postgres/ssh_wrapped_pool.go @@ -14,6 +14,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -79,13 +80,21 @@ func (tunnel *SSHTunnel) NewPostgresConnFromPostgresConfig( ctx context.Context, pgConfig *protos.PostgresConfig, ) (*pgx.Conn, error) { - connectionString := shared.GetPGConnectionString(pgConfig) + flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get flow name from application name", slog.Any("error", err)) + } + + var flowName string + if flowNameInApplicationName { + flowName, _ = ctx.Value(shared.FlowNameKey).(string) + } + connectionString := shared.GetPGConnectionString(pgConfig, flowName) connConfig, err := pgx.ParseConfig(connectionString) if err != nil { return nil, err } - connConfig.RuntimeParams["application_name"] = "peerdb" return tunnel.NewPostgresConnFromConfig(ctx, connConfig) } diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index ccdd4a79eb..0f4b93c670 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -107,7 +107,8 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector, error) { t.Helper() - connector, err := connpostgres.NewPostgresConnector(context.Background(), peerdbenv.GetCatalogPostgresConfigFromEnv()) + connector, err := connpostgres.NewPostgresConnector(context.Background(), + nil, peerdbenv.GetCatalogPostgresConfigFromEnv()) if err != nil { return nil, fmt.Errorf("failed to create postgres connection: %w", err) } diff --git a/flow/peerdbenv/catalog.go b/flow/peerdbenv/catalog.go index 1014e83060..df44ddc307 100644 --- a/flow/peerdbenv/catalog.go +++ b/flow/peerdbenv/catalog.go @@ -38,7 +38,7 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error) } func GetCatalogConnectionStringFromEnv() string { - return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv()) + return shared.GetPGConnectionString(GetCatalogPostgresConfigFromEnv(), "") } func GetCatalogPostgresConfigFromEnv() *protos.PostgresConfig { diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 24f54b8e9a..28b01af437 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -172,6 +172,14 @@ DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); END;`, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME", + Description: "Set Postgres application_name to have mirror name as suffix for each mirror", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, + TargetForSetting: protos.DynconfTarget_ALL, + }, } var DynamicIndex = func() map[string]int { @@ -342,3 +350,7 @@ func PeerDBQueueForceTopicCreation(ctx context.Context, env map[string]string) ( func PeerDBIntervalSinceLastNormalizeThresholdMinutes(ctx context.Context, env map[string]string) (uint32, error) { return dynamicConfUnsigned[uint32](ctx, env, "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES") } + +func PeerDBApplicationNamePerMirrorName(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_APPLICATION_NAME_PER_MIRROR_NAME") +} diff --git a/flow/shared/postgres.go b/flow/shared/postgres.go index f0f8e43d40..be3cf7d07d 100644 --- a/flow/shared/postgres.go +++ b/flow/shared/postgres.go @@ -28,16 +28,22 @@ const ( POSTGRES_15 PGVersion = 150000 ) -func GetPGConnectionString(pgConfig *protos.PostgresConfig) string { +func GetPGConnectionString(pgConfig *protos.PostgresConfig, flowName string) string { passwordEscaped := url.QueryEscape(pgConfig.Password) + applicationName := "peerdb" + if flowName != "" { + applicationName = "peerdb_" + flowName + } + // for a url like postgres://user:password@host:port/dbname connString := fmt.Sprintf( - "postgres://%s:%s@%s:%d/%s?application_name=peerdb&client_encoding=UTF8", + "postgres://%s:%s@%s:%d/%s?application_name=%s&client_encoding=UTF8", pgConfig.User, passwordEscaped, pgConfig.Host, pgConfig.Port, pgConfig.Database, + applicationName, ) return connString }