Skip to content

Commit

Permalink
lint 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 27, 2024
1 parent a692b5a commit 1d8f164
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
if err != nil {
return 0, fmt.Errorf("failed to parse table name %s: %w", table, err)
}
//nolint:gosec

row := c.database.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table)
var count pgtype.Int8
err = row.Scan(&count)
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin
return nil, fmt.Errorf("failed to parse table '%s'", tableName)
}

//nolint:gosec
queryString := fmt.Sprintf("SELECT * FROM %s LIMIT 0", snowflakeSchemaTableNormalize(schematable))

//nolint:rowserrcheck
rows, err := c.database.QueryContext(ctx, queryString)
if err != nil {
Expand Down Expand Up @@ -422,8 +420,12 @@ func (c *SnowflakeConnector) processRowsStream(
return numRows, nil
}

func (c *SnowflakeConnector) PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig,
partition *protos.QRepPartition, stream *model.QRecordStream) (int, error) {
func (c *SnowflakeConnector) PullQRepRecordStream(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {
if !partition.FullTablePartition {
return 0, errors.New("only full table partitions are supported")
}
Expand Down
2 changes: 0 additions & 2 deletions flow/connectors/snowflake/qrep_avro_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context)
}

tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID)

//nolint:gosec
createTempTableCmd := fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM %s LIMIT 0",
tempTableName, s.dstTableName)
if _, err := s.connector.database.ExecContext(ctx, createTempTableCmd); err != nil {
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,14 +517,15 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
g.SetLimit(8) // limit parallel merges to 8

for _, tableName := range destinationTableNames {
table := tableName
g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(req.FlowJobName),
dstTableName: tableName,
dstTableName: table,
syncBatchID: req.SyncBatchID,
normalizeBatchID: normBatchID,
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
unchangedToastColumns: tableNameToUnchangedToastCols[tableName],
normalizedTableSchema: req.TableNameSchemaMapping[table],
unchangedToastColumns: tableNameToUnchangedToastCols[table],
peerdbCols: &protos.PeerDBColumns{
SoftDelete: req.SoftDelete,
SoftDeleteColName: req.SoftDeleteColName,
Expand All @@ -537,25 +538,25 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
}

startTime := time.Now()
c.logger.Info("[merge] merging records...", "destTable", tableName)
c.logger.Info("[merge] merging records...", "destTable", table)

result, err := c.database.ExecContext(gCtx, mergeStatement, tableName)
result, err := c.database.ExecContext(gCtx, mergeStatement, table)
if err != nil {
return fmt.Errorf("failed to merge records into %s (statement: %s): %w",
tableName, mergeStatement, err)
table, mergeStatement, err)
}

endTime := time.Now()
c.logger.Info(fmt.Sprintf("[merge] merged records into %s, took: %d seconds",
tableName, endTime.Sub(startTime)/time.Second))
table, endTime.Sub(startTime)/time.Second))
if err != nil {
c.logger.Error("[merge] error while normalizing records", "error", err)
return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", tableName, err)
return fmt.Errorf("failed to get rows affected by merge statement for table %s: %w", table, err)
}

atomic.AddInt64(&totalRowsAffected, rowsAffected)
Expand Down Expand Up @@ -881,7 +882,7 @@ func (c *SnowflakeConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
}

return &protos.GetTableSchemaBatchOutput{
Expand Down

0 comments on commit 1d8f164

Please sign in to comment.