Skip to content

Commit

Permalink
fix lint error in normalize.go
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 29, 2024
1 parent 554ee0b commit d98e571
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connclickhouse
import (
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (c *ClickhouseConnector) SetupNormalizedTables(
func generateCreateTableSQLForNormalizedTable(
normalizedTable string,
tableSchema *protos.TableSchema,
softDeleteColName string,
_ string, // softDeleteColName
syncedAtColName string,
) (string, error) {
var stmtBuilder strings.Builder
Expand Down Expand Up @@ -178,9 +179,9 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
selectQuery.WriteString(" FROM ")
selectQuery.WriteString(rawTbl)
selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
selectQuery.WriteString(fmt.Sprintf("%d", normBatchID))
selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
selectQuery.WriteString(" AND _peerdb_batch_id <= ")
selectQuery.WriteString(fmt.Sprintf("%d", req.SyncBatchID))
selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
selectQuery.WriteString(tbl)
selectQuery.WriteString("'")
Expand All @@ -203,7 +204,12 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
}

endNormalizeBatchId := normBatchID + 1
c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId)
err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId)
if err != nil {
c.logger.ErrorContext(c.ctx, "[clickhouse] error while updating normalize batch id", err)
return nil, err
}

return &model.NormalizeResponse{
Done: true,
StartBatchID: endNormalizeBatchId,
Expand All @@ -218,6 +224,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
) ([]string, error) {
rawTbl := c.getRawTableName(flowJobName)

//nolint:gosec
q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`,
rawTbl, normalizeBatchID, syncBatchID)
Expand All @@ -226,7 +233,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
if err != nil {
return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
}

defer rows.Close()
var tableNames []string
for rows.Next() {
var tableName sql.NullString
Expand All @@ -242,6 +249,11 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
tableNames = append(tableNames, tableName.String)
}

err = rows.Err()
if err != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}

return tableNames, nil
}

Expand Down

0 comments on commit d98e571

Please sign in to comment.