diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 67d290cdeb..f867d9d96f 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -9,7 +9,6 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "github.com/jackc/pgx/v5/pgtype" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -81,7 +80,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( tableNameRowsMapping := make(map[string]uint32) streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID) streamRes, err := utils.RecordsToRawTableStream(streamReq) - if err != nil { return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err) } @@ -89,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( qrepConfig := &protos.QRepConfig{ StagingPath: c.config.S3Integration, FlowJobName: req.FlowJobName, - DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)), + DestinationTableIdentifier: strings.ToLower(rawTableIdentifier), } avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c) destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier) @@ -122,7 +120,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( } func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - rawTableName := c.getRawTableName(req.FlowJobName) c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName)) @@ -150,18 +147,6 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model return res, nil } -func (c *ClickhouseConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) { - checkIfJobMetadataExistsSQL := "SELECT TO_BOOLEAN(COUNT(1)) FROM %s WHERE MIRROR_JOB_NAME=?" - - var result pgtype.Bool - err := tx.QueryRowContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - return false, fmt.Errorf("error reading result row: %w", err) - } - return result.Bool, nil -} - func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error { err := c.pgMetadata.DropMetadata(jobName) if err != nil { diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 5adad380d4..68129a98d5 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -61,7 +61,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( stream *model.QRecordStream, flowJobName string, ) (int, error) { - tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier) dstTableName := s.config.DestinationTableIdentifier @@ -84,8 +83,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( } defer avroFile.Cleanup() s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog) - - err = s.CopyStageToDestination(avroFile) if err != nil { return 0, err