Skip to content

Commit

Permalink
bq: fix line length. sf: cleanup SetLastOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 14, 2023
1 parent c05f476 commit d601ed8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 22 deletions.
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,13 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf("UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, lastOffset, jobName)
query := fmt.Sprintf(
"UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
c.datasetID,
MirrorJobsTable,
lastOffset,
jobName,
)
q := c.client.Query(query)
_, err := q.Read(c.ctx)
if err != nil {
Expand Down
22 changes: 1 addition & 21 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,31 +316,11 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
}

func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
if err != nil {
return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
defer func() {
// not sure if the errors these two return are same or different?
err = errors.Join(rows.Close(), rows.Err())
if err != nil {
c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err))
}
}()

if !rows.Next() {
c.logger.Warn("No row found, returning 0")
return nil
}
var result pgtype.Int8
err = rows.Scan(&result)
if err != nil {
return fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
}
return nil
}

Expand Down

0 comments on commit d601ed8

Please sign in to comment.