From d98e571edc87c895e8088b98214ae88eab1024e4 Mon Sep 17 00:00:00 2001 From: Pankaj B Date: Mon, 29 Jan 2024 21:59:48 +0530 Subject: [PATCH] fix lint error in normalize.go --- flow/connectors/clickhouse/normalize.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 8439b8e730..f0aac33fd2 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,6 +3,7 @@ package connclickhouse import ( "database/sql" "fmt" + "strconv" "strings" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -56,7 +57,7 @@ func (c *ClickhouseConnector) SetupNormalizedTables( func generateCreateTableSQLForNormalizedTable( normalizedTable string, tableSchema *protos.TableSchema, - softDeleteColName string, + _ string, // softDeleteColName syncedAtColName string, ) (string, error) { var stmtBuilder strings.Builder @@ -178,9 +179,9 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques selectQuery.WriteString(" FROM ") selectQuery.WriteString(rawTbl) selectQuery.WriteString(" WHERE _peerdb_batch_id > ") - selectQuery.WriteString(fmt.Sprintf("%d", normBatchID)) + selectQuery.WriteString(strconv.FormatInt(normBatchID, 10)) selectQuery.WriteString(" AND _peerdb_batch_id <= ") - selectQuery.WriteString(fmt.Sprintf("%d", req.SyncBatchID)) + selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10)) selectQuery.WriteString(" AND _peerdb_destination_table_name = '") selectQuery.WriteString(tbl) selectQuery.WriteString("'") @@ -203,7 +204,12 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques } endNormalizeBatchId := normBatchID + 1 - c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId) + err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId) + if err != nil { + c.logger.ErrorContext(c.ctx, "[clickhouse] error while updating normalize batch id", err) + return nil, err + } + return &model.NormalizeResponse{ Done: true, StartBatchID: endNormalizeBatchId, @@ -218,6 +224,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch( ) ([]string, error) { rawTbl := c.getRawTableName(flowJobName) + //nolint:gosec q := fmt.Sprintf( `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`, rawTbl, normalizeBatchID, syncBatchID) @@ -226,7 +233,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch( if err != nil { return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) } - + defer rows.Close() var tableNames []string for rows.Next() { var tableName sql.NullString @@ -242,6 +249,11 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch( tableNames = append(tableNames, tableName.String) } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) + } + return tableNames, nil }