From 3eb09d7f57cc011deec5338ed3b73e943fba2ee3 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:12:01 +0530 Subject: [PATCH 1/9] avoid sql placeholder --- flow/connectors/postgres/validate.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 4928e7c6d5..198f2ae0a0 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -67,7 +67,8 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use } var replicationRes bool - err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = $1", username).Scan(&replicationRes) + err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = "+ + QuoteLiteral(username)).Scan(&replicationRes) if err != nil { return err } From b704744ce59fedaab24a74eaaf9f2e701cf62a56 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:21:49 +0530 Subject: [PATCH 2/9] replace another occurrence --- flow/connectors/postgres/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 198f2ae0a0..9d574c3703 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -35,7 +35,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableStr := strings.Join(tableArr, ",") // Check if publication exists - err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname="+QuoteLiteral(pubName)).Scan(nil) if err != nil { if err == pgx.ErrNoRows { return fmt.Errorf("publication does not exist: %s", pubName) From 46eab45a5219d1336a1e9e0351389181b88a2d4c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:33:27 +0530 Subject: [PATCH 3/9] check no rows for rds --- flow/connectors/postgres/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 9d574c3703..b1d1ba77f1 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -77,7 +77,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use // RDS case: check pg_settings for rds.logical_replication var setting string err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting) - if err != nil || setting != "on" { + if (err != nil && err != pgx.ErrNoRows) || setting != "on" { return errors.New("postgres user does not have replication role") } } From fe325604acdcfee922f46708531c9cc4a16f1525 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:38:44 +0530 Subject: [PATCH 4/9] dont error for no rows in rolrepl check --- flow/connectors/postgres/validate.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index b1d1ba77f1..0b12604886 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "strconv" "strings" @@ -70,7 +71,12 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = "+ QuoteLiteral(username)).Scan(&replicationRes) if err != nil { - return err + if err != pgx.ErrNoRows { + c.logger.Warn("No rows in pg_roles for user. Skipping rolereplication check", + slog.String("username", username)) + } else { + return err + } } if !replicationRes { From dd026a19c0b3eb2e4ad615b96e40031835f6a962 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:41:43 +0530 Subject: [PATCH 5/9] silly error --- flow/connectors/postgres/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 0b12604886..4e8d0910c0 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -71,7 +71,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = "+ QuoteLiteral(username)).Scan(&replicationRes) if err != nil { - if err != pgx.ErrNoRows { + if err == pgx.ErrNoRows { c.logger.Warn("No rows in pg_roles for user. Skipping rolereplication check", slog.String("username", username)) } else { From 3b44193f5a4d1143441cda55784ede2dc1f82d45 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:49:27 +0530 Subject: [PATCH 6/9] minor fix --- flow/connectors/postgres/validate.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 4e8d0910c0..444f4edc65 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -83,8 +83,10 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use // RDS case: check pg_settings for rds.logical_replication var setting string err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting) - if (err != nil && err != pgx.ErrNoRows) || setting != "on" { - return errors.New("postgres user does not have replication role") + if err != pgx.ErrNoRows { + if err != nil || setting != "on" { + return errors.New("postgres user does not have replication role") + } } } From 0afc48b4199b727187ac3ca89314aa8d758c2f62 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 16 Feb 2024 05:55:14 +0530 Subject: [PATCH 7/9] Update flow/connectors/postgres/validate.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Philip Dubé --- flow/connectors/postgres/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 444f4edc65..34298122db 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -36,7 +36,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableStr := strings.Join(tableArr, ",") // Check if publication exists - err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname="+QuoteLiteral(pubName)).Scan(nil) + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) if err != nil { if err == pgx.ErrNoRows { return fmt.Errorf("publication does not exist: %s", pubName) From ff2e28e9de8adc28efe049c9115f2803eecf3f07 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 05:56:11 +0530 Subject: [PATCH 8/9] revert back to placeholder --- flow/connectors/postgres/validate.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 34298122db..c38b5fb130 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -68,8 +68,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use } var replicationRes bool - err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = "+ - QuoteLiteral(username)).Scan(&replicationRes) + err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = $1", username).Scan(&replicationRes) if err != nil { if err == pgx.ErrNoRows { c.logger.Warn("No rows in pg_roles for user. Skipping rolereplication check", From 93d28110bca44121a7ca522ff604e996af1c581d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 16 Feb 2024 06:03:17 +0530 Subject: [PATCH 9/9] minor change --- flow/connectors/postgres/validate.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index c38b5fb130..8e715278d8 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log/slog" "strconv" "strings" @@ -72,7 +71,7 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use if err != nil { if err == pgx.ErrNoRows { c.logger.Warn("No rows in pg_roles for user. Skipping rolereplication check", - slog.String("username", username)) + "username", username) } else { return err }