Skip to content

Commit

Permalink
qrep metadata: set primary key on (job_name, partition_id), ON CONFLI…
Browse files Browse the repository at this point in the history
…CT DO UPDATE
  • Loading branch information
serprex committed Jan 31, 2024
1 parent 329d77c commit 079df00
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 18 deletions.
27 changes: 10 additions & 17 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,14 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
partition_id TEXT NOT NULL,
sync_partition JSON NOT NULL,
sync_start_time TIMESTAMPTZ NOT NULL,
sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(job_name, partition_id)
)`)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create qrep metadata table", slog.Any("error", err))
return err
}

_, err = p.conn.Exec(p.ctx,
`CREATE INDEX IF NOT EXISTS ix_qrep_metadata_partition_id ON `+
p.QualifyTable(qrepTableName)+
` USING hash (partition_id)`)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create qrep metadata index", slog.Any("error", err))
return err
}

p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName))
return nil
}
Expand Down Expand Up @@ -287,22 +279,23 @@ func (p *PostgresMetadataStore) FinishQrepPartition(

_, err = p.conn.Exec(p.ctx,
`INSERT INTO `+p.QualifyTable(qrepTableName)+
`(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)`,
`(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)
ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`,
jobName, partition.PartitionId, partitionJSON, startTime)
return err
}

func (p *PostgresMetadataStore) IsQrepPartitionSynced(partitionID string) (bool, error) {
var count int64
func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) {
var exists bool
err := p.conn.QueryRow(p.ctx,
`SELECT COUNT(*) FROM `+
`SELECT EXISTS(SELECT * FROM `+
p.QualifyTable(qrepTableName)+
` WHERE partition_id = $1`,
partitionID).Scan(&count)
` WHERE job_name = $1 AND partition_id = $2)`,
jobName, partitionID).Scan(&exists)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}
return count > 0, nil
return exists, nil
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords(
}
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

done, err := c.pgMetadata.IsQrepPartitionSynced(partition.PartitionId)
done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}
Expand Down

0 comments on commit 079df00

Please sign in to comment.