From 1d8f1646c0d8727297ce467087cd2ce983e45ecb Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Wed, 28 Feb 2024 02:38:08 +0530
Subject: [PATCH] lint 2

---
 flow/connectors/snowflake/client.go      |  2 +-
 flow/connectors/snowflake/qrep.go        | 10 ++++++----
 .../snowflake/qrep_avro_consolidate.go   |  2 --
 flow/connectors/snowflake/snowflake.go   | 19 ++++++++++---------
 4 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go
index 457a729d18..6e66a6123b 100644
--- a/flow/connectors/snowflake/client.go
+++ b/flow/connectors/snowflake/client.go
@@ -73,7 +73,7 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
 		if err != nil {
 			return 0, fmt.Errorf("failed to parse table name %s: %w", table, err)
 		}
-		//nolint:gosec
+
 		row := c.database.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table)
 		var count pgtype.Int8
 		err = row.Scan(&count)
diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go
index 21bd122ccd..e676788439 100644
--- a/flow/connectors/snowflake/qrep.go
+++ b/flow/connectors/snowflake/qrep.go
@@ -61,9 +61,7 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin
 		return nil, fmt.Errorf("failed to parse table '%s'", tableName)
 	}
 
-	//nolint:gosec
 	queryString := fmt.Sprintf("SELECT * FROM %s LIMIT 0", snowflakeSchemaTableNormalize(schematable))
 
-	//nolint:rowserrcheck
 	rows, err := c.database.QueryContext(ctx, queryString)
 	if err != nil {
@@ -422,8 +420,12 @@ func (c *SnowflakeConnector) processRowsStream(
 	return numRows, nil
 }
 
-func (c *SnowflakeConnector) PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig,
-	partition *protos.QRepPartition, stream *model.QRecordStream) (int, error) {
+func (c *SnowflakeConnector) PullQRepRecordStream(
+	ctx context.Context,
+	config *protos.QRepConfig,
+	partition *protos.QRepPartition,
+	stream *model.QRecordStream,
+) (int, error) {
 	if !partition.FullTablePartition {
 		return 0, errors.New("only full table partitions are supported")
 	}
diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go
index 707afd878c..90114a69b0 100644
--- a/flow/connectors/snowflake/qrep_avro_consolidate.go
+++ b/flow/connectors/snowflake/qrep_avro_consolidate.go
@@ -203,8 +203,6 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context)
 	}
 
 	tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID)
-
-	//nolint:gosec
 	createTempTableCmd := fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM %s LIMIT 0",
 		tempTableName, s.dstTableName)
 	if _, err := s.connector.database.ExecContext(ctx, createTempTableCmd); err != nil {
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index da67998075..34901bff5a 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -517,14 +517,15 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
 	g.SetLimit(8) // limit parallel merges to 8
 
 	for _, tableName := range destinationTableNames {
+		table := tableName
 		g.Go(func() error {
 			mergeGen := &mergeStmtGenerator{
 				rawTableName:          getRawTableIdentifier(req.FlowJobName),
-				dstTableName:          tableName,
+				dstTableName:          table,
 				syncBatchID:           req.SyncBatchID,
 				normalizeBatchID:      normBatchID,
-				normalizedTableSchema: req.TableNameSchemaMapping[tableName],
-				unchangedToastColumns: tableNameToUnchangedToastCols[tableName],
+				normalizedTableSchema: req.TableNameSchemaMapping[table],
+				unchangedToastColumns: tableNameToUnchangedToastCols[table],
 				peerdbCols: &protos.PeerDBColumns{
 					SoftDelete:        req.SoftDelete,
 					SoftDeleteColName: req.SoftDeleteColName,
@@ -537,17 +538,17 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
 			}
 
 			startTime := time.Now()
-			c.logger.Info("[merge] merging records...", "destTable", tableName)
+			c.logger.Info("[merge] merging records...", "destTable", table)
 
-			result, err := c.database.ExecContext(gCtx, mergeStatement, tableName)
+			result, err := c.database.ExecContext(gCtx, mergeStatement, table)
 			if err != nil {
 				return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
-					tableName, mergeStatement, err)
+					table, mergeStatement, err)
 			}
 
 			endTime := time.Now()
 			c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
-				tableName, endTime.Sub(startTime)/time.Second))
+				table, endTime.Sub(startTime)/time.Second))
 			if err != nil {
 				c.logger.Error("[merge] error while normalizing records", "error", err)
 				return err
@@ -555,7 +556,7 @@
 
 			rowsAffected, err := result.RowsAffected()
 			if err != nil {
-				return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
+				return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", table, err)
 			}
 
 			atomic.AddInt64(&totalRowsAffected, rowsAffected)
@@ -881,7 +882,7 @@ func (c *SnowflakeConnector) GetTableSchema(
 			return nil, err
 		}
 		res[tableName] = tableSchema
-		utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName))
+		utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
 	}
 
 	return &protos.GetTableSchemaBatchOutput{
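Note on the `table := tableName` line added in the NormalizeRecords hunk: `g.Go` runs its closure on a separate goroutine, and before Go 1.22 a `for ... range` loop reused a single loop variable, so every closure spawned by the loop could observe the final table name rather than its own. Copying the loop variable into a per-iteration local is the standard fix. Below is a minimal, self-contained sketch of the same pattern, not PeerDB code: it uses the standard library's sync.WaitGroup in place of the errgroup in the patch, and the table names are made up for illustration.

package main

import (
	"fmt"
	"sync"
)

func main() {
	tables := []string{"orders", "customers", "events"}

	var wg sync.WaitGroup
	for _, tableName := range tables {
		table := tableName // per-iteration copy; needed before Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Without the copy, every goroutine reads the shared loop
			// variable and, on pre-1.22 toolchains, could all print
			// "events"; with the copy, each prints its own table.
			fmt.Println("merging", table)
		}()
	}
	wg.Wait()
}

From Go 1.22 onward each loop iteration gets a fresh variable and this class of bug disappears; until the module's `go` directive is bumped, the explicit copy is the safe choice.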