diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 457a729d1..6e66a6123 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -73,7 +73,7 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string if err != nil { return 0, fmt.Errorf("failed to parse table name %s: %w", table, err) } - //nolint:gosec + row := c.database.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table) var count pgtype.Int8 err = row.Scan(&count) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 21bd122cc..e67678843 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -61,9 +61,7 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin return nil, fmt.Errorf("failed to parse table '%s'", tableName) } - //nolint:gosec queryString := fmt.Sprintf("SELECT * FROM %s LIMIT 0", snowflakeSchemaTableNormalize(schematable)) - //nolint:rowserrcheck rows, err := c.database.QueryContext(ctx, queryString) if err != nil { @@ -422,8 +420,12 @@ func (c *SnowflakeConnector) processRowsStream( return numRows, nil } -func (c *SnowflakeConnector) PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig, - partition *protos.QRepPartition, stream *model.QRecordStream) (int, error) { +func (c *SnowflakeConnector) PullQRepRecordStream( + ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { if !partition.FullTablePartition { return 0, errors.New("only full table partitions are supported") } diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go index 707afd878..90114a69b 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate.go @@ -203,8 +203,6 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) } tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID) - - //nolint:gosec createTempTableCmd := fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM %s LIMIT 0", tempTableName, s.dstTableName) if _, err := s.connector.database.ExecContext(ctx, createTempTableCmd); err != nil { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 9f569dc0a..5cf1eb73f 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -521,14 +521,15 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No return nil, fmt.Errorf("canceled while normalizing records: %w", err) } + table := tableName g.Go(func() error { mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(req.FlowJobName), - dstTableName: tableName, + dstTableName: table, syncBatchID: req.SyncBatchID, normalizeBatchID: normBatchID, - normalizedTableSchema: req.TableNameSchemaMapping[tableName], - unchangedToastColumns: tableNameToUnchangedToastCols[tableName], + normalizedTableSchema: req.TableNameSchemaMapping[table], + unchangedToastColumns: tableNameToUnchangedToastCols[table], peerdbCols: &protos.PeerDBColumns{ SoftDelete: req.SoftDelete, SoftDeleteColName: req.SoftDeleteColName, @@ -541,12 +542,12 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No } startTime := time.Now() - c.logger.Info("[merge] merging records...", "destTable", tableName) + c.logger.Info("[merge] merging records...", "destTable", table) - result, err := c.database.ExecContext(gCtx, mergeStatement, tableName) + result, err := c.database.ExecContext(gCtx, mergeStatement, table) if err != nil { return fmt.Errorf("failed to merge records into %s (statement: %s): %w", - tableName, mergeStatement, err) + table, mergeStatement, err) } endTime := time.Now() @@ -555,7 +556,7 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No rowsAffected, err := result.RowsAffected() if err != nil { - return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err) + return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", table, err) } atomic.AddInt64(&totalRowsAffected, rowsAffected) @@ -881,7 +882,7 @@ func (c *SnowflakeConnector) GetTableSchema( return nil, err } res[tableName] = tableSchema - utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName)) + utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName) } return &protos.GetTableSchemaBatchOutput{