diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 08f2938e73..ddf4da7108 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -181,7 +181,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { } func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows := p.pool.QueryRow(p.ctx, ` + rows := p.conn.QueryRow(p.ctx, ` SELECT normalize_batch_id FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 @@ -254,7 +254,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error { func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error { p.logger.Info("updating normalize batch id for job") - _, err := p.pool.Exec(p.ctx, ` + _, err := p.conn.Exec(p.ctx, ` UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1 `, jobName, batchID)