From 346e34c89d8ab1cb55f77a40dc7d2384456d1c3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 23 Jan 2024 16:23:38 +0000 Subject: [PATCH] external metadata: prevent sql injection (#1132) --- flow/connectors/eventhub/eventhub.go | 6 +----- flow/connectors/external_metadata/store.go | 19 ++++++++++++------- flow/connectors/s3/s3.go | 6 +----- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index b14400cb34..9eab9ae1b2 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -302,9 +302,5 @@ func (c *EventHubConnector) SetupNormalizedTables( } func (c *EventHubConnector) SyncFlowCleanup(jobName string) error { - err := c.pgMetadata.DropMetadata(jobName) - if err != nil { - return err - } - return nil + return c.pgMetadata.DropMetadata(jobName) } diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 4bcfcaa583..e1858680cf 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -67,6 +68,10 @@ func (p *PostgresMetadataStore) Close() error { return nil } +func (p *PostgresMetadataStore) QualifyTable(table string) string { + return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table) +} + func (p *PostgresMetadataStore) Ping() error { if p.pool == nil { return fmt.Errorf("metadata db ping failed as pool does not exist") @@ -107,7 +112,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { // create the last sync state table _, err = p.pool.Exec(p.ctx, ` - CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` ( + CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), @@ -126,7 +131,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset - FROM `+p.schemaName+`.`+lastSyncStateTableName+` + FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 `, jobName) var offset pgtype.Int8 @@ -148,7 +153,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { rows := p.pool.QueryRow(p.ctx, ` SELECT sync_batch_id - FROM `+p.schemaName+`.`+lastSyncStateTableName+` + FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 `, jobName) @@ -180,10 +185,10 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e // update the last offset p.logger.Info("updating last offset", slog.Int64("offset", offset)) _, err = tx.Exec(p.ctx, ` - INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id) + INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) - DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset), + DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset), updated_at = NOW() `, jobName, offset, 0) @@ -206,7 +211,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e func (p *PostgresMetadataStore) IncrementID(jobName string) error { p.logger.Info("incrementing sync batch id for job") _, err := p.pool.Exec(p.ctx, ` - UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` + UPDATE `+p.QualifyTable(lastSyncStateTableName)+` SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1 `, jobName) if err != nil { @@ -219,7 +224,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error { func (p *PostgresMetadataStore) DropMetadata(jobName string) error { _, err := p.pool.Exec(p.ctx, ` - DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+` + DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 `, jobName) return err diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 7a4e723e32..db96a927fb 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -242,9 +242,5 @@ func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatc } func (c *S3Connector) SyncFlowCleanup(jobName string) error { - err := c.pgMetadata.DropMetadata(jobName) - if err != nil { - return err - } - return nil + return c.pgMetadata.DropMetadata(jobName) }