Commit e88debe: cleanup

pankaj-peerdb committed Jan 26, 2024
1 parent 1b44535 commit e88debe
Showing 4 changed files with 15 additions and 70 deletions.
2 changes: 0 additions & 2 deletions flow/cmd/handler.go
@@ -121,7 +121,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.CreateCDCFlowResponse, error) {
 	cfg := req.ConnectionConfigs
-
 	_, validateErr := h.ValidateCDCMirror(ctx, req)
 	if validateErr != nil {
 		slog.Error("validate mirror error", slog.Any("error", validateErr))
@@ -232,7 +231,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 	ctx context.Context, req *protos.CreateQRepFlowRequest,
 ) (*protos.CreateQRepFlowResponse, error) {
 	cfg := req.QrepConfig
-
 	workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
 	workflowOptions := client.StartWorkflowOptions{
 		ID: workflowID,
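
For context on the pattern in CreateQRepFlow above: the handler builds a workflow ID from the flow job name plus a fresh UUID and hands it to client.StartWorkflowOptions. A minimal, self-contained sketch of that pattern with the Temporal Go SDK follows; the task queue name, the flow job name, and the string workflow reference are illustrative placeholders, not PeerDB's actual values.

// Sketch only: start a Temporal workflow with an explicit, job-scoped ID,
// mirroring the workflowID/StartWorkflowOptions pattern in the diff above.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/google/uuid"
	"go.temporal.io/sdk/client"
)

func main() {
	c, err := client.Dial(client.Options{}) // defaults to localhost:7233
	if err != nil {
		log.Fatalf("unable to create Temporal client: %v", err)
	}
	defer c.Close()

	// Job-scoped ID plus a fresh UUID, as in CreateQRepFlow.
	workflowID := fmt.Sprintf("%s-qrepflow-%s", "my-flow-job", uuid.New())

	run, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
		ID:        workflowID,
		TaskQueue: "flow-task-queue", // hypothetical queue name
	}, "QRepFlowWorkflow") // workflow referenced by name for illustration
	if err != nil {
		log.Fatalf("unable to start workflow: %v", err)
	}
	fmt.Println("started", run.GetID(), run.GetRunID())
}

Pairing the job name with a UUID keeps re-created mirrors from colliding with earlier workflow IDs while staying traceable to the flow job.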
30 changes: 15 additions & 15 deletions flow/connectors/clickhouse/cdc.go
@@ -48,26 +48,26 @@ type MirrorJobRow struct {
 	NormalizeBatchID int
 }
 
-func (c *ClickhouseConnector) getMirrorRowByJobNAme(jobName string) (*MirrorJobRow, error) {
-	getLastOffsetSQL := "SELECT mirror_job_name, offset, sync_batch_id, normalize_batch_id FROM %s WHERE MIRROR_JOB_NAME=? Limit 1"
+// func (c *ClickhouseConnector) getMirrorRowByJobNAme(jobName string) (*MirrorJobRow, error) {
+// 	getLastOffsetSQL := "SELECT mirror_job_name, offset, sync_batch_id, normalize_batch_id FROM %s WHERE MIRROR_JOB_NAME=? Limit 1"
 
-	row := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, mirrorJobsTableIdentifier), jobName)
+// 	row := c.database.QueryRowContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, mirrorJobsTableIdentifier), jobName)
 
-	var result MirrorJobRow
+// 	var result MirrorJobRow
 
-	err := row.Scan(
-		&result.MirrorJobName,
-		&result.Offset,
-		&result.SyncBatchID,
-		&result.NormalizeBatchID,
-	)
+// 	err := row.Scan(
+// 		&result.MirrorJobName,
+// 		&result.Offset,
+// 		&result.SyncBatchID,
+// 		&result.NormalizeBatchID,
+// 	)
 
-	if err != nil {
-		return nil, err
-	}
+// 	if err != nil {
+// 		return nil, err
+// 	}
 
-	return &result, nil
-}
+// 	return &result, nil
+// }
 
 func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
 	rawTableName := c.getRawTableName(req.FlowJobName)
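
The function commented out above is a standard database/sql single-row lookup: QueryRowContext followed by Scan into a struct. For reference, the same pattern in standalone form; the context, connection, and table name are supplied by the caller as placeholders, and the query text mirrors the one in the diff.

// Standalone sketch of the row-scan pattern from the commented-out
// getMirrorRowByJobNAme; db and table come from the caller.
package mirrorjobs

import (
	"context"
	"database/sql"
	"fmt"
)

type MirrorJobRow struct {
	MirrorJobName    string
	Offset           int
	SyncBatchID      int
	NormalizeBatchID int
}

func GetMirrorRowByJobName(ctx context.Context, db *sql.DB, table, jobName string) (*MirrorJobRow, error) {
	query := fmt.Sprintf(
		"SELECT mirror_job_name, offset, sync_batch_id, normalize_batch_id FROM %s WHERE mirror_job_name = ? LIMIT 1",
		table)

	var result MirrorJobRow
	// QueryRowContext defers any error to Scan, including sql.ErrNoRows
	// when no mirror job matches.
	err := db.QueryRowContext(ctx, query, jobName).Scan(
		&result.MirrorJobName,
		&result.Offset,
		&result.SyncBatchID,
		&result.NormalizeBatchID,
	)
	if err != nil {
		return nil, err
	}
	return &result, nil
}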
47 changes: 0 additions & 47 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -31,28 +31,6 @@ func NewClickhouseAvroSyncMethod(
 	}
 }
 
-// func (s *ClickhouseAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage string) error {
-// 	if avroFile.StorageLocation != avro.AvroLocalStorage {
-// 		s.connector.logger.Info("no file to put to stage")
-// 		return nil
-// 	}
-
-// 	activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
-// 	putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)
-
-// 	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
-// 		return fmt.Sprintf("putting file to stage %s", stage)
-// 	})
-// 	defer shutdown()
-
-// 	if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
-// 		return fmt.Errorf("failed to put file to stage: %w", err)
-// 	}
-
-// 	s.connector.logger.Info(fmt.Sprintf("put file %s to stage %s", avroFile.FilePath, stage))
-// 	return nil
-// }
-
 func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
 	stagingPath := s.config.StagingPath
 	if stagingPath == "" {
@@ -106,31 +84,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
 	defer avroFile.Cleanup()
 	s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
 
-	// stage := s.connector.getStageNameForJob(s.config.FlowJobName)
-	// err = s.connector.createStage(stage, s.config)
-	// if err != nil {
-	// 	return 0, err
-	// }
-	// s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage))
-
-	// colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier)
-	// if err != nil {
-	// 	return 0, err
-	// }
-
-	// err = s.putFileToStage(avroFile, "stage")
-	// if err != nil {
-	// 	return 0, err
-	// }
-	// s.connector.logger.Info("pushed avro file to stage", tableLog)
-
-	// err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames)
-	// if err != nil {
-	// 	return 0, err
-	// }
-	// s.connector.logger.Info(fmt.Sprintf("copying records into %s from stage %s",
-	// 	s.config.DestinationTableIdentifier, stage))
-
 	//Copy stage/avro to destination
 	err = s.CopyStageToDestination(avroFile)
 	if err != nil {
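
The deleted putFileToStage kept its Temporal activity alive during a long PUT by starting a heartbeat goroutine (utils.HeartbeatRoutine) and deferring its shutdown. Below is a rough sketch of how such a helper can be structured; the signature is inferred from the call site in the removed code, not taken from PeerDB's utils package.

// Sketch of a heartbeat helper in the shape of utils.HeartbeatRoutine as
// called in the removed code; interval and message come from the caller.
package utilsketch

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

// HeartbeatRoutine records a Temporal activity heartbeat every interval
// until the returned stop function is called (typically via defer).
// ctx must originate from an activity invocation for RecordHeartbeat to apply.
func HeartbeatRoutine(ctx context.Context, interval time.Duration, message func() string) func() {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				activity.RecordHeartbeat(ctx, message())
			}
		}
	}()
	return cancel
}

As in the removed code's shutdown := ... / defer shutdown() pair, the caller defers the returned function so the heartbeat goroutine stops once the stage operation finishes.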
6 changes: 0 additions & 6 deletions flow/workflows/qrep_flow.go
@@ -35,7 +35,6 @@ type QRepPartitionFlowExecution struct {
 
 // returns a new empty QRepFlowState
 func NewQRepFlowState() *protos.QRepFlowState {
-
 	return &protos.QRepFlowState{
 		LastPartition: &protos.QRepPartition{
 			PartitionId: "not-applicable-partition",
@@ -49,7 +48,6 @@ func NewQRepFlowState() *protos.QRepFlowState {
 
 // returns a new empty QRepFlowState
 func NewQRepFlowStateForTesting() *protos.QRepFlowState {
-
 	return &protos.QRepFlowState{
 		LastPartition: &protos.QRepPartition{
 			PartitionId: "not-applicable-partition",
@@ -63,7 +61,6 @@ func NewQRepFlowStateForTesting() *protos.QRepFlowState {
 
 // NewQRepFlowExecution creates a new instance of QRepFlowExecution.
 func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
-
 	return &QRepFlowExecution{
 		config: config,
 		flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
@@ -78,7 +75,6 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
 func NewQRepPartitionFlowExecution(ctx workflow.Context,
 	config *protos.QRepConfig, runUUID string,
 ) *QRepPartitionFlowExecution {
-
 	return &QRepPartitionFlowExecution{
 		config: config,
 		flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
@@ -414,7 +410,6 @@ func QRepFlowWorkflow(
 	config *protos.QRepConfig,
 	state *protos.QRepFlowState,
 ) error {
-
 	// The structure of this workflow is as follows:
 	// 1. Start the loop to continuously run the replication flow.
 	// 2. In the loop, query the source database to get the partitions to replicate.
@@ -552,7 +547,6 @@ func QRepPartitionWorkflow(
 	partitions *protos.QRepPartitionBatch,
 	runUUID string,
 ) error {
-
 	ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	q := NewQRepPartitionFlowExecution(ctx, config, runUUID)
 	return q.ReplicatePartitions(ctx, partitions)
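
The comment preserved in QRepFlowWorkflow describes its overall shape: loop, fetch the partitions to replicate, then replicate them. Purely as an illustration of that loop in Temporal terms (activity names and payload types here are hypothetical, and the real workflow fans partitions out to QRepPartitionWorkflow children rather than running them inline):

// Illustrative-only sketch of the loop described in the QRepFlowWorkflow
// comment; "GetPartitions" and "ReplicatePartition" are made-up activities.
package workflows

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func QRepLoopSketch(ctx workflow.Context) error {
	ao := workflow.ActivityOptions{StartToCloseTimeout: time.Hour}
	ctx = workflow.WithActivityOptions(ctx, ao)

	for {
		// Query the source database for partitions to replicate.
		var partitions []string
		if err := workflow.ExecuteActivity(ctx, "GetPartitions").Get(ctx, &partitions); err != nil {
			return err
		}

		// Replicate each partition; production code would parallelize.
		for _, p := range partitions {
			if err := workflow.ExecuteActivity(ctx, "ReplicatePartition", p).Get(ctx, nil); err != nil {
				return err
			}
		}

		// Wait before polling again; real waits are config-driven.
		if err := workflow.Sleep(ctx, time.Minute); err != nil {
			return err
		}
	}
}

A long-lived loop like this would normally also return workflow.NewContinueAsNewError periodically to keep event history bounded; that detail is omitted here.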
