diff --git a/flow/.golangci.yml b/flow/.golangci.yml index 638fab22f9..6564caf63e 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -21,6 +21,7 @@ linters: - nonamedreturns - perfsprint - prealloc + - rowserrcheck - staticcheck - stylecheck - sqlclosecheck diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 0695d955bf..b6a8b59cc5 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -79,6 +79,7 @@ func (c *ClickhouseConnector) createMetadataInsertStatement( func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { //nolint:gosec queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName) + //nolint:rowserrcheck rows, err := c.database.Query(queryString) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 231d5f78f2..e14c05e7d9 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -162,20 +162,13 @@ func (c *PostgresConnector) SetupMetadataTables() error { // GetLastOffset returns the last synced offset for a job. func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) { - rows, err := c.pool. - Query(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error getting last offset for job %s: %w", jobName, err) - } - defer rows.Close() - - if !rows.Next() { - c.logger.Info("No row found, returning nil") - return 0, nil - } var result pgtype.Int8 - err = rows.Scan(&result) + err := c.pool.QueryRow(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { + if err == pgx.ErrNoRows { + c.logger.Info("No row found, returning nil") + return 0, nil + } return 0, fmt.Errorf("error while reading result row: %w", err) } diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index b4c2642f57..4361b17881 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -82,12 +82,9 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType } //nolint:gosec - queryString := fmt.Sprintf(` - SELECT * - FROM %s - LIMIT 0 - `, snowflakeSchemaTableNormalize(schematable)) + queryString := fmt.Sprintf("SELECT * FROM %s LIMIT 0", snowflakeSchemaTableNormalize(schematable)) + //nolint:rowserrcheck rows, err := c.database.QueryContext(c.ctx, queryString) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) @@ -303,6 +300,11 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []str colTypes = append(colTypes, colType.String) } + err = rows.Err() + if err != nil { + return nil, nil, fmt.Errorf("failed to read rows: %w", err) + } + if len(colNames) == 0 { return nil, nil, fmt.Errorf("cannot load schema: table %s.%s does not exist", schemaTable.Schema, schemaTable.Table) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ac8e8badcf..e70f04d6c7 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -324,25 +324,14 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T } func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, - c.metadataSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err) - } - defer func() { - err = rows.Close() - if err != nil { - c.logger.Error("error while closing rows for reading last offset", slog.Any("error", err)) - } - }() - - if !rows.Next() { - c.logger.Warn("No row found, returning 0") - return 0, nil - } var result pgtype.Int8 - err = rows.Scan(&result) + err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, + c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { + if err == sql.ErrNoRows { + c.logger.Warn("No row found, returning 0") + return 0, nil + } return 0, fmt.Errorf("error while reading result row: %w", err) } if result.Int64 == 0 { @@ -362,40 +351,28 @@ func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) err } func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema, - mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last syncBatchId: %w", err) - } - defer rows.Close() - var result pgtype.Int8 - if !rows.Next() { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - err = rows.Scan(&result) + err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema, + mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { + if err == sql.ErrNoRows { + c.logger.Warn("No row found, returning 0") + return 0, nil + } return 0, fmt.Errorf("error while reading result row: %w", err) } return result.Int64, nil } func (c *SnowflakeConnector) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, - mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) - } - defer rows.Close() - var normBatchID pgtype.Int8 - if !rows.Next() { - c.logger.Warn("No row found, returning 0") - return 0, nil - } - err = rows.Scan(&normBatchID) + err := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, + mirrorJobsTableIdentifier), jobName).Scan(&normBatchID) if err != nil { + if err == sql.ErrNoRows { + c.logger.Warn("No row found, returning 0") + return 0, nil + } return 0, fmt.Errorf("error while reading result row: %w", err) } return normBatchID.Int64, nil @@ -422,6 +399,11 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy } destinationTableNames = append(destinationTableNames, result.String) } + + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) + } return destinationTableNames, nil }