Skip to content

Commit

Permalink
fix more lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 29, 2024
1 parent d98e571 commit 45c165f
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 19 deletions.
17 changes: 1 addition & 16 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/jackc/pgx/v5/pgtype"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -81,15 +80,14 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
tableNameRowsMapping := make(map[string]uint32)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)

if err != nil {
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

qrepConfig := &protos.QRepConfig{
StagingPath: c.config.S3Integration,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)),
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
Expand Down Expand Up @@ -122,7 +120,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
}

func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {

rawTableName := c.getRawTableName(req.FlowJobName)
c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName))

Expand Down Expand Up @@ -150,18 +147,6 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model
return res, nil
}

func (c *ClickhouseConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) {
checkIfJobMetadataExistsSQL := "SELECT TO_BOOLEAN(COUNT(1)) FROM %s WHERE MIRROR_JOB_NAME=?"

var result pgtype.Bool
err := tx.QueryRowContext(c.ctx,
fmt.Sprintf(checkIfJobMetadataExistsSQL, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
return false, fmt.Errorf("error reading result row: %w", err)
}
return result.Bool, nil
}

func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
err := c.pgMetadata.DropMetadata(jobName)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
stream *model.QRecordStream,
flowJobName string,
) (int, error) {

tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
dstTableName := s.config.DestinationTableIdentifier

Expand All @@ -84,8 +83,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
}
defer avroFile.Cleanup()
s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)


err = s.CopyStageToDestination(avroFile)
if err != nil {
return 0, err
Expand Down

0 comments on commit 45c165f

Please sign in to comment.