diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5dfab4bc9d..f9c8b5582f 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -315,7 +315,6 @@ func (c *BigQueryConnector) SetupMetadataTables() error { return nil } -// GetLastOffset returns the last synced ID. func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) { query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName) q := c.client.Query(query) @@ -512,7 +511,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if err != nil { return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID) if err != nil { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index f42d38a2de..0100391b3b 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error { } func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) { - syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) - if err != nil { - return 0, err - } - - return syncBatchID, nil + return c.pgMetadata.GetLastBatchID(jobName) } func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) { - res, err := c.pgMetadata.FetchLastOffset(jobName) - if err != nil { - return 0, err - } - - return res, nil + return c.pgMetadata.FetchLastOffset(jobName) } func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error { diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 0c14ad1035..13e0acb2c0 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -419,7 +419,6 @@ func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) { if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } - return result.Bool, nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 7d6fbc93aa..da281dcdbb 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error { } func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) { - syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName) - if err != nil { - return 0, err - } - - return syncBatchID, nil + return c.pgMetadata.GetLastBatchID(jobName) } func (c *S3Connector) GetLastOffset(jobName string) (int64, error) { - res, err := c.pgMetadata.FetchLastOffset(jobName) - if err != nil { - return 0, err - } - - return res, nil + return c.pgMetadata.FetchLastOffset(jobName) } // update offset for a job @@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 tableNameRowsMapping := make(map[string]uint32) streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 6466457091..dd5c272426 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -309,7 +309,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { return 0, fmt.Errorf("error while reading result row: %w", err) } if result.Int64 == 0 { - c.logger.Warn("Assuming zero offset means no sync has happened, returning nil") + c.logger.Warn("Assuming zero offset means no sync has happened") return 0, nil } return result.Int64, nil @@ -504,7 +504,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) if err != nil { @@ -742,15 +742,8 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { } func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableIdentifier string) (bool, error) { - rows, err := c.database.QueryContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier) - if err != nil { - return false, err - } - - // this query is guaranteed to return exactly one row var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier).Scan(&result) if err != nil { return false, fmt.Errorf("error while reading result row: %w", err) } @@ -937,15 +930,18 @@ func parseTableName(tableName string) (*tableNameComponents, error) { } func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { - rows, err := c.database.QueryContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + var result pgtype.Bool + err := c.database.QueryRowContext(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { - return false, fmt.Errorf("failed to check if job exists: %w", err) + return false, fmt.Errorf("error reading result row: %w", err) } - + return result.Bool, nil +} +func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := tx.QueryRowContext(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } @@ -954,7 +950,7 @@ func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) { func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64, syncRecordsTx *sql.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) }