diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 3be58019b..93649ffcd 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -139,22 +139,14 @@ func (p *PostgresMetadataStore) SetupMetadata() error { partition_id TEXT NOT NULL, sync_partition JSON NOT NULL, sync_start_time TIMESTAMPTZ NOT NULL, - sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW() + sync_finish_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY(job_name, partition_id) )`) if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create qrep metadata table", slog.Any("error", err)) return err } - _, err = p.conn.Exec(p.ctx, - `CREATE INDEX IF NOT EXISTS ix_qrep_metadata_partition_id ON `+ - p.QualifyTable(qrepTableName)+ - ` USING hash (partition_id)`) - if err != nil && !utils.IsUniqueError(err) { - p.logger.Error("failed to create qrep metadata index", slog.Any("error", err)) - return err - } - p.logger.Info(fmt.Sprintf("created external metadata table %s.%s", p.schemaName, lastSyncStateTableName)) return nil } @@ -287,22 +279,23 @@ func (p *PostgresMetadataStore) FinishQrepPartition( _, err = p.conn.Exec(p.ctx, `INSERT INTO `+p.QualifyTable(qrepTableName)+ - `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4)`, + `(job_name, partition_id, sync_partition, sync_start_time) VALUES ($1, $2, $3, $4) + ON CONFLICT (job_name, partition_id) DO UPDATE SET sync_partition = $3, sync_start_time = $4, sync_finish_time = NOW()`, jobName, partition.PartitionId, partitionJSON, startTime) return err } -func (p *PostgresMetadataStore) IsQrepPartitionSynced(partitionID string) (bool, error) { - var count int64 +func (p *PostgresMetadataStore) IsQrepPartitionSynced(jobName string, partitionID string) (bool, error) { + var exists bool err := p.conn.QueryRow(p.ctx, - `SELECT COUNT(*) FROM `+ + `SELECT EXISTS(SELECT * FROM `+ p.QualifyTable(qrepTableName)+ - ` WHERE partition_id = $1`, - partitionID).Scan(&count) + ` WHERE job_name = $1 AND partition_id = $2)`, + jobName, partitionID).Scan(&exists) if err != nil { return false, fmt.Errorf("failed to execute query: %w", err) } - return count > 0, nil + return exists, nil } func (p *PostgresMetadataStore) DropMetadata(jobName string) error { diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index e3037d298..a5bd38fe0 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -33,7 +33,7 @@ func (c *SnowflakeConnector) SyncQRepRecords( } c.logger.Info("Called QRep sync function and obtained table schema", flowLog) - done, err := c.pgMetadata.IsQrepPartitionSynced(partition.PartitionId) + done, err := c.pgMetadata.IsQrepPartitionSynced(config.FlowJobName, partition.PartitionId) if err != nil { return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) }