Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lint: rowserrcheck #1164

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ linters:
- nonamedreturns
- perfsprint
- prealloc
- rowserrcheck
- staticcheck
- stylecheck
- sqlclosecheck
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (c *ClickhouseConnector) createMetadataInsertStatement(
func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) {
//nolint:gosec
queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName)
//nolint:rowserrcheck
rows, err := c.database.Query(queryString)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
Expand Down
17 changes: 5 additions & 12 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,13 @@ func (c *PostgresConnector) SetupMetadataTables() error {

// GetLastOffset returns the last synced offset for a job.
func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.pool.
Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err)
}
defer rows.Close()

if !rows.Next() {
c.logger.Info("No row found, returning nil")
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
err := c.pool.QueryRow(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
if err == pgx.ErrNoRows {
c.logger.Info("No row found, returning nil")
return 0, nil
}
return 0, fmt.Errorf("error while reading result row: %w", err)
}

Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,9 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType
}

//nolint:gosec
queryString := fmt.Sprintf(`
SELECT *
FROM %s
LIMIT 0
`, snowflakeSchemaTableNormalize(schematable))
queryString := fmt.Sprintf("SELECT * FROM %s LIMIT 0", snowflakeSchemaTableNormalize(schematable))

//nolint:rowserrcheck
rows, err := c.database.QueryContext(c.ctx, queryString)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
Expand Down Expand Up @@ -303,6 +300,11 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []str
colTypes = append(colTypes, colType.String)
}

err = rows.Err()
if err != nil {
return nil, nil, fmt.Errorf("failed to read rows: %w", err)
}

if len(colNames) == 0 {
return nil, nil, fmt.Errorf("cannot load schema: table %s.%s does not exist", schemaTable.Schema, schemaTable.Table)
}
Expand Down
64 changes: 23 additions & 41 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,25 +324,14 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T
}

func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
defer func() {
err = rows.Close()
if err != nil {
c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err))
}
}()

if !rows.Next() {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
var result pgtype.Int8
err = rows.Scan(&result)
err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
if err == sql.ErrNoRows {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
return 0, fmt.Errorf("error while reading result row: %w", err)
}
if result.Int64 == 0 {
Expand All @@ -362,40 +351,28 @@ func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) err
}

func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last syncBatchId: %w", err)
}
defer rows.Close()

var result pgtype.Int8
if !rows.Next() {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
err = rows.Scan(&result)
err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
if err == sql.ErrNoRows {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
return 0, fmt.Errorf("error while reading result row: %w", err)
}
return result.Int64, nil
}

func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
if err != nil {
return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err)
}
defer rows.Close()

var normBatchID pgtype.Int8
if !rows.Next() {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
err = rows.Scan(&normBatchID)
err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName).Scan(&normBatchID)
if err != nil {
if err == sql.ErrNoRows {
c.logger.Warn("No row found, returning 0")
return 0, nil
}
return 0, fmt.Errorf("error while reading result row: %w", err)
}
return normBatchID.Int64, nil
Expand All @@ -422,6 +399,11 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy
}
destinationTableNames = append(destinationTableNames, result.String)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}
return destinationTableNames, nil
}

Expand Down
Loading