cleanup

pankaj-peerdb committed Jan 25, 2024
1 parent 6b1afa6 commit 1b44535

Showing 9 changed files with 1 addition and 48 deletions.
1 change: 0 additions & 1 deletion flow/activities/flowable.go
@@ -371,7 +371,6 @@ func (a *FlowableActivity) StartNormalize(
 ) (*model.NormalizeResponse, error) {
     conn := input.FlowConnectionConfigs
     ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
-    //fmt.Printf("\n*********************** in StartNormalize %+v\n", conn)
     dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
     if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
         dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
2 changes: 0 additions & 2 deletions flow/cmd/handler.go
@@ -121,7 +121,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
     ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.CreateCDCFlowResponse, error) {
     cfg := req.ConnectionConfigs
-    fmt.Printf("\n******************************** CreateCDCFlow %+v", cfg)

     _, validateErr := h.ValidateCDCMirror(ctx, req)
     if validateErr != nil {
@@ -233,7 +232,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
     ctx context.Context, req *protos.CreateQRepFlowRequest,
 ) (*protos.CreateQRepFlowResponse, error) {
     cfg := req.QrepConfig
-    fmt.Printf("\n******************************** CreateQRepFlow config: %+v", cfg)

     workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
     workflowOptions := client.StartWorkflowOptions{
4 changes: 1 addition & 3 deletions flow/connectors/clickhouse/cdc.go
@@ -119,12 +119,12 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
     streamRes, err := utils.RecordsToRawTableStream(streamReq)
     //x := *&streamRes.Stream
     //y := (*x).Records
-    //fmt.Printf("\n*******************############################## cdc.go in syncRecordsViaAvro streamRes: %+v", streamRes)
     if err != nil {
         return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
     }

     qrepConfig := &protos.QRepConfig{
+        DestinationPeer:            c.config.DestinationPeer,
         StagingPath:                c.config.S3Integration,
         FlowJobName:                req.FlowJobName,
         DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)),
@@ -162,8 +162,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 }

 func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
-    //fmt.Printf("\n ******************************************** !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! in ClickhouseConnector.SyncRecords")
-    //fmt.Printf("\n ******************************* in cdc.go in SyncRecords config: %+v", c.config.S3Integration)
     //c.config.S3Integration = "s3://avro-clickhouse"
     rawTableName := c.getRawTableName(req.FlowJobName)
     c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName))
4 changes: 0 additions & 4 deletions flow/connectors/clickhouse/normalize.go
@@ -134,7 +134,6 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
     // model the raw table data as inserts.
     for _, tbl := range destinationTableNames {
         // SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
-        //fmt.Printf("\n************************* in normalize_records1: tbl %s", tbl)
         selectQuery := strings.Builder{}
         selectQuery.WriteString("SELECT ")

@@ -143,7 +142,6 @@

         //schema := c.tableSchemaMapping[tbl]
         schema := req.TableNameSchemaMapping[tbl]
-        //fmt.Printf("\n************************* in normalize_records2: schema %+v", schema)
         numCols := len(schema.ColumnNames)

         projection := strings.Builder{}
@@ -245,8 +243,6 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
         tableNames = append(tableNames, tableName.String)
     }

-    fmt.Printf("\n****************************** getDistinctTableNamesInBatch tableNames %+v", tableNames)
-
     return tableNames, nil
 }

1 change: 0 additions & 1 deletion flow/connectors/clickhouse/qrep.go
@@ -24,7 +24,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
     partition *protos.QRepPartition,
     stream *model.QRecordStream,
 ) (int, error) {
-    //fmt.Printf("\n******************* in ClickhouseConnector.SyncQRepRecords")
     // Ensure the destination table is available.
     destTable := config.DestinationTableIdentifier
     flowLog := slog.Group("sync_metadata",
27 changes: 0 additions & 27 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -54,7 +54,6 @@ func NewClickhouseAvroSyncMethod(
 // }

 func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
-    //fmt.Printf("\n************************* in CopyStageToDesti stagingPath: %+v", s.config.StagingPath)
     stagingPath := s.config.StagingPath
     if stagingPath == "" {
         stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration //"s3://avro-clickhouse"
@@ -73,8 +72,6 @@
     query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
         s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)

-    //fmt.Printf("\n************************ CopyStagingToDestination query: %s\n", query)
-
     _, err = s.connector.database.Exec(query)

     return err
@@ -85,8 +82,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
     stream *model.QRecordStream,
     flowJobName string,
 ) (int, error) {
-    //fmt.Printf("\n************************* in qrep_avro_sync: SyncRecords1 dstTableSchema %+v", dstTableSchema)
-    //fmt.Printf("\n************************ in qrep_avro_sync: SyncRecords2 config %+v", s.config)
     //s.config.StagingPath = "s3://avro-clickhouse"
     tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
     dstTableName := s.config.DestinationTableIdentifier
@@ -96,21 +91,15 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
         return -1, fmt.Errorf("failed to get schema from stream: %w", err)
     }

-    //fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords3 stream schema %+v", schema)
-
     s.connector.logger.Info("sync function called and schema acquired", tableLog)

     avroSchema, err := s.getAvroSchema(dstTableName, schema)
     if err != nil {
         return 0, err
     }

-    //fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords5 avro schema %+v", avroSchema)
-
     partitionID := shared.RandomString(16)
-    //fmt.Printf("\n******************* calling writeToAvroFile partitionId: %+v", partitionID)
     avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
-    //fmt.Printf("\n******************* records written to avrofile %+v", avroFile)
     if err != nil {
         return 0, err
     }
@@ -144,7 +133,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(

     //Copy stage/avro to destination
     err = s.CopyStageToDestination(avroFile)
-    //fmt.Printf("\n ***************** in qrep_avro_sync: SyncRecords after CopyStageToDestination err: %+v", err)
     if err != nil {
         return 0, err
     }
@@ -158,8 +146,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
     dstTableSchema []*sql.ColumnType,
     stream *model.QRecordStream,
 ) (int, error) {
-    fmt.Printf("\n******************* in qrep_avro_sync: SyncQRepRecords config %+v", s.config.DestinationPeer)
-    //fmt.Printf("\n************************* in SyncQRepRecords 1")
     startTime := time.Now()
     dstTableName := config.DestinationTableIdentifier
     //s.config.StagingPath = "s3://avro-clickhouse"
@@ -169,15 +155,11 @@
     if err != nil {
         return -1, fmt.Errorf("failed to get schema from stream: %w", err)
     }
-    //fmt.Printf("\n******************************* in qrep_avro_sync: SyncQRepRecords 2 avro schema %+v", schema)
     avroSchema, err := s.getAvroSchema(dstTableName, schema)
     if err != nil {
         return 0, err
     }

-    //fmt.Printf("\n******************************* in qrep_avro_sync: SyncQRepRecords 3 avro schema %+v", avroSchema)
-    //fmt.Printf("\n******************************* in qrep_avro_sync: SyncQRepRecords 4 avro schema %+v",)
-
     avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName)
     if err != nil {
         return 0, err
@@ -190,8 +172,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
     awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
     avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)

-    //fmt.Printf("\n*********************** in qrep_avro_sync SyncQRepRecords 4 avroFileUrl: %+v", avroFileUrl)
-
     if err != nil {
         return 0, err
     }
@@ -200,12 +180,8 @@
     query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
         config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)

-    //fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 5 query: %s\n", query)
-
     _, err = s.connector.database.Exec(query)

-    //fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 6 err: %+v\n", err)
-
     if err != nil {
         return 0, err
     }
@@ -237,12 +213,10 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
     partitionID string,
     flowJobName string,
 ) (*avro.AvroFile, error) {
-    fmt.Printf("\n************************* in writeToAvroFile 1 21 %+v", s.config)
     stagingPath := s.config.StagingPath //"s3://avro-clickhouse"
     if stagingPath == "" {
         stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration //"s3://avro-clickhouse"
     }
-    fmt.Printf("\n****************************************** StagingPath: %+v*****\n", stagingPath)
     ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
         qvalue.QDWHTypeClickhouse)
     s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
@@ -252,7 +226,6 @@

     s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
     avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{})
-    //fmt.Printf("\n************************* writeToAvroFile 2 avroFile %+v, err: %+v", avroFile, err)
     if err != nil {
         return nil, fmt.Errorf("failed to write records to S3: %w", err)
     }
2 changes: 0 additions & 2 deletions flow/connectors/core.go
@@ -160,7 +160,6 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
     case *protos.Peer_ClickhouseConfig:
         return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
     default:
-        //fmt.Printf("\n*********************** in GetCDCSyncConnector not found %+v %T\n", inner, inner)
         return nil, ErrUnsupportedFunctionality
     }
 }
@@ -179,7 +178,6 @@ func GetCDCNormalizeConnector(ctx context.Context,
     case *protos.Peer_ClickhouseConfig:
         return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
     default:
-        //fmt.Printf("\n*********************** in GetCDCNormalizeConnector not found %+v %T\n", inner, inner)
         return nil, ErrUnsupportedFunctionality
     }
 }
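
Note: the flowable.go hunk above shows the caller-side contract for these default branches. When GetCDCNormalizeConnector returns ErrUnsupportedFunctionality, StartNormalize falls back to the plain CDC sync connector. A minimal, runnable sketch of that sentinel-error pattern follows; the peer type and getNormalizeConnector helper are hypothetical stand-ins, not code from the repository.

package main

import (
	"errors"
	"fmt"
)

// Sentinel error, mirroring connectors.ErrUnsupportedFunctionality.
var ErrUnsupportedFunctionality = errors.New("requested connector is not yet implemented")

// getNormalizeConnector is a hypothetical stand-in for
// connectors.GetCDCNormalizeConnector: resolve by peer type,
// return the sentinel for peers without a normalize path.
func getNormalizeConnector(peerType string) (string, error) {
	switch peerType {
	case "clickhouse":
		return "clickhouse-normalize-connector", nil
	default:
		return "", ErrUnsupportedFunctionality
	}
}

func main() {
	conn, err := getNormalizeConnector("s3")
	if errors.Is(err, ErrUnsupportedFunctionality) {
		// Fall back, as StartNormalize does for peers that
		// have no dedicated normalize connector.
		conn = "cdc-sync-connector"
	}
	fmt.Println(conn) // cdc-sync-connector
}
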
2 changes: 0 additions & 2 deletions flow/connectors/external_metadata/store.go
@@ -98,10 +98,8 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {

 func (p *PostgresMetadataStore) SetupMetadata() error {
     // create the schema
-    //fmt.Printf("\n********** SetupMetadata 1, schema name %+v", p.schemaName)
     _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
     if err != nil && !utils.IsUniqueError(err) {
-        fmt.Printf("********** error in SetupMetadata %+v", err)
         p.logger.Error("failed to create schema", slog.Any("error", err))
         return err
     }
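
Note: the retained p.logger.Error("failed to create schema", slog.Any("error", err)) line is the structured-logging idiom that replaces the printf-style debugging deleted throughout this commit. A self-contained sketch of that log/slog pattern follows; the JSON handler setup is illustrative only, as the repository wires its own logger.

package main

import (
	"errors"
	"log/slog"
	"os"
)

func main() {
	// JSON handler chosen for the example; any slog.Handler works.
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// Key/value attributes instead of fmt.Printf("%+v", ...) dumps.
	logger.Info("sync function called and schema acquired",
		slog.String("destinationTable", "public.events"))

	err := errors.New("permission denied for database")
	logger.Error("failed to create schema", slog.Any("error", err))
}
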
6 changes: 0 additions & 6 deletions flow/workflows/qrep_flow.go
@@ -35,7 +35,6 @@ type QRepPartitionFlowExecution struct {

 // returns a new empty QRepFlowState
 func NewQRepFlowState() *protos.QRepFlowState {
-    //fmt.Printf("\n*****************************NewQRepFlowState")

     return &protos.QRepFlowState{
         LastPartition: &protos.QRepPartition{
@@ -50,7 +49,6 @@ func NewQRepFlowState() *protos.QRepFlowState {

 // returns a new empty QRepFlowState
 func NewQRepFlowStateForTesting() *protos.QRepFlowState {
-    //fmt.Printf("\n*****************************NewQRepFlowStateForTesting")

     return &protos.QRepFlowState{
         LastPartition: &protos.QRepPartition{
@@ -65,7 +63,6 @@ func NewQRepFlowStateForTesting() *protos.QRepFlowState {

 // NewQRepFlowExecution creates a new instance of QRepFlowExecution.
 func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution {
-    //fmt.Printf("\n*****************************NewQRepFlowExecution")

     return &QRepFlowExecution{
         config: config,
@@ -81,7 +78,6 @@
 func NewQRepPartitionFlowExecution(ctx workflow.Context,
     config *protos.QRepConfig, runUUID string,
 ) *QRepPartitionFlowExecution {
-    //fmt.Printf("\n*****************************NewQRepPartitionFlowExecution")

     return &QRepPartitionFlowExecution{
         config: config,
@@ -418,7 +414,6 @@ func QRepFlowWorkflow(
     config *protos.QRepConfig,
     state *protos.QRepFlowState,
 ) error {
-    //fmt.Printf("\n*****************************QRepFlowWorkflow")

     // The structure of this workflow is as follows:
     // 1. Start the loop to continuously run the replication flow.
@@ -557,7 +552,6 @@ func QRepPartitionWorkflow(
     partitions *protos.QRepPartitionBatch,
     runUUID string,
 ) error {
-    //fmt.Printf("\n*****************************QRepPartitionWorkflow")

     ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
     q := NewQRepPartitionFlowExecution(ctx, config, runUUID)
