diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index bfe8939f5a..fc3d747eba 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -468,8 +468,6 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(
 // SetupQRepMetadataTables sets up the metadata tables for QReplication.
 func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
-	fmt.Println("*************** in flowable.SetupQRepMetadataTables")
-
 	conn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
 	if err != nil {
 		return fmt.Errorf("failed to get connector: %w", err)
 	}
@@ -485,21 +483,16 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	last *protos.QRepPartition,
 	runUUID string,
 ) (*protos.QRepParitionResult, error) {
-	fmt.Printf("\n******************************* in GetQRepPartitions 1")
-
 	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
 	}
-	fmt.Printf("\n******************************* in GetQRepPartitions 2")
 	defer connectors.CloseConnector(srcConn)
 
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
 	})
 
-	fmt.Printf("\n******************************* in GetQRepPartitions 3")
-
 	defer func() {
 		shutdown <- struct{}{}
 	}()
@@ -509,8 +502,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		return nil, fmt.Errorf("failed to get partitions from source: %w", err)
 	}
 
-	fmt.Printf("\n******************************* in GetQRepPartitions 4")
-
 	if len(partitions) > 0 {
 		err = monitoring.InitializeQRepRun(
 			ctx,
@@ -524,8 +515,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		}
 	}
 
-	fmt.Printf("\n******************************* in GetQRepPartitions 5")
-
 	return &protos.QRepParitionResult{
 		Partitions: partitions,
 	}, nil
@@ -537,7 +526,6 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
 	partitions *protos.QRepPartitionBatch,
 	runUUID string,
 ) error {
-	fmt.Printf("\n ************************ in ReplicateQRepPartitions")
 	err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
 	if err != nil {
 		return fmt.Errorf("failed to update start time for qrep run: %w", err)
@@ -567,7 +555,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	partition *protos.QRepPartition,
 	runUUID string,
 ) error {
-	fmt.Printf("\n******************************* in replicateQRepPartition")
 	err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition,
 		time.Now())
 	if err != nil {
@@ -821,7 +808,6 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
 func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
 	config *protos.QRepConfig, last *protos.QRepPartition,
 ) error {
-	fmt.Printf("\n******************************* in QRepWaitUntilNewRows")
 	if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
 		return nil
 	}
@@ -912,7 +898,6 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	partition *protos.QRepPartition,
 	runUUID string,
 ) (int64, error) {
-	fmt.Printf("\n******************************** in ReplicateXminPartition")
 	startTime := time.Now()
 	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
 	if err != nil {
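The markers stripped above were numbered fmt.Printf breadcrumbs. A minimal sketch of the structured alternative these activities otherwise rely on (log/slog; the attribute values below are illustrative, not taken from the diff):

```go
package main

import "log/slog"

func main() {
	// Bind stable context once, then emit events at the points where the
	// numbered trace markers used to sit.
	logger := slog.With(slog.String("flowName", "qrep-demo"))
	logger.Info("getting partitions", slog.String("runUUID", "run-1234"))
}
```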
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index 61cc5407bb..c4a7c1a99b 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -268,7 +268,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 	} else {
 		workflowFn = peerflow.QRepFlowWorkflow
 	}
-	fmt.Printf("\n *********** cfg %v", cfg)
 	_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
 	if err != nil {
 		slog.Error("unable to start QRepFlow workflow",
@@ -625,10 +624,6 @@ func (h *FlowRequestHandler) CreatePeer(
 		chConfig := chConfigObject.ClickhouseConfig
 		encodedConfig, encodingErr = proto.Marshal(chConfig)
-		fmt.Printf("config %v", config)
-		fmt.Printf("chConfigObject %v", chConfigObject)
-		fmt.Printf("chConfig %v", chConfig)
-		fmt.Printf("encodedConfig %v", encodedConfig)
 	case protos.DBType_BIGQUERY:
 		bqConfigObject, ok := config.(*protos.Peer_BigqueryConfig)
 		if !ok {
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 9fa32efc7f..b01c86ef2c 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -122,7 +122,6 @@ func NewClickhouseConnector(ctx context.Context,
 	// if err != nil {
 	// 	return nil, err
 	// }
-	fmt.Println("*********************in NewClickhouseConnector")
 	database, err := connect(ctx, clickhouseProtoConfig)
 	// snowflakeConfig := gosnowflake.Config{
 	// 	Account: snowflakeProtoConfig.AccountId,
@@ -167,7 +166,6 @@ func NewClickhouseConnector(ctx context.Context,
 }
 
 func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
-	fmt.Println("***********************start connect")
 	dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s
 		config.Host, config.Port, config.User, config.Password) //, config.Database
 	//dsn := "tcp://host.docker.internal:9009?username=clickhouse&password=clickhouse" //&database=desti"
@@ -185,18 +183,6 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err
 		return nil, err
 	}
 
-	fmt.Println("**********************successfully connected 1")
-
-	_, err2 := conn.Exec("select * from tasks")
-	if err2 != nil {
-		fmt.Println("****************err in querying after connecting 1", err2)
-	}
-	if err2 == nil {
-		fmt.Println("*****************select after connecting done 1")
-	}
-
-	fmt.Println("***********************end connect")
-
 	return conn, nil
 }
@@ -620,7 +606,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 			rawTableIdentifier),
 	}
 	avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
-	fmt.Printf("\n*************** in Clickhouse syncRecordsViaAvro 1 %+v", avroSyncer)
 	destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
 	if err != nil {
 		return nil, err
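Besides the trace prints, this drops the post-connect probe against a hard-coded `tasks` table, apparently a local fixture. If a connectivity check is still wanted, a sketch using database/sql's PingContext (the function name is hypothetical):

```go
import (
	"context"
	"database/sql"
	"fmt"
)

// checkConnection verifies the connection without depending on any
// fixture table existing on the server.
func checkConnection(ctx context.Context, conn *sql.DB) error {
	if err := conn.PingContext(ctx); err != nil {
		return fmt.Errorf("failed to ping clickhouse: %w", err)
	}
	return nil
}
```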
diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go
index 7ba7616b47..91bab75040 100644
--- a/flow/connectors/clickhouse/qrep.go
+++ b/flow/connectors/clickhouse/qrep.go
@@ -26,7 +26,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
 	partition *protos.QRepPartition,
 	stream *model.QRecordStream,
 ) (int, error) {
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords %+v %+v", config, partition)
 	// Ensure the destination table is available.
 	destTable := config.DestinationTableIdentifier
 	flowLog := slog.Group("sync_metadata",
@@ -34,28 +33,16 @@ func (c *ClickhouseConnector) SyncQRepRecords(
 		slog.String("destinationTable", destTable),
 	)
 
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2")
 	tblSchema, err := c.getTableSchema(destTable)
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.5 %v", err)
 	if err != nil {
 		return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
 	}
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.7 %v", flowLog)
 	//c.logger.Info("Called QRep sync function and obtained table schema", flowLog)
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.8")
-	fmt.Printf("*********** Called QRep sync function and obtained table schema")
-
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 3 %v", tblSchema)
-	fmt.Print("\n destination_peer %+v", config.DestinationPeer.Config)
 
 	done, err := c.isPartitionSynced(partition.PartitionId)
 	if err != nil {
 		return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
 	}
 
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 4")
-
-	fmt.Printf("*********** Called QRep sync function and obtained table schema")
-
 	if done {
 		c.logger.Info("Partition has already been synced", flowLog)
 		return 0, nil
@@ -63,8 +50,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
 
 	avroSync := NewClickhouseAvroSyncMethod(config, c)
 
-	fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 5 stream: %+v ", stream)
-
 	return avroSync.SyncQRepRecords(config, partition, tblSchema, stream)
 }
@@ -100,61 +85,45 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp
 		LIMIT 0
 	`, tableName)
 
-	fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema %s", queryString)
-
 	rows, err := c.database.Query(queryString)
 	if err != nil {
 		return nil, fmt.Errorf("failed to execute query: %w", err)
 	}
 	defer rows.Close()
 
-	fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema 2")
-
 	columnTypes, err := rows.ColumnTypes()
 	if err != nil {
 		return nil, fmt.Errorf("failed to get column types: %w", err)
 	}
 
-	fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema 3 %v %+v %+v", len(columnTypes), *columnTypes[0], *&columnTypes[1])
-
 	return columnTypes, nil
 }
 
 func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error) {
 	//nolint:gosec
-	fmt.Printf("\n*************** in Clickhouse qrep.isPartitionSynced 1 %s", partitionID)
 	queryString := fmt.Sprintf(`
 		SELECT COUNT(*)
 		FROM %s.%s
 		WHERE partitionID = '%s'
 	`, "desti", qRepMetadataTableName, partitionID) //c.metadataSchema
 
-	fmt.Printf("\n*************** in Clickhouse qrep.isPartitionSynced 2 %s", queryString)
-
 	row := c.database.QueryRow(queryString)
 
-	fmt.Printf("\n*************** in Clickhouse qrep.isPartitionSynced 3 %+v", row)
-
 	var count int
 	if err := row.Scan(&count); err != nil {
-		fmt.Printf("\n**************************** failed to execute query: %v", err)
 		return false, fmt.Errorf("failed to execute query: %w", err)
 	}
 
-	fmt.Printf("\n************ in Clickhouse qrep.isPartitionSynced 4 %v", count)
-
 	//return count.Int64 > 0, nil
 	return count > 0, nil
 }
 
 func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
-	fmt.Println("***********************SetupQRepMetadataTabless 1")
 	// NOTE that Snowflake does not support transactional DDL
 	//createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
 	// if err != nil {
 	// 	return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
 	// }
-	fmt.Println("***********************SetupQRepMetadataTabless 2")
 	// in case we return after error, ensure transaction is rolled back
 	// defer func() {
 	// 	deferErr := createMetadataTablesTx.Rollback()
fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) // } - fmt.Println("***********************SetupQRepMetadataTabless 2") // in case we return after error, ensure transaction is rolled back // defer func() { // deferErr := createMetadataTablesTx.Rollback() @@ -164,19 +133,13 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) // } // }() //err = c.createPeerDBInternalSchema(createMetadataTablesTx) - fmt.Println("***********************SetupQRepMetadataTabless 3") // if err != nil { // return err // } err := c.createQRepMetadataTable() //(createMetadataTablesTx) if err != nil { - fmt.Printf("\n*****************error in creating qrep metadata table: %v\n", err) return err } - fmt.Println("\n***********************SetupQRepMetadataTabless 4") - stageName := c.getStageNameForJob(config.FlowJobName) - - fmt.Println("\n***********************Created StageName", stageName) //not needed for clickhouse // err = c.createStage(stageName, config) @@ -201,7 +164,6 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) func (c *ClickhouseConnector) createQRepMetadataTable() error { //createMetadataTableTx *sql.Tx // Define the schema - fmt.Printf("\n************* in createQRepMetadataTable") schemaStatement := ` CREATE TABLE IF NOT EXISTS %s.%s ( flowJobName String, @@ -214,47 +176,38 @@ func (c *ClickhouseConnector) createQRepMetadataTable() error { //createMetadata ` queryString := fmt.Sprintf(schemaStatement, "desti", qRepMetadataTableName) //c.metadataSchema, - fmt.Printf("\n************* queryString to create table %s", queryString) - //_, err := createMetadataTableTx.Exec(queryString) _, err := c.database.Exec(queryString) //_, err := c.database.Exec("select * from tasks;") if err != nil { - fmt.Printf("\n************************ failed to create table %s %v", err.Error(), err) c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName), slog.Any("error", err)) return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err) } //c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName)) - fmt.Printf("\n************* created table %s", qRepMetadataTableName) return nil } func (c *ClickhouseConnector) createStage(stageName string, config *protos.QRepConfig) error { - fmt.Printf("\n***********************createStage1 %s, stageName: %s", stageName, config.StagingPath) var createStageStmt string if strings.HasPrefix(config.StagingPath, "s3://") { - fmt.Printf("\n***********************createStage 2") stmt, err := c.createExternalStage(stageName, config) if err != nil { return err } createStageStmt = stmt } else { - fmt.Printf("\n***********************createStage 3") stageStatement := ` CREATE OR REPLACE STAGE %s FILE_FORMAT = (TYPE = AVRO); ` createStageStmt = fmt.Sprintf(stageStatement, stageName) } - fmt.Printf("\n***********************createStage 4 %s", createStageStmt) // Execute the query _, err := c.database.Exec(createStageStmt) - fmt.Printf("\n***********************createStage 5 err: %v", err) if err != nil { c.logger.Error(fmt.Sprintf("failed to create stage %s", stageName), slog.Any("error", err)) return fmt.Errorf("failed to create stage %s: %w", stageName, err) @@ -416,7 +369,5 @@ func (c *ClickhouseConnector) dropStage(stagingPath string, job string) error { } func (c *ClickhouseConnector) getStageNameForJob(job string) string { - fmt.Println("\n***********************getStageNameForJob %s", job) - 
fmt.Println("\n*********************%s.peerdb_stage_%s", c.metadataSchema, job) return fmt.Sprintf("%s.peerdb_stage_%s", c.metadataSchema, job) } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 271103c2f4..838c830692 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -100,21 +100,14 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( dstTableSchema []*sql.ColumnType, stream *model.QRecordStream, ) (int, error) { - fmt.Printf("\n************ in ch.SyncQRepRecords 1") - partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) startTime := time.Now() dstTableName := config.DestinationTableIdentifier - fmt.Printf("\n************ in ch.SyncQRepRecords 2 %+v", stream.Schema) - schema, err := stream.Schema() - fmt.Printf("\n******************************** ************ in ch.SyncQRepRecords 2.1 %+v %+v %+v", schema, err, partitionLog) if err != nil { - fmt.Printf("\n******************************** ************ in ch.SyncQRepRecords 2.5 error %v", err) return -1, fmt.Errorf("failed to get schema from stream: %w", err) } //s.connector.logger.Info("sync function called and schema acquired", partitionLog) commenting this as this is throwing an error - fmt.Printf("\n************ in ch.SyncQRepRecords 3") // err = s.addMissingColumns( // config.FlowJobName, // schema, @@ -126,22 +119,18 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( // return 0, err // } - fmt.Printf("\n************ in ch.SyncQRepRecords 4") - avroSchema, err := s.getAvroSchema(dstTableName, schema) if err != nil { return 0, err } - fmt.Printf("\n************ in ch.SyncQRepRecords 5") - avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName) if err != nil { return 0, err } - //defer avroFile.Cleanup() - fmt.Printf("\n************ in ch.SyncQRepRecords 6 ****%+v", avroFile) + //TODO: avro file cleanup + //defer avroFile.Cleanup() avroFileUrl := "https://avro-clickhouse.s3.us-east-2.amazonaws.com" + avroFile.FilePath @@ -153,15 +142,11 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( query := fmt.Sprintf("INSERT INTO desti.todos SELECT * FROM s3('%s','%s','%s', 'Avro')", avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) - fmt.Printf("*******************in ch.SyncQRepRecords 6.0 query for uploading avro to clickhouse %+v", query) _, err = s.connector.database.Exec(query) if err != nil { - fmt.Printf("*******************in ch.SyncQRepRecords 6.1 error in uploading avro to clickhouse %+v", err) return 0, err } - fmt.Printf("*******************in ch.SyncQRepRecords 6.2 success uploading avro to clickhouse") - // err = s.UploadAvroFile(avroFile.FilePath) // if err != nil { // return 0, err @@ -174,7 +159,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( // return 0, err // } - fmt.Printf("\n************ in ch.SyncQRepRecords 7 succefully uploaded avro file") //s.connector.logger.Info("Put file to stage in Avro sync for snowflake", partitionLog) err = s.insertMetadata(partition, config.FlowJobName, startTime) @@ -182,8 +166,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( return -1, err } - fmt.Printf("\n************ in ch.SyncQRepRecords 8") - activity.RecordHeartbeat(s.connector.ctx, "finished syncing records") return avroFile.NumRecords, nil @@ -196,12 +178,10 @@ func (s *ClickhouseAvroSyncMethod) addMissingColumns( dstTableName string, partition *protos.QRepPartition, ) error { - fmt.Printf("\n************ in 
@@ -196,12 +178,10 @@ func (s *ClickhouseAvroSyncMethod) addMissingColumns(
 	dstTableName string,
 	partition *protos.QRepPartition,
 ) error {
-	fmt.Printf("\n************ in ch.addMissingColumns 1")
 	partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
 	// check if avro schema has additional columns compared to destination table
 	// if so, we need to add those columns to the destination table
 	colsToTypes := map[string]qvalue.QValueKind{}
-	fmt.Printf("\n************ in ch.addMissingColumns 2")
 	for _, col := range schema.Fields {
 		hasColumn := false
 		// check ignoring case
@@ -218,14 +198,12 @@ func (s *ClickhouseAvroSyncMethod) addMissingColumns(
 			colsToTypes[col.Name] = col.Type
 		}
 	}
-	fmt.Printf("\n************ in ch.addMissingColumns 3")
 
 	if len(colsToTypes) > 0 {
 		tx, err := s.connector.database.Begin()
 		if err != nil {
 			return fmt.Errorf("failed to begin transaction: %w", err)
 		}
-		fmt.Printf("\n************ in ch.addMissingColumns 4")
 		for colName, colType := range colsToTypes {
 			sfColType, err := colType.ToDWHColumnType(qvalue.QDWHTypeClickhouse)
 			if err != nil {
@@ -242,14 +220,11 @@ func (s *ClickhouseAvroSyncMethod) addMissingColumns(
 				return fmt.Errorf("failed to alter destination table: %w", err)
 			}
 		}
-		fmt.Printf("\n************ in ch.addMissingColumns 5")
 
 		if err := tx.Commit(); err != nil {
 			return fmt.Errorf("failed to commit transaction: %w", err)
 		}
-		fmt.Printf("\n************ in ch.addMissingColumns 6")
-
 		s.connector.logger.Info("successfully added missing columns to destination table "+
 			dstTableName, partitionLog)
 	} else {
@@ -263,15 +238,11 @@ func (s *ClickhouseAvroSyncMethod) getAvroSchema(
 	dstTableName string,
 	schema *model.QRecordSchema,
 ) (*model.QRecordAvroSchemaDefinition, error) {
-	fmt.Printf("\n************ in ch.getAvroSchema 1")
 	avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema)
-	fmt.Printf("\n************ in ch.getAvroSchema 2")
 	if err != nil {
 		return nil, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
-	fmt.Printf("\n************ in ch.getAvroSchema 3")
 	//s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema))
-	fmt.Printf("\n************ in ch.getAvroSchema 4")
 	return avroSchema, nil
 }
@@ -309,7 +280,6 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 	// 	return avroFile, nil
 	// } else if strings.HasPrefix(s.config.StagingPath, "s3://") {
-	fmt.Printf("\n************ in ch.writeToAvroFile 3")
 	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
 		qvalue.QDWHTypeClickhouse)
 	s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
 	if err != nil {
@@ -318,18 +288,13 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 	}
 
 	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, s.config.FlowJobName, partitionID)
-	fmt.Printf("\n************ in ch.writeToAvroFile 3.1 s3AvroFileKey: %v", s3AvroFileKey)
 	// s.connector.logger.Info("OCF: Writing records to S3",
 	// 	slog.String(string(shared.PartitionIDKey), partitionID))
 	//avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
 	avroFile, err := ocfWriter.WriteRecordsToS3("avro-clickhouse", s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{})
 	if err != nil {
-		fmt.Printf("\n************ in ch.writeToAvroFile 3.2 err: %+v", err)
-
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
 	}
-	fmt.Printf("\n************ in ch.writeToAvroFile 4 uploaded file to s3 avroFile: %+v", avroFile)
-
 	return avroFile, nil
 	//}
 	//return nil, fmt.Errorf("unsupported staging path: %s", s.config.StagingPath)
@@ -444,29 +409,22 @@ func (s *ClickhouseAvroSyncMethod) insertMetadata(
 	flowJobName string,
 	startTime time.Time,
 ) error {
-	fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 1 %+v %+v %+v", partition, flowJobName, startTime)
 	partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
 	insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
-	fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 2 %+v %+v", insertMetadataStmt, err)
 	if err != nil {
 		s.connector.logger.Error("failed to create metadata insert statement",
 			slog.Any("error", err), partitionLog)
 		return fmt.Errorf("failed to create metadata insert statement: %v", err)
 	}
 
-	fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 3 s: %+v", *s.connector)
 	if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil {
-		fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 4 %+v", err)
 		//s.connector.logger.Error("failed to execute metadata insert statement "+insertMetadataStmt,
 		//slog.Any("error", err), partitionLog)
 		return fmt.Errorf("failed to execute metadata insert statement: %v", err)
 	}
 
-	fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 5 %+v", err)
-
 	//s.connector.logger.Info("inserted metadata for partition", partitionLog)
-	fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 6")
 	return nil
 }
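The TODO left in SyncQRepRecords flags two things worth circling back to: the Avro file on S3 is never cleaned up, and the bucket in avroFileUrl is hard-coded. A sketch of deriving the URL from the configured staging path instead, reusing the NewS3BucketAndPrefix helper already called in writeToAvroFile; the region constant is an assumption carried over from the hard-coded URL, and the key join assumes FilePath has no leading slash:

```go
s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
if err != nil {
	return 0, fmt.Errorf("failed to parse staging path: %w", err)
}
// Build the URL from the configured bucket rather than the literal
// "avro-clickhouse"; avroFile.FilePath is the object key written earlier.
avroFileUrl := fmt.Sprintf("https://%s.s3.us-east-2.amazonaws.com/%s", s3o.Bucket, avroFile.FilePath)
// Cleanup() is the helper hinted at in the commented-out line; deferring it
// here would resolve the TODO once the INSERT below has consumed the file.
defer avroFile.Cleanup()
```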
fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 1 %+v %+v %+v", partition, flowJobName, startTime) partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) - fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 2 %+v %+v", insertMetadataStmt, err) if err != nil { s.connector.logger.Error("failed to create metadata insert statement", slog.Any("error", err), partitionLog) return fmt.Errorf("failed to create metadata insert statement: %v", err) } - fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 3 s: %+v", *s.connector) if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil { - fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 4 %+v", err) //s.connector.logger.Error("failed to execute metadata insert statement "+insertMetadataStmt, //slog.Any("error", err), partitionLog) return fmt.Errorf("failed to execute metadata insert statement: %v", err) } - fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 5 %+v", err) - //s.connector.logger.Info("inserted metadata for partition", partitionLog) - fmt.Printf("\n************ qrep_avro_sync.go ch.insertMetadata 6") return nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index d1382e6b46..352ef8d06f 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -205,8 +205,6 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon // println("*********** error in stringifying in GetQRepSyncConnector", err) // } - fmt.Println("************ in GetQRepSyncConnector") - inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: @@ -218,10 +216,8 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon case *protos.Peer_S3Config: return conns3.NewS3Connector(ctx, config.GetS3Config()) case *protos.Peer_ClickhouseConfig: - fmt.Println("************** in GetQRepSyncConnector: clickhouse matched") return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: - fmt.Println("********************* no match") //println(inner, inner.(type), *protos.Peer_ClickhouseConfig) return nil, ErrUnsupportedFunctionality } @@ -229,7 +225,6 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { inner := peer.Type - fmt.Println("************************************** peer types", inner, protos.DBType_CLICKHOUSE) switch inner { case protos.DBType_POSTGRES: pgConfig := peer.GetPostgresConfig() @@ -281,7 +276,6 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { func GetQRepConsolidateConnector(ctx context.Context, config *protos.Peer, ) (QRepConsolidateConnector, error) { - fmt.Println("************* in GetQRepConsolidateConnector") inner := config.Config switch inner.(type) { case *protos.Peer_SnowflakeConfig: diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index a75d1faee4..fdeb79c7ea 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -25,8 +25,6 @@ func (c *PostgresConnector) GetQRepPartitions( last *protos.QRepPartition, ) ([]*protos.QRepPartition, error) { - fmt.Printf("\n******************************* in Postgres.GetQRepPartitions 1") - if config.WatermarkColumn == "" { // if no watermark column is specified, return a single partition 
partition := &protos.QRepPartition{ @@ -37,8 +35,6 @@ func (c *PostgresConnector) GetQRepPartitions( return []*protos.QRepPartition{partition}, nil } - fmt.Printf("\n******************************* in Postgres.GetQRepPartitions 2") - // begin a transaction tx, err := c.pool.BeginTx(c.ctx, pgx.TxOptions{ AccessMode: pgx.ReadOnly, @@ -47,7 +43,6 @@ func (c *PostgresConnector) GetQRepPartitions( if err != nil { return nil, fmt.Errorf("failed to begin transaction: %w", err) } - fmt.Printf("\n******************************* in Postgres.GetQRepPartitions 3") defer func() { deferErr := tx.Rollback(c.ctx) if deferErr != pgx.ErrTxClosed && deferErr != nil { @@ -59,7 +54,6 @@ func (c *PostgresConnector) GetQRepPartitions( if err != nil { return nil, fmt.Errorf("failed to set transaction snapshot: %w", err) } - fmt.Printf("\n******************************* in Postgres.GetQRepPartitions 3") // TODO re-enable locking of the watermark table. // // lock the table while we get the partitions. // lockQuery := fmt.Sprintf("LOCK %s IN EXCLUSIVE MODE", config.WatermarkTable) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index b808188b14..400dac861b 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -120,25 +120,18 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error } func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (int, error) { - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 1") schema, err := p.stream.Schema() if err != nil { slog.Error("failed to get schema from stream", slog.Any("error", err)) return 0, fmt.Errorf("failed to get schema from stream: %w", err) } - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 2 %+v", schema) - colNames := schema.GetColumnNames() - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 3 colnames: %+v", colNames) - var numRows uber_atomic.Uint32 numRows.Store(0) - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 5") - if p.ctx != nil { shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string { written := numRows.Load() @@ -149,15 +142,12 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( shutdown <- struct{}{} }() } - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 6 stream.Records %+v", p.stream.Records) for qRecordOrErr := range p.stream.Records { - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7 %+v", qRecordOrErr) if qRecordOrErr.Err != nil { slog.Error("[avro] failed to get record from stream", slog.Any("error", qRecordOrErr.Err)) return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err) } - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7.1 Record %+v", qRecordOrErr.Record) avroConverter := model.NewQRecordAvroConverter( qRecordOrErr.Record, p.targetDWH, @@ -166,7 +156,6 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( ) avroMap, err := avroConverter.Convert() - fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7.2 avroMap %+v", avroMap) if err != nil { slog.Error("failed to convert QRecord to Avro compatible map: ", slog.Any("error", err)) @@ -179,7 +168,6 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( return 0, fmt.Errorf("failed to write record to OCF: %w", err) } - fmt.Printf("\n************ avro_writer.go 
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index b808188b14..400dac861b 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -120,25 +120,18 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error
 }
 
 func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (int, error) {
-	fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 1")
 	schema, err := p.stream.Schema()
 	if err != nil {
 		slog.Error("failed to get schema from stream", slog.Any("error", err))
 		return 0, fmt.Errorf("failed to get schema from stream: %w", err)
 	}
 
-	fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 2 %+v", schema)
-
 	colNames := schema.GetColumnNames()
 
-	fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 3 colnames: %+v", colNames)
-
 	var numRows uber_atomic.Uint32
 	numRows.Store(0)
 
-	fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 5")
-
 	if p.ctx != nil {
 		shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string {
 			written := numRows.Load()
@@ -149,15 +142,12 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 			shutdown <- struct{}{}
 		}()
 	}
-	fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 6 stream.Records %+v", p.stream.Records)
 
 	for qRecordOrErr := range p.stream.Records {
-		fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7 %+v", qRecordOrErr)
 		if qRecordOrErr.Err != nil {
 			slog.Error("[avro] failed to get record from stream", slog.Any("error", qRecordOrErr.Err))
 			return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err)
 		}
-		fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7.1 Record %+v", qRecordOrErr.Record)
 
 		avroConverter := model.NewQRecordAvroConverter(
 			qRecordOrErr.Record,
 			p.targetDWH,
@@ -166,7 +156,6 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 		)
 
 		avroMap, err := avroConverter.Convert()
-		fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7.2 avroMap %+v", avroMap)
 		if err != nil {
 			slog.Error("failed to convert QRecord to Avro compatible map: ",
 				slog.Any("error", err))
@@ -179,7 +168,6 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 			return 0, fmt.Errorf("failed to write record to OCF: %w", err)
 		}
 
-		fmt.Printf("\n************ avro_writer.go writeRecordsToOCFWriter 7.3")
 		numRows.Inc()
 	}
 
@@ -249,7 +237,6 @@ func (p *peerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (*AvroFile, er
 	}
 
 	numRecords, err := p.WriteOCF(file)
-	fmt.Printf("********************* avro_writer.go WriteRecordsToAvroFile numRecords %v", numRecords)
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to temporary Avro file: %w", err)
 	}
diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go
index 91a34646b9..cb2f326a66 100644
--- a/flow/connectors/utils/partition/partition.go
+++ b/flow/connectors/utils/partition/partition.go
@@ -147,7 +147,6 @@ func NewPartitionHelper() *PartitionHelper {
 }
 
 func (p *PartitionHelper) AddPartition(start interface{}, end interface{}) error {
-	fmt.Printf("\n******************************* in AddPartition 1")
 	slog.Debug(fmt.Sprintf("adding partition - start: %v, end: %v", start, end))
 
 	// Skip partition if it's fully contained within the previous one
diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go
index d277658137..e965c21210 100644
--- a/flow/model/conversion_avro.go
+++ b/flow/model/conversion_avro.go
@@ -71,10 +71,8 @@ func GetAvroSchemaDefinition(
 	dstTableName string,
 	qRecordSchema *QRecordSchema,
 ) (*QRecordAvroSchemaDefinition, error) {
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 1")
 	avroFields := []QRecordAvroField{}
 	nullableFields := make(map[string]struct{})
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 2")
 	for _, qField := range qRecordSchema.Fields {
 		avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable)
 		if err != nil {
@@ -93,20 +91,16 @@ func GetAvroSchemaDefinition(
 			Type: consolidatedType,
 		})
 	}
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 3")
 
 	avroSchema := QRecordAvroSchema{
 		Type:   "record",
 		Name:   dstTableName,
 		Fields: avroFields,
 	}
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 4")
 
 	avroSchemaJSON, err := json.Marshal(avroSchema)
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 5")
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal Avro schema to JSON: %v", err)
 	}
-	fmt.Printf("\n************ in conversion_avro.go GetAvroSchemaDefinition 6")
 
 	return &QRecordAvroSchemaDefinition{
 		Schema:         string(avroSchemaJSON),
 		NullableFields: nullableFields,
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index f0207f0fbf..af8346d633 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -55,17 +55,12 @@ func NewQRecordStream(buffer int) *QRecordStream {
 }
 
 func (s *QRecordStream) Schema() (*QRecordSchema, error) {
-	fmt.Printf("\n************ in QRecordStream.Schema 1")
 	if s.schemaCache != nil {
 		return s.schemaCache, nil
 	}
 
-	fmt.Printf("\n************ in QRecordStream.Schema 2")
-
 	schemaOrError := <-s.schema
-	fmt.Printf("\n************ in QRecordStream.Schema 3 %+v", schemaOrError)
 	s.schemaCache = schemaOrError.Schema
-	fmt.Printf("\n************ in QRecordStream.Schema 4 %+v %+v", schemaOrError.Schema, schemaOrError.Err)
 	return schemaOrError.Schema, schemaOrError.Err
 }
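The Schema() path above is a publish-once handshake: the first caller blocks on the channel, everyone afterwards hits the cache. A reduced, self-contained model of that behavior (the types and field names are illustrative, not PeerDB's):

```go
package main

import "fmt"

type stream struct {
	schemaCh    chan string
	schemaCache string
}

// Schema receives the published schema once, then serves the cached copy.
func (s *stream) Schema() string {
	if s.schemaCache != "" {
		return s.schemaCache
	}
	s.schemaCache = <-s.schemaCh
	return s.schemaCache
}

func main() {
	s := &stream{schemaCh: make(chan string, 1)}
	s.schemaCh <- "id:int64,name:string" // producer publishes exactly once
	fmt.Println(s.Schema())              // first call drains the channel
	fmt.Println(s.Schema())              // second call is served from cache
}
```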
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 55c6bb3bbb..37cdd4284c 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -84,7 +84,6 @@ func NewQRepPartitionFlowExecution(ctx workflow.Context,
 
 // SetupMetadataTables creates the metadata tables for query based replication.
 func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
-	fmt.Println("***************setting up metadata tables for qrep flow - ", q.config.FlowJobName)
 	q.logger.Info("setting up metadata tables for qrep flow - ", q.config.FlowJobName)
 
 	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
@@ -165,7 +164,6 @@ func (q *QRepFlowExecution) GetPartitions(
 func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context,
 	partitions *protos.QRepPartitionBatch,
 ) error {
-	fmt.Printf("\n******************************* in qrep_flow.go ReplicatePartitions ")
 	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
 		StartToCloseTimeout: 24 * 5 * time.Hour,
 		HeartbeatTimeout:    5 * time.Minute,
@@ -385,8 +383,6 @@ func QRepFlowWorkflow(
 	// 4. Wait for all the workflows to complete.
 	// 5. Sleep for a while and repeat the loop.
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 1")
-
 	ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := workflow.GetLogger(ctx)
 
@@ -395,8 +391,6 @@ func QRepFlowWorkflow(
 		maxParallelWorkers = int(config.MaxParallelWorkers)
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 2")
-
 	// register a query to get the number of partitions processed
 	err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) {
 		return state.NumPartitionsProcessed, nil
@@ -405,8 +399,6 @@ func QRepFlowWorkflow(
 		return fmt.Errorf("failed to register query handler: %w", err)
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 3")
-
 	// get qrep run uuid via side-effect
 	runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
 		return uuid.New().String()
@@ -417,8 +409,6 @@ func QRepFlowWorkflow(
 		return fmt.Errorf("failed to get run uuid: %w", err)
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 4")
-
 	q := NewQRepFlowExecution(ctx, config, runUUID)
 
 	err = q.SetupWatermarkTableOnDestination(ctx)
@@ -426,8 +416,6 @@ func QRepFlowWorkflow(
 		return fmt.Errorf("failed to setup watermark table: %w", err)
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 4")
-
 	err = q.SetupMetadataTables(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to setup metadata tables: %w", err)
@@ -439,42 +427,31 @@ func QRepFlowWorkflow(
 		return err
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 5")
-
 	logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName)
 	partitions, err := q.GetPartitions(ctx, state.LastPartition)
 	if err != nil {
 		return fmt.Errorf("failed to get partitions: %w", err)
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 6")
-
 	logger.Info("partitions to replicate - ", len(partitions.Partitions))
 	if err = q.processPartitions(ctx, maxParallelWorkers, partitions.Partitions); err != nil {
 		return err
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 7")
 	logger.Info("consolidating partitions for peer flow - ", config.FlowJobName)
 	if err = q.consolidatePartitions(ctx); err != nil {
 		return err
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 8")
-
 	if config.InitialCopyOnly {
 		q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
 		return nil
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 9")
-
 	err = q.handleTableRenameForResync(ctx, state)
 	if err != nil {
 		return err
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 10")
-
 	q.logger.Info("partitions processed - ", len(partitions.Partitions))
 	state.NumPartitionsProcessed += uint64(len(partitions.Partitions))
@@ -482,8 +459,6 @@ func QRepFlowWorkflow(
 		state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 11")
-
 	if !state.DisableWaitForNewRows {
 		// sleep for a while and continue the workflow
 		err = q.waitForNewRows(ctx, state.LastPartition)
@@ -491,7 +466,6 @@ func QRepFlowWorkflow(
 			return err
 		}
 	}
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 12")
 
 	workflow.GetLogger(ctx).Info("Continuing as new workflow",
 		"Last Partition", state.LastPartition,
@@ -514,14 +488,11 @@ func QRepFlowWorkflow(
 			}
 		}
 	}
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 13")
 
 	if q.activeSignal == shared.ShutdownSignal {
 		q.logger.Info("terminating workflow - ", config.FlowJobName)
 		return nil
 	}
 
-	fmt.Printf("\n******************************* in qrep_flow.go QRepFlowWorkflow 14")
-
 	// Continue the workflow with new state
 	return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state)
 }