From 40d62cf767a098635b4cef8289548e9229ff5a4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 18:52:03 +0000 Subject: [PATCH 1/3] GetLastOffset: return int64 (#825) Only keep `LastSyncState` around enough for backwards compatibility --- flow/activities/flowable.go | 9 +++++++-- flow/connectors/bigquery/bigquery.go | 15 ++++++--------- flow/connectors/core.go | 2 +- flow/connectors/eventhub/eventhub.go | 16 +++------------- flow/connectors/external_metadata/store.go | 12 ++++-------- flow/connectors/postgres/cdc.go | 6 +++--- flow/connectors/postgres/postgres.go | 17 +++++++---------- flow/connectors/s3/s3.go | 18 ++++-------------- flow/connectors/snowflake/snowflake.go | 17 +++++++---------- flow/model/model.go | 6 +++--- 10 files changed, 45 insertions(+), 73 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d29981f703..9a86549097 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -88,7 +88,12 @@ func (a *FlowableActivity) GetLastSyncedID( } defer connectors.CloseConnector(dstConn) - return dstConn.GetLastOffset(config.FlowJobName) + var lastOffset int64 + lastOffset, err = dstConn.GetLastOffset(config.FlowJobName) + if err != nil { + return nil, err + } + return &protos.LastSyncState{Checkpoint: lastOffset}, nil } // EnsurePullability implements EnsurePullability. @@ -252,7 +257,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, FlowJobName: input.FlowConnectionConfigs.FlowJobName, SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, - LastSyncState: input.LastSyncState, + LastOffset: input.LastSyncState.Checkpoint, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(), TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index eb6ac8d198..e1f57e9c28 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -315,30 +315,27 @@ func (c *BigQueryConnector) SetupMetadataTables() error { return nil } -// GetLastOffset returns the last synced ID. -func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) { query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) it, err := q.Read(c.ctx) if err != nil { err = fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err) - return nil, err + return 0, err } var row []bigquery.Value err = it.Next(&row) if err != nil { c.logger.Info("no row found, returning nil") - return nil, nil + return 0, nil } if row[0] == nil { c.logger.Info("no offset found, returning nil") - return nil, nil + return 0, nil } else { - return &protos.LastSyncState{ - Checkpoint: row[0].(int64), - }, nil + return row[0].(int64), nil } } @@ -497,7 +494,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID) if err != nil { diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 6c482f852a..2845e371d0 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -59,7 +59,7 @@ type CDCSyncConnector interface { SetupMetadataTables() error // GetLastOffset gets the last offset from the metadata table on the destination - GetLastOffset(jobName string) (*protos.LastSyncState, error) + GetLastOffset(jobName string) (int64, error) // GetLastSyncBatchID gets the last batch synced to the destination from the metadata table GetLastSyncBatchID(jobName string) (int64, error) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 11679e1dab..01b8510e42 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error { } func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { - syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) - if err != nil { - return 0, err - } - - return syncBatchID, nil + return c.pgMetadata.GetLastBatchID(jobName) } -func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - res, err := c.pgMetadata.FetchLastOffset(jobName) - if err != nil { - return nil, err - } - - return res, nil +func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) { + return c.pgMetadata.FetchLastOffset(jobName) } func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index ddfd6b63b0..e2127b3099 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -136,7 +136,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { return nil } -func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyncState, error) { +func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { rows := p.pool.QueryRow(p.ctx, ` SELECT last_offset FROM `+p.schemaName+`.`+lastSyncStateTableName+` @@ -147,20 +147,16 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (*protos.LastSyn if err != nil { // if the job doesn't exist, return 0 if err.Error() == "no rows in result set" { - return &protos.LastSyncState{ - Checkpoint: 0, - }, nil + return 0, nil } p.logger.Error("failed to get last offset", slog.Any("error", err)) - return nil, err + return 0, err } p.logger.Info("got last offset for job", slog.Int64("offset", offset.Int64)) - return &protos.LastSyncState{ - Checkpoint: offset.Int64, - }, nil + return offset.Int64, nil } func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index cf75fb2e2c..a56ee49edf 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -143,9 +143,9 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { // start replication p.startLSN = 0 - if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 { - p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint)) - p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1) + if req.LastOffset > 0 { + p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset)) + p.startLSN = pglogrepl.LSN(req.LastOffset + 1) } err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e5bec86b06..5ac489a6df 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -168,31 +168,28 @@ func (c *PostgresConnector) SetupMetadataTables() error { } // GetLastOffset returns the last synced offset for a job. -func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { rows, err := c.pool. Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return nil, fmt.Errorf("error getting last offset for job %s: %w", jobName, err) + return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err) } defer rows.Close() if !rows.Next() { c.logger.Info("No row found, returning nil") - return nil, nil + return 0, nil } var result pgtype.Int8 err = rows.Scan(&result) if err != nil { - return nil, fmt.Errorf("error while reading result row: %w", err) + return 0, fmt.Errorf("error while reading result row: %w", err) } if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened, returning nil") - return nil, nil + c.logger.Warn("Assuming zero offset means no sync has happened") } - return &protos.LastSyncState{ - Checkpoint: result.Int64, - }, nil + return result.Int64, nil } // PullRecords pulls records from the source. @@ -273,7 +270,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 records := make([][]interface{}, 0) tableNameRowsMapping := make(map[string]uint32) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 7dd102a305..6707d96200 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error { } func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { - syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) - if err != nil { - return 0, err - } - - return syncBatchID, nil + return c.pgMetadata.GetLastBatchID(jobName) } -func (c *S3Connector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { - res, err := c.pgMetadata.FetchLastOffset(jobName) - if err != nil { - return nil, err - } - - return res, nil +func (c *S3Connector) GetLastOffset(jobName string) (int64, error) { + return c.pgMetadata.FetchLastOffset(jobName) } // update offset for a job @@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 tableNameRowsMapping := make(map[string]uint32) streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 1d7dcba312..b1dcbb3c62 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -284,11 +284,11 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T return res, nil } -func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncState, error) { +func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) if err != nil { - return nil, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) + return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) } defer func() { // not sure if the errors these two return are same or different? @@ -300,20 +300,17 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (*protos.LastSyncStat if !rows.Next() { c.logger.Warn("No row found ,returning nil") - return nil, nil + return 0, nil } var result pgtype.Int8 err = rows.Scan(&result) if err != nil { - return nil, fmt.Errorf("error while reading result row: %w", err) + return 0, fmt.Errorf("error while reading result row: %w", err) } if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened, returning nil") - return nil, nil + c.logger.Warn("Assuming zero offset means no sync has happened") } - return &protos.LastSyncState{ - Checkpoint: result.Int64, - }, nil + return result.Int64, nil } func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { @@ -496,7 +493,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) if err != nil { diff --git a/flow/model/model.go b/flow/model/model.go index 320ad47e61..02949f3c2e 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -30,13 +30,13 @@ func NewNameAndExclude(name string, exclude []string) NameAndExclude { type PullRecordsRequest struct { // FlowJobName is the name of the flow job. FlowJobName string - // LastSyncedID is the last ID that was synced. - LastSyncState *protos.LastSyncState + // LastOffset is the latest LSN that was synced. + LastOffset int64 // MaxBatchSize is the max number of records to fetch. MaxBatchSize uint32 // IdleTimeout is the timeout to wait for new records. IdleTimeout time.Duration - //relId to name Mapping + // relId to name Mapping SrcTableIDNameMapping map[uint32]string // source to destination table name mapping TableNameMapping map[string]NameAndExclude From 153799127bc964c4fcd18a6eaa077c83b4504c75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 19:14:44 +0000 Subject: [PATCH 2/3] Remove shared.CDCMirrorMonitorKey (#824) Pass catalog connection as parameter instead --- flow/activities/flowable.go | 4 +--- flow/connectors/core.go | 3 ++- flow/connectors/postgres/postgres.go | 19 ++++++++----------- flow/shared/constants.go | 7 +++---- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9a86549097..a4a8b21205 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -120,7 +120,6 @@ func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error) { - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig) if err != nil { return nil, fmt.Errorf("failed to get connector: %w", err) @@ -215,7 +214,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -253,7 +251,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, // start a goroutine to pull records from the source errGroup.Go(func() error { - return srcConn.PullRecords(&model.PullRecordsRequest{ + return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: input.FlowConnectionConfigs.FlowJobName, SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 2845e371d0..f7a518a6a5 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -14,6 +14,7 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/jackc/pgx/v5/pgxpool" ) var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") @@ -37,7 +38,7 @@ type CDCPullConnector interface { // PullRecords pulls records from the source, and returns a RecordBatch. // This method should be idempotent, and should be able to be called multiple times with the same request. - PullRecords(req *model.PullRecordsRequest) error + PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 5ac489a6df..cf2a92b5b2 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -193,7 +193,7 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { } // PullRecords pulls records from the source. -func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { +func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { defer func() { req.RecordStream.Close() }() @@ -246,16 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { return err } - catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool) - if ok { - latestLSN, err := c.getCurrentLSN() - if err != nil { - return fmt.Errorf("failed to get current LSN: %w", err) - } - err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN) - if err != nil { - return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) - } + latestLSN, err := c.getCurrentLSN() + if err != nil { + return fmt.Errorf("failed to get current LSN: %w", err) + } + err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN) + if err != nil { + return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) } return nil diff --git a/flow/shared/constants.go b/flow/shared/constants.go index e49de60189..8379b6718f 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -23,10 +23,9 @@ const ( ShutdownSignal PauseSignal - CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor" - FlowNameKey ContextKey = "flowName" - PartitionIDKey ContextKey = "partitionId" - DeploymentUIDKey ContextKey = "deploymentUid" + FlowNameKey ContextKey = "flowName" + PartitionIDKey ContextKey = "partitionId" + DeploymentUIDKey ContextKey = "deploymentUid" ) type TaskQueueID int64 From f62f3e2fc701cc83fedfc1af74302eb5cf54ab57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 19:26:28 +0000 Subject: [PATCH 3/3] Refactor jobMetadataExists in postgres/snowflake connectors (#826) Use QueryRow instead of Query, & introduce a Tx variant which can reuse existing connection --- flow/connectors/postgres/client.go | 19 +++++++++++-------- flow/connectors/snowflake/snowflake.go | 26 +++++++++++--------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 871fd7403b..fd4a162e15 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -412,16 +412,19 @@ func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, erro } func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) { - rows, err := c.pool.Query(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + var result pgtype.Bool + err := c.pool.QueryRow(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { - return false, fmt.Errorf("failed to check if job exists: %w", err) + return false, fmt.Errorf("error reading result row: %w", err) } - defer rows.Close() + return result.Bool, nil +} +func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool, error) { var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := tx.QueryRow(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } @@ -440,7 +443,7 @@ func (c *PostgresConnector) majorVersionCheck(majorVersion int) (bool, error) { func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64, syncRecordsTx pgx.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) } @@ -466,7 +469,7 @@ func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64, normalizeRecordsTx pgx.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(normalizeRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index b1dcbb3c62..9920cdad36 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -731,15 +731,8 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { } func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableIdentifier string) (bool, error) { - rows, err := c.database.QueryContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier) - if err != nil { - return false, err - } - - // this query is guaranteed to return exactly one row var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier).Scan(&result) if err != nil { return false, fmt.Errorf("error while reading result row: %w", err) } @@ -926,15 +919,18 @@ func parseTableName(tableName string) (*tableNameComponents, error) { } func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { - rows, err := c.database.QueryContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + var result pgtype.Bool + err := c.database.QueryRowContext(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { - return false, fmt.Errorf("failed to check if job exists: %w", err) + return false, fmt.Errorf("error reading result row: %w", err) } - + return result.Bool, nil +} +func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := tx.QueryRowContext(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } @@ -943,7 +939,7 @@ func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64, syncRecordsTx *sql.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) }