From 1b445356bba86b91f6ea2368f01a5fcdce593bbc Mon Sep 17 00:00:00 2001
From: Pankaj B
Date: Thu, 25 Jan 2024 20:28:47 +0530
Subject: [PATCH] cleanup: remove leftover debug fmt.Printf statements

Drops the ad-hoc and commented-out debug prints from the ClickHouse
connector, flow handler, and QRep workflow paths. Also threads
DestinationPeer into the QRepConfig built by syncRecordsViaAvro, since
the Avro staging path falls back to the peer's S3 integration when no
explicit StagingPath is configured.
---
 flow/activities/flowable.go                  |  1 -
 flow/cmd/handler.go                          |  2 --
 flow/connectors/clickhouse/cdc.go            |  4 +--
 flow/connectors/clickhouse/normalize.go      |  4 ---
 flow/connectors/clickhouse/qrep.go           |  1 -
 flow/connectors/clickhouse/qrep_avro_sync.go | 27 --------------------
 flow/connectors/core.go                      |  2 --
 flow/connectors/external_metadata/store.go   |  2 --
 flow/workflows/qrep_flow.go                  |  6 -----
 9 files changed, 1 insertion(+), 48 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 0dc89628c2..a6224da626 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -371,7 +371,6 @@ func (a *FlowableActivity) StartNormalize(
 ) (*model.NormalizeResponse, error) {
 	conn := input.FlowConnectionConfigs
 	ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
-	//fmt.Printf("\n*********************** in StartNormalize %+v\n", conn)
 	dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
 	if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
 		dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index ea4a8f0ad2..b53c76d50c 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -121,7 +121,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.CreateCDCFlowResponse, error) {
 	cfg := req.ConnectionConfigs
-	fmt.Printf("\n******************************** CreateCDCFlow %+v", cfg)
 
 	_, validateErr := h.ValidateCDCMirror(ctx, req)
 	if validateErr != nil {
@@ -233,7 +232,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 	ctx context.Context, req *protos.CreateQRepFlowRequest,
 ) (*protos.CreateQRepFlowResponse, error) {
 	cfg := req.QrepConfig
-	fmt.Printf("\n******************************** CreateQRepFlow config: %+v", cfg)
 
 	workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
 	workflowOptions := client.StartWorkflowOptions{
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 2e97695746..8103c05b64 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -119,12 +119,12 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 	streamRes, err := utils.RecordsToRawTableStream(streamReq)
 	//x := *&streamRes.Stream
 	//y := (*x).Records
-	//fmt.Printf("\n*******************############################## cdc.go in syncRecordsViaAvro streamRes: %+v", streamRes)
 	if err != nil {
 		return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
 	}
 
 	qrepConfig := &protos.QRepConfig{
+		DestinationPeer: c.config.DestinationPeer,
 		StagingPath: c.config.S3Integration,
 		FlowJobName: req.FlowJobName,
 		DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s", rawTableIdentifier)),
@@ -162,8 +162,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 }
 
 func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
-	//fmt.Printf("\n ******************************************** !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! in ClickhouseConnector.SyncRecords")
in ClickhouseConnector.SyncRecords") - //fmt.Printf("\n ******************************* in cdc.go in SyncRecords config: %+v", c.config.S3Integration) //c.config.S3Integration = "s3://avro-clickhouse" rawTableName := c.getRawTableName(req.FlowJobName) c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName)) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 54a7af2385..2432809192 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -134,7 +134,6 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques // model the raw table data as inserts. for _, tbl := range destinationTableNames { // SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id - //fmt.Printf("\n************************* in normalize_records1: tbl %s", tbl) selectQuery := strings.Builder{} selectQuery.WriteString("SELECT ") @@ -143,7 +142,6 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques //schema := c.tableSchemaMapping[tbl] schema := req.TableNameSchemaMapping[tbl] - //fmt.Printf("\n************************* in normalize_records2: schema %+v", schema) numCols := len(schema.ColumnNames) projection := strings.Builder{} @@ -245,8 +243,6 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch( tableNames = append(tableNames, tableName.String) } - fmt.Printf("\n****************************** getDistinctTableNamesInBatch tableNames %+v", tableNames) - return tableNames, nil } diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index cf26ca8d06..74ffe26524 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -24,7 +24,6 @@ func (c *ClickhouseConnector) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, ) (int, error) { - //fmt.Printf("\n******************* in ClickhouseConnector.SyncQRepRecords") // Ensure the destination table is available. 
destTable := config.DestinationTableIdentifier flowLog := slog.Group("sync_metadata", diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index d560277039..93871ab2e4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -54,7 +54,6 @@ func NewClickhouseAvroSyncMethod( // } func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error { - //fmt.Printf("\n************************* in CopyStageToDesti stagingPath: %+v", s.config.StagingPath) stagingPath := s.config.StagingPath if stagingPath == "" { stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration //"s3://avro-clickhouse" @@ -73,8 +72,6 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFil query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) - //fmt.Printf("\n************************ CopyStagingToDestination query: %s\n", query) - _, err = s.connector.database.Exec(query) return err @@ -85,8 +82,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( stream *model.QRecordStream, flowJobName string, ) (int, error) { - //fmt.Printf("\n************************* in qrep_avro_sync: SyncRecords1 dstTableSchema %+v", dstTableSchema) - //fmt.Printf("\n************************ in qrep_avro_sync: SyncRecords2 config %+v", s.config) //s.config.StagingPath = "s3://avro-clickhouse" tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier) dstTableName := s.config.DestinationTableIdentifier @@ -96,8 +91,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( return -1, fmt.Errorf("failed to get schema from stream: %w", err) } - //fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords3 stream schema %+v", schema) - s.connector.logger.Info("sync function called and schema acquired", tableLog) avroSchema, err := s.getAvroSchema(dstTableName, schema) @@ -105,12 +98,8 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( return 0, err } - //fmt.Printf("\n******************************* in qrep_avro_sync: SyncRecords5 avro schema %+v", avroSchema) - partitionID := shared.RandomString(16) - //fmt.Printf("\n******************* calling writeToAvroFile partitionId: %+v", partitionID) avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) - //fmt.Printf("\n******************* records written to avrofile %+v", avroFile) if err != nil { return 0, err } @@ -144,7 +133,6 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords( //Copy stage/avro to destination err = s.CopyStageToDestination(avroFile) - //fmt.Printf("\n ***************** in qrep_avro_sync: SyncRecords after CopyStageToDestination err: %+v", err) if err != nil { return 0, err } @@ -158,8 +146,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( dstTableSchema []*sql.ColumnType, stream *model.QRecordStream, ) (int, error) { - fmt.Printf("\n******************* in qrep_avro_sync: SyncQRepRecords config %+v", s.config.DestinationPeer) - //fmt.Printf("\n************************* in SyncQRepRecords 1") startTime := time.Now() dstTableName := config.DestinationTableIdentifier //s.config.StagingPath = "s3://avro-clickhouse" @@ -169,15 +155,11 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( if err != nil { return -1, fmt.Errorf("failed to get schema from stream: %w", err) } - //fmt.Printf("\n******************************* in 
qrep_avro_sync: SyncQRepRecords 2 avro schema %+v", schema) avroSchema, err := s.getAvroSchema(dstTableName, schema) if err != nil { return 0, err } - //fmt.Printf("\n******************************* in qrep_avro_sync: SyncQRepRecords 3 avro schema %+v", avroSchema) - //fmt.Printf("\n******************************* in qrep_avro_sync: SyncQRepRecords 4 avro schema %+v",) - avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName) if err != nil { return 0, err @@ -190,8 +172,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) - //fmt.Printf("\n*********************** in qrep_avro_sync SyncQRepRecords 4 avroFileUrl: %+v", avroFileUrl) - if err != nil { return 0, err } @@ -200,12 +180,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')", config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) - //fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 5 query: %s\n", query) - _, err = s.connector.database.Exec(query) - //fmt.Printf("\n************************************ in qrep_avro_sync SyncQRepRecords 6 err: %+v\n", err) - if err != nil { return 0, err } @@ -237,12 +213,10 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( partitionID string, flowJobName string, ) (*avro.AvroFile, error) { - fmt.Printf("\n************************* in writeToAvroFile 1 21 %+v", s.config) stagingPath := s.config.StagingPath //"s3://avro-clickhouse" if stagingPath == "" { stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration //"s3://avro-clickhouse" } - fmt.Printf("\n****************************************** StagingPath: %+v*****\n", stagingPath) ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse) s3o, err := utils.NewS3BucketAndPrefix(stagingPath) @@ -252,7 +226,6 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{}) - //fmt.Printf("\n************************* writeToAvroFile 2 avroFile %+v, err: %+v", avroFile, err) if err != nil { return nil, fmt.Errorf("failed to write records to S3: %w", err) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 19fbe4114f..8976f5565b 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -160,7 +160,6 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne case *protos.Peer_ClickhouseConfig: return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: - //fmt.Printf("\n*********************** in GetCDCSyncConnector not found %+v %T\n", inner, inner) return nil, ErrUnsupportedFunctionality } } @@ -179,7 +178,6 @@ func GetCDCNormalizeConnector(ctx context.Context, case *protos.Peer_ClickhouseConfig: return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: - //fmt.Printf("\n*********************** in GetCDCNormalizeConnector not found %+v %T\n", inner, inner) return nil, ErrUnsupportedFunctionality } } diff --git 
a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 36904a6695..8be5a7e05e 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -98,10 +98,8 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { func (p *PostgresMetadataStore) SetupMetadata() error { // create the schema - //fmt.Printf("\n********** SetupMetadata 1, schema name %+v", p.schemaName) _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil && !utils.IsUniqueError(err) { - fmt.Printf("********** error in SetupMetadata %+v", err) p.logger.Error("failed to create schema", slog.Any("error", err)) return err } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 9864cd875a..9d9d7fcf06 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -35,7 +35,6 @@ type QRepPartitionFlowExecution struct { // returns a new empty QRepFlowState func NewQRepFlowState() *protos.QRepFlowState { - //fmt.Printf("\n*****************************NewQRepFlowState") return &protos.QRepFlowState{ LastPartition: &protos.QRepPartition{ @@ -50,7 +49,6 @@ func NewQRepFlowState() *protos.QRepFlowState { // returns a new empty QRepFlowState func NewQRepFlowStateForTesting() *protos.QRepFlowState { - //fmt.Printf("\n*****************************NewQRepFlowStateForTesting") return &protos.QRepFlowState{ LastPartition: &protos.QRepPartition{ @@ -65,7 +63,6 @@ func NewQRepFlowStateForTesting() *protos.QRepFlowState { // NewQRepFlowExecution creates a new instance of QRepFlowExecution. func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *QRepFlowExecution { - //fmt.Printf("\n*****************************NewQRepFlowExecution") return &QRepFlowExecution{ config: config, @@ -81,7 +78,6 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU func NewQRepPartitionFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string, ) *QRepPartitionFlowExecution { - //fmt.Printf("\n*****************************NewQRepPartitionFlowExecution") return &QRepPartitionFlowExecution{ config: config, @@ -418,7 +414,6 @@ func QRepFlowWorkflow( config *protos.QRepConfig, state *protos.QRepFlowState, ) error { - //fmt.Printf("\n*****************************QRepFlowWorkflow") // The structure of this workflow is as follows: // 1. Start the loop to continuously run the replication flow. @@ -557,7 +552,6 @@ func QRepPartitionWorkflow( partitions *protos.QRepPartitionBatch, runUUID string, ) error { - //fmt.Printf("\n*****************************QRepPartitionWorkflow") ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) q := NewQRepPartitionFlowExecution(ctx, config, runUUID)
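
Note on the one behavioral change in this otherwise mechanical cleanup: the
QRepConfig built in syncRecordsViaAvro now carries DestinationPeer. Both
CopyStageToDestination and writeToAvroFile fall back to the destination peer's
S3Integration when no explicit StagingPath is set, so the CDC sync path must
populate the peer for that fallback to resolve. A minimal sketch of the
fallback logic follows; the struct types and the resolveStagingPath helper are
illustrative stand-ins, not PeerDB's generated protos or actual API:

	package main

	import "fmt"

	// Illustrative stand-ins for the config types this patch touches.
	type ClickhouseConfig struct {
		S3Integration string // e.g. "s3://avro-clickhouse"
	}

	type Peer struct {
		Clickhouse *ClickhouseConfig
	}

	type QRepConfig struct {
		StagingPath     string
		DestinationPeer *Peer
	}

	// resolveStagingPath mirrors the fallback in CopyStageToDestination and
	// writeToAvroFile: prefer the explicit StagingPath, otherwise use the
	// destination peer's S3 integration. This is why DestinationPeer must now
	// be set on the QRepConfig built in syncRecordsViaAvro.
	func resolveStagingPath(cfg *QRepConfig) (string, error) {
		if cfg.StagingPath != "" {
			return cfg.StagingPath, nil
		}
		if cfg.DestinationPeer == nil || cfg.DestinationPeer.Clickhouse == nil {
			return "", fmt.Errorf("no staging path and no ClickHouse destination peer configured")
		}
		return cfg.DestinationPeer.Clickhouse.S3Integration, nil
	}

	func main() {
		cfg := &QRepConfig{
			DestinationPeer: &Peer{Clickhouse: &ClickhouseConfig{S3Integration: "s3://avro-clickhouse"}},
		}
		staging, err := resolveStagingPath(cfg)
		fmt.Println(staging, err) // s3://avro-clickhouse <nil>
	}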