From f5846c9277133a25d9b3afe7b9713508762c4c03 Mon Sep 17 00:00:00 2001
From: Pankaj B
Date: Fri, 19 Jan 2024 11:37:53 +0530
Subject: [PATCH] more changes

---
 flow/connectors/clickhouse/cdc.go            | 45 ++++++++++++
 flow/connectors/clickhouse/qrep_avro_sync.go | 83 +++++++++++++++++++++
 2 files changed, 128 insertions(+)

diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 49916a6aee..0e9b6a3d5f 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -199,6 +199,51 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model
 	return res, nil
 }
 
+// jobMetadataExistsTx reports whether a metadata row for jobName already
+// exists in the mirror jobs table, querying through the supplied transaction.
+// NOTE(review): TO_BOOLEAN is Snowflake SQL; confirm the metadata store
+// backing this connector accepts it (ClickHouse itself would need COUNT(1) > 0).
+func (c *ClickhouseConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) {
+	checkIfJobMetadataExistsSQL := "SELECT TO_BOOLEAN(COUNT(1)) FROM %s WHERE MIRROR_JOB_NAME=?"
+
+	var result pgtype.Bool
+	err := tx.QueryRowContext(c.ctx,
+		fmt.Sprintf(checkIfJobMetadataExistsSQL, mirrorJobsTableIdentifier), jobName).Scan(&result)
+	if err != nil {
+		return false, fmt.Errorf("error reading result row: %w", err)
+	}
+	return result.Bool, nil
+}
+
+// updateSyncMetadata upserts the latest checkpoint and sync batch ID for a
+// flow job inside the SyncRecords transaction: insert on first sync, update
+// thereafter.
+func (c *ClickhouseConnector) updateSyncMetadata(flowJobName string, lastCP int64,
+	syncBatchID int64, syncRecordsTx *sql.Tx,
+) error {
+	jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName)
+	if err != nil {
+		return fmt.Errorf("failed to get sync status for flow job: %w", err)
+	}
+
+	if !jobMetadataExists {
+		_, err := syncRecordsTx.ExecContext(c.ctx,
+			fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier),
+			flowJobName, lastCP, syncBatchID, 0)
+		if err != nil {
+			return fmt.Errorf("failed to insert flow job status: %w", err)
+		}
+	} else {
+		_, err := syncRecordsTx.ExecContext(c.ctx,
+			fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier),
+			lastCP, syncBatchID, flowJobName)
+		if err != nil {
+			return fmt.Errorf("failed to update flow job status: %w", err)
+		}
+	}
+
+	return nil
+}
+
 func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
 	syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil)
 	if err != nil {
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 2adb391457..0b1d02f6d1 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -30,6 +30,89 @@ func NewClickhouseAvroSyncMethod(
 	}
 }
 
+// putFileToStage uploads a locally written Avro file to the named stage via
+// the driver's PUT command; files not in local storage are skipped.
+// NOTE(review): `PUT file://... @stage` is Snowflake syntax — confirm the
+// database handle here actually speaks it for this connector.
+func (s *ClickhouseAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
+	if avroFile.StorageLocation != avro.AvroLocalStorage {
+		s.connector.logger.Info("no file to put to stage")
+		return nil
+	}
+
+	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
+	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
+
+	// Keep the workflow heartbeat alive while the potentially slow upload runs.
+	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
+		return fmt.Sprintf("putting file to stage %s", stage)
+	})
+	defer shutdown()
+
+	if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
+		return fmt.Errorf("failed to put file to stage: %w", err)
+	}
+
+	s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
+	return nil
+}
+
+// SyncRecords drains the stream into a local Avro file, stages it, and copies
+// it into the destination table. Returns the number of records synced; every
+// error path returns 0.
+func (s *ClickhouseAvroSyncMethod) SyncRecords(
+	dstTableSchema []*sql.ColumnType,
+	stream *model.QRecordStream,
+	flowJobName string,
+) (int, error) {
+	tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
+	dstTableName := s.config.DestinationTableIdentifier
+
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, fmt.Errorf("failed to get schema from stream: %w", err)
+	}
+
+	s.connector.logger.Info("sync function called and schema acquired", tableLog)
+
+	avroSchema, err := s.getAvroSchema(dstTableName, schema)
+	if err != nil {
+		return 0, err
+	}
+
+	partitionID := shared.RandomString(16)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
+	if err != nil {
+		return 0, err
+	}
+	defer avroFile.Cleanup()
+	s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
+
+	stage := s.connector.getStageNameForJob(s.config.FlowJobName)
+	err = s.connector.createStage(stage, s.config)
+	if err != nil {
+		return 0, err
+	}
+	s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage))
+
+	colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier)
+	if err != nil {
+		return 0, err
+	}
+
+	err = s.putFileToStage(avroFile, stage)
+	if err != nil {
+		return 0, err
+	}
+	s.connector.logger.Info("pushed avro file to stage", tableLog)
+
+	err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames)
+	if err != nil {
+		return 0, err
+	}
+	s.connector.logger.Info(fmt.Sprintf("copied records into %s from stage %s",
+		s.config.DestinationTableIdentifier, stage))
+
+	return avroFile.NumRecords, nil
+}
+
 func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	config *protos.QRepConfig,
 	partition *protos.QRepPartition,