
Commit

Merge remote-tracking branch 'origin/main' into start-lsn-investigation
serprex committed Dec 14, 2023
2 parents d601ed8 + f62f3e2 commit ef83b6f
Showing 5 changed files with 19 additions and 45 deletions.
flow/connectors/bigquery/bigquery.go (3 changes: 1 addition & 2 deletions)

@@ -315,7 +315,6 @@ func (c *BigQueryConnector) SetupMetadataTables() error {
 	return nil
 }
 
-// GetLastOffset returns the last synced ID.
 func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
 	query := fmt.Sprintf("SELECT offset FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
 	q := c.client.Query(query)
@@ -512,7 +511,7 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 	if err != nil {
 		return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
 	}
-	syncBatchID = syncBatchID + 1
+	syncBatchID += 1
 
 	res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
 	if err != nil {
flow/connectors/eventhub/eventhub.go (14 changes: 2 additions & 12 deletions)

@@ -102,21 +102,11 @@ func (c *EventHubConnector) SetupMetadataTables() error {
 }
 
 func (c *EventHubConnector) GetLastSyncBatchID(jobName string) (int64, error) {
-	syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
-	if err != nil {
-		return 0, err
-	}
-
-	return syncBatchID, nil
+	return c.pgMetadata.GetLastBatchID(jobName)
 }
 
 func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
-	res, err := c.pgMetadata.FetchLastOffset(jobName)
-	if err != nil {
-		return 0, err
-	}
-
-	return res, nil
+	return c.pgMetadata.FetchLastOffset(jobName)
 }
 
 func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
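The eventhub change above, and the matching s3 change below, follow one pattern: when a method only forwards a call and adds no context to the error, the temporary variable and explicit error check add nothing, so the call can be returned directly. A minimal standalone sketch of the equivalence (the metadataStore interface here is illustrative, not PeerDB's pgMetadata type); the only behavioral caveat is that the short form forwards whatever value accompanies the error, which is fine under the Go convention that callers ignore the value when err != nil.

package connectors

// metadataStore stands in for the connector's pgMetadata field (illustrative only).
type metadataStore interface {
	GetLastBatchID(jobName string) (int64, error)
}

// Before: fetch, check, and return explicitly.
func getLastSyncBatchIDVerbose(m metadataStore, jobName string) (int64, error) {
	syncBatchID, err := m.GetLastBatchID(jobName)
	if err != nil {
		return 0, err
	}
	return syncBatchID, nil
}

// After: the (value, error) pair is forwarded unchanged, so return the call directly.
func getLastSyncBatchID(m metadataStore, jobName string) (int64, error) {
	return m.GetLastBatchID(jobName)
}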
flow/connectors/postgres/client.go (1 change: 0 additions & 1 deletion)

@@ -419,7 +419,6 @@ func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
 	if err != nil {
 		return false, fmt.Errorf("error reading result row: %w", err)
 	}
-
 	return result.Bool, nil
 }
 
flow/connectors/s3/s3.go (16 changes: 3 additions & 13 deletions)

@@ -167,21 +167,11 @@ func (c *S3Connector) SetupMetadataTables() error {
 }
 
 func (c *S3Connector) GetLastSyncBatchID(jobName string) (int64, error) {
-	syncBatchID, err := c.pgMetadata.GetLastBatchID(jobName)
-	if err != nil {
-		return 0, err
-	}
-
-	return syncBatchID, nil
+	return c.pgMetadata.GetLastBatchID(jobName)
 }
 
 func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
-	res, err := c.pgMetadata.FetchLastOffset(jobName)
-	if err != nil {
-		return 0, err
-	}
-
-	return res, nil
+	return c.pgMetadata.FetchLastOffset(jobName)
 }
 
 // update offset for a job
@@ -200,7 +190,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
 	if err != nil {
 		return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
 	}
-	syncBatchID = syncBatchID + 1
+	syncBatchID += 1
 
 	tableNameRowsMapping := make(map[string]uint32)
 	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
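The BigQuery, S3, and Snowflake SyncRecords paths all number batches the same way: read the last synced batch ID from the metadata store, increment it, and run the new sync under that ID (the only change in this commit is the += spelling). A hedged sketch of that flow; getLastBatchID and syncBatchOfRecords are placeholders, not PeerDB's actual helpers.

package connectors

import "fmt"

// Placeholders standing in for the connector's metadata lookup and Avro sync.
func getLastBatchID(jobName string) (int64, error)           { return 0, nil }
func syncBatchOfRecords(jobName string, batchID int64) error { return nil }

// syncRecords sketches the shared batch-numbering flow.
func syncRecords(jobName string) error {
	syncBatchID, err := getLastBatchID(jobName) // 0 means nothing has been synced yet
	if err != nil {
		return fmt.Errorf("failed to get previous syncBatchID: %w", err)
	}
	syncBatchID += 1 // this run becomes the next batch

	if err := syncBatchOfRecords(jobName, syncBatchID); err != nil {
		return fmt.Errorf("failed to sync batch %d: %w", syncBatchID, err)
	}
	return nil
}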
flow/connectors/snowflake/snowflake.go (30 changes: 13 additions & 17 deletions)

@@ -309,7 +309,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
 		return 0, fmt.Errorf("error while reading result row: %w", err)
 	}
 	if result.Int64 == 0 {
-		c.logger.Warn("Assuming zero offset means no sync has happened, returning nil")
+		c.logger.Warn("Assuming zero offset means no sync has happened")
 		return 0, nil
 	}
 	return result.Int64, nil
@@ -504,7 +504,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
 	if err != nil {
 		return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
 	}
-	syncBatchID = syncBatchID + 1
+	syncBatchID += 1
 
 	res, err := c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
 	if err != nil {
@@ -742,15 +742,8 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
 }
 
 func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableIdentifier string) (bool, error) {
-	rows, err := c.database.QueryContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier)
-	if err != nil {
-		return false, err
-	}
-
-	// this query is guaranteed to return exactly one row
 	var result pgtype.Bool
-	rows.Next()
-	err = rows.Scan(&result)
+	err := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier).Scan(&result)
 	if err != nil {
 		return false, fmt.Errorf("error while reading result row: %w", err)
 	}
@@ -937,15 +930,18 @@ func parseTableName(tableName string) (*tableNameComponents, error) {
 }
 
 func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) {
-	rows, err := c.database.QueryContext(c.ctx,
-		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
+	var result pgtype.Bool
+	err := c.database.QueryRowContext(c.ctx,
+		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
 	if err != nil {
-		return false, fmt.Errorf("failed to check if job exists: %w", err)
+		return false, fmt.Errorf("error reading result row: %w", err)
 	}
-
+	return result.Bool, nil
+}
+func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) {
 	var result pgtype.Bool
-	rows.Next()
-	err = rows.Scan(&result)
+	err := tx.QueryRowContext(c.ctx,
+		fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
 	if err != nil {
 		return false, fmt.Errorf("error reading result row: %w", err)
 	}
@@ -954,7 +950,7 @@ func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) {
 
 func (c *SnowflakeConnector) updateSyncMetadata(flowJobName string, lastCP int64,
 	syncBatchID int64, syncRecordsTx *sql.Tx) error {
-	jobMetadataExists, err := c.jobMetadataExists(flowJobName)
+	jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName)
 	if err != nil {
 		return fmt.Errorf("failed to get sync status for flow job: %w", err)
 	}
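Two ideas drive the snowflake refactor. For a statement that returns exactly one row, database/sql's QueryRowContext defers any error until Scan, which collapses the QueryContext / rows.Next() / rows.Scan() sequence and removes the unclosed rows handle. And since updateSyncMetadata runs inside the sync transaction, the existence check now goes through that transaction (jobMetadataExistsTx) so it can observe metadata written earlier in the same, still-uncommitted transaction, which a query routed through the shared connection pool would not see. A hedged sketch of both patterns against a generic table; the query text and table name are illustrative, not PeerDB's metadata schema.

package connectors

import (
	"context"
	"database/sql"
	"fmt"
)

// Illustrative single-row query; the "?" placeholder style depends on the driver.
const checkJobExistsSQL = "SELECT COUNT(*) > 0 FROM metadata_jobs WHERE job_name = ?"

// jobExists uses QueryRowContext: the error (including sql.ErrNoRows) is
// deferred until Scan, so no rows.Next()/rows.Close() bookkeeping is needed.
func jobExists(ctx context.Context, db *sql.DB, jobName string) (bool, error) {
	var exists bool
	err := db.QueryRowContext(ctx, checkJobExistsSQL, jobName).Scan(&exists)
	if err != nil {
		return false, fmt.Errorf("error reading result row: %w", err)
	}
	return exists, nil
}

// jobExistsTx is the transaction-scoped variant: a query issued through the
// *sql.Tx sees rows written earlier in that same transaction, which a query
// routed through the *sql.DB pool would not.
func jobExistsTx(ctx context.Context, tx *sql.Tx, jobName string) (bool, error) {
	var exists bool
	err := tx.QueryRowContext(ctx, checkJobExistsSQL, jobName).Scan(&exists)
	if err != nil {
		return false, fmt.Errorf("error reading result row: %w", err)
	}
	return exists, nil
}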
