Skip to content

Commit

Permalink
external metadata: prevent sql injection (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 23, 2024
1 parent 87bdc8c commit 346e34c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
6 changes: 1 addition & 5 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,5 @@ func (c *EventHubConnector) SetupNormalizedTables(
}

func (c *EventHubConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return err
}
return nil
return c.pgMetadata.DropMetadata(jobName)
}
19 changes: 12 additions & 7 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -67,6 +68,10 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (p *PostgresMetadataStore) QualifyTable(table string) string {
return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
}

func (p *PostgresMetadataStore) Ping() error {
if p.pool == nil {
return fmt.Errorf("metadata db ping failed as pool does not exist")
Expand Down Expand Up @@ -107,7 +112,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {

// create the last sync state table
_, err = p.pool.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.schemaName+`.`+lastSyncStateTableName+` (
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
Expand All @@ -126,7 +131,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
var offset pgtype.Int8
Expand All @@ -148,7 +153,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+p.schemaName+`.`+lastSyncStateTableName+`
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)

Expand Down Expand Up @@ -180,10 +185,10 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
// update the last offset
p.logger.Info("updating last offset", slog.Int64("offset", offset))
_, err = tx.Exec(p.ctx, `
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
DO UPDATE SET last_offset = GREATEST(`+connpostgres.QuoteIdentifier(lastSyncStateTableName)+`.last_offset, excluded.last_offset),
updated_at = NOW()
`, jobName, offset, 0)

Expand All @@ -206,7 +211,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
func (p *PostgresMetadataStore) IncrementID(jobName string) error {
p.logger.Info("incrementing sync batch id for job")
_, err := p.pool.Exec(p.ctx, `
UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
UPDATE `+p.QualifyTable(lastSyncStateTableName)+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)
if err != nil {
Expand All @@ -219,7 +224,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
DELETE FROM `+p.schemaName+`.`+lastSyncStateTableName+`
DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
return err
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,5 @@ func (c *S3Connector) SetupNormalizedTables(req *protos.SetupNormalizedTableBatc
}

func (c *S3Connector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
return err
}
return nil
return c.pgMetadata.DropMetadata(jobName)
}

0 comments on commit 346e34c

Please sign in to comment.