Skip to content

Commit

Permalink
more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 21, 2024
1 parent 44a0b97 commit f5846c9
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 11 deletions.
61 changes: 50 additions & 11 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,18 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model
}

// transaction for SyncRecords
syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return nil, err
}
// syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
// if err != nil {
// return nil, err
// }
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := syncRecordsTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
c.logger.Error("error while rolling back transaction for SyncRecords: %v",
slog.Any("error", deferErr), slog.Int64("syncBatchID", syncBatchID))
}
}()
// defer func() {
// deferErr := syncRecordsTx.Rollback()
// if deferErr != sql.ErrTxDone && deferErr != nil {
// c.logger.Error("error while rolling back transaction for SyncRecords: %v",
// slog.Any("error", deferErr), slog.Int64("syncBatchID", syncBatchID))
// }
// }()

// updating metadata with new offset and syncBatchID
err = c.updateSyncMetadata(req.FlowJobName, res.LastSyncedCheckPointID, syncBatchID, syncRecordsTx)
Expand All @@ -199,6 +199,45 @@ func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model
return res, nil
}

func (c *SnowflakeConnector) jobMetadataExistsTx(tx *sql.Tx, jobName string) (bool, error) {
checkIfJobMetadataExistsSQL := "SELECT TO_BOOLEAN(COUNT(1)) FROM %s WHERE MIRROR_JOB_NAME=?"

var result pgtype.Bool
err := tx.QueryRowContext(c.ctx,
fmt.Sprintf(checkIfJobMetadataExistsSQL, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
return false, fmt.Errorf("error reading result row: %w", err)
}
return result.Bool, nil
}

// updateSyncMetadata records the latest checkpoint and sync batch ID for a
// flow job inside the provided SyncRecords transaction. The first time a job
// is seen a fresh metadata row is inserted; subsequent syncs update the
// existing row.
func (c *ClickhouseConnector) updateSyncMetadata(flowJobName string, lastCP int64,
	syncBatchID int64, syncRecordsTx *sql.Tx,
) error {
	exists, checkErr := c.jobMetadataExistsTx(syncRecordsTx, flowJobName)
	if checkErr != nil {
		return fmt.Errorf("failed to get sync status for flow job: %w", checkErr)
	}

	// Pick the statement and arguments up front, then execute once.
	var query string
	var args []interface{}
	if exists {
		query = fmt.Sprintf(updateMetadataForSyncRecordsSQL, c.metadataSchema, mirrorJobsTableIdentifier)
		args = []interface{}{lastCP, syncBatchID, flowJobName}
	} else {
		query = fmt.Sprintf(insertJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier)
		// The trailing 0 seeds the job's initial normalize batch ID.
		args = []interface{}{flowJobName, lastCP, syncBatchID, 0}
	}

	if _, execErr := syncRecordsTx.ExecContext(c.ctx, query, args...); execErr != nil {
		if exists {
			return fmt.Errorf("failed to update flow job status: %w", execErr)
		}
		return fmt.Errorf("failed to insert flow job status: %w", execErr)
	}

	return nil
}

func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
syncFlowCleanupTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,84 @@ func NewClickhouseAvroSyncMethod(
}
}

// putFileToStage uploads a locally written Avro file into the given stage via
// a PUT statement. Files whose storage location is not local are skipped with
// an informational log.
//
// NOTE(review): `PUT file://… @stage` is Snowflake stage syntax — confirm the
// ClickHouse driver in use supports it.
func (s *ClickhouseAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
	if avroFile.StorageLocation != avro.AvroLocalStorage {
		s.connector.logger.Info("no file to put to stage")
		return nil
	}

	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")

	// Keep the activity heartbeat alive for the duration of the upload.
	stopHeartbeat := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
		return fmt.Sprintf("putting file to stage %s", stage)
	})
	defer stopHeartbeat()

	command := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
	_, execErr := s.connector.database.ExecContext(s.connector.ctx, command)
	if execErr != nil {
		return fmt.Errorf("failed to put file to stage: %w", execErr)
	}

	s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
	return nil
}

// SyncRecords serializes the record stream to a local Avro file, uploads it
// to a per-job stage, and copies the staged data into the destination table.
// It returns the number of records written to the Avro file.
//
// NOTE(review): the stage created here is not dropped in this function —
// confirm cleanup happens elsewhere (e.g. SyncFlowCleanup).
func (s *ClickhouseAvroSyncMethod) SyncRecords(
	dstTableSchema []*sql.ColumnType,
	stream *model.QRecordStream,
	flowJobName string,
) (int, error) {
	tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
	dstTableName := s.config.DestinationTableIdentifier

	schema, err := stream.Schema()
	if err != nil {
		// Return 0 here for consistency with every other error path in this
		// function (previously -1, the lone inconsistent sentinel).
		return 0, fmt.Errorf("failed to get schema from stream: %w", err)
	}

	s.connector.logger.Info("sync function called and schema acquired", tableLog)

	avroSchema, err := s.getAvroSchema(dstTableName, schema)
	if err != nil {
		return 0, err
	}

	partitionID := shared.RandomString(16)
	avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
	if err != nil {
		return 0, err
	}
	// Remove the local Avro file regardless of how the rest of the sync goes.
	defer avroFile.Cleanup()
	s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)

	stage := s.connector.getStageNameForJob(s.config.FlowJobName)
	err = s.connector.createStage(stage, s.config)
	if err != nil {
		return 0, err
	}
	s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage))

	colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier)
	if err != nil {
		return 0, err
	}

	err = s.putFileToStage(avroFile, stage)
	if err != nil {
		return 0, err
	}
	s.connector.logger.Info("pushed avro file to stage", tableLog)

	// Log before the copy so the "copying" message reflects what is actually
	// in progress (previously it was emitted after the copy had finished).
	s.connector.logger.Info(fmt.Sprintf("copying records into %s from stage %s",
		s.config.DestinationTableIdentifier, stage))
	err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames)
	if err != nil {
		return 0, err
	}

	return avroFile.NumRecords, nil
}

func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
config *protos.QRepConfig,
partition *protos.QRepPartition,
Expand Down

0 comments on commit f5846c9

Please sign in to comment.