Skip to content

Commit

Permalink
flow cleanup, remove logs
Browse files Browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb committed Jan 3, 2024
1 parent d66fac8 commit e2c1294
Show file tree
Hide file tree
Showing 12 changed files with 2 additions and 194 deletions.
15 changes: 0 additions & 15 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,6 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
fmt.Println("*************** in flowable.SetupQRepMetadataTables")

conn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -485,21 +483,16 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
last *protos.QRepPartition,
runUUID string,
) (*protos.QRepParitionResult, error) {
fmt.Printf("\n******************************* in GetQRepPartitions 1")

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
}
fmt.Printf("\n******************************* in GetQRepPartitions 2")
defer connectors.CloseConnector(srcConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})

fmt.Printf("\n******************************* in GetQRepPartitions 3")

defer func() {
shutdown <- struct{}{}
}()
Expand All @@ -509,8 +502,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}

fmt.Printf("\n******************************* in GetQRepPartitions 4")

if len(partitions) > 0 {
err = monitoring.InitializeQRepRun(
ctx,
Expand All @@ -524,8 +515,6 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
}

fmt.Printf("\n******************************* in GetQRepPartitions 5")

return &protos.QRepParitionResult{
Partitions: partitions,
}, nil
Expand All @@ -537,7 +526,6 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
fmt.Printf("\n ************************ in ReplicateQRepPartitions")
err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
Expand Down Expand Up @@ -567,7 +555,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
fmt.Printf("\n******************************* in replicateQRepPartition")

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
Expand Down Expand Up @@ -821,7 +808,6 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition,
) error {
fmt.Printf("\n******************************* in QRepWaitUntilNewRows")
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
Expand Down Expand Up @@ -912,7 +898,6 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
fmt.Printf("\n******************************** in ReplicateXminPartition")
startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (h *FlowRequestHandler) CreateQRepFlow(
} else {
workflowFn = peerflow.QRepFlowWorkflow
}
fmt.Printf("\n *********** cfg %v", cfg)
_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
Expand Down Expand Up @@ -625,10 +624,6 @@ func (h *FlowRequestHandler) CreatePeer(

chConfig := chConfigObject.ClickhouseConfig
encodedConfig, encodingErr = proto.Marshal(chConfig)
fmt.Printf("config %v", config)
fmt.Printf("chConfigObject %v", chConfigObject)
fmt.Printf("chConfig %v", chConfig)
fmt.Printf("encodedConfig %v", encodedConfig)
case protos.DBType_BIGQUERY:
bqConfigObject, ok := config.(*protos.Peer_BigqueryConfig)
if !ok {
Expand Down
15 changes: 0 additions & 15 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func NewClickhouseConnector(ctx context.Context,
// if err != nil {
// return nil, err
// }
fmt.Println("*********************in NewClickhouseConnector")
database, err := connect(ctx, clickhouseProtoConfig)
// snowflakeConfig := gosnowfla ke.Config{
// Account: snowflakeProtoConfig.AccountId,
Expand Down Expand Up @@ -167,7 +166,6 @@ func NewClickhouseConnector(ctx context.Context,
}

func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
fmt.Println("***********************start connect")
dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s
config.Host, config.Port, config.User, config.Password) //, config.Database
//dsn := "tcp://host.docker.internal:9009?username=clickhouse&password=clickhouse" //&database=desti"
Expand All @@ -185,18 +183,6 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err
return nil, err
}

fmt.Println("**********************successfully connected 1")

_, err2 := conn.Exec("select * from tasks")
if err2 != nil {
fmt.Println("****************err in querying after connecting 1", err2)
}
if err2 == nil {
fmt.Println("*****************select after connecting done 1")
}

fmt.Println("***********************end connect")

return conn, nil
}

Expand Down Expand Up @@ -620,7 +606,6 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
rawTableIdentifier),
}
avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
fmt.Printf("\n*************** in Clickhouse syncRecordsViaAvro 1 %+v", avroSyncer)
destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
if err != nil {
return nil, err
Expand Down
49 changes: 0 additions & 49 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,30 @@ func (c *ClickhouseConnector) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {
fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords %+v %+v", config, partition)
// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
flowLog := slog.Group("sync_metadata",
slog.String(string(shared.PartitionIDKey), partition.PartitionId),
slog.String("destinationTable", destTable),
)

fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2")
tblSchema, err := c.getTableSchema(destTable)
fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.5 %v", err)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
}
fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.7 %v", flowLog)
//c.logger.Info("Called QRep sync function and obtained table schema", flowLog)
fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 2.8")
fmt.Printf("*********** Called QRep sync function and obtained table schema")

fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 3 %v", tblSchema)
fmt.Print("\n destination_peer %+v", config.DestinationPeer.Config)
done, err := c.isPartitionSynced(partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 4")

fmt.Printf("*********** Called QRep sync function and obtained table schema")

if done {
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

avroSync := NewClickhouseAvroSyncMethod(config, c)

fmt.Printf("\n*************** in Clickhouse qrep.SyncQRepRecords 5 stream: %+v ", stream)

return avroSync.SyncQRepRecords(config, partition, tblSchema, stream)
}

Expand Down Expand Up @@ -100,61 +85,45 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp
LIMIT 0
`, tableName)

fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema %s", queryString)

rows, err := c.database.Query(queryString)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()

fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema 2")

columnTypes, err := rows.ColumnTypes()
if err != nil {
return nil, fmt.Errorf("failed to get column types: %w", err)
}

fmt.Printf("\n*************** in Clickhouse qrep.getTableSchema 3 %v %+v %+v", len(columnTypes), *columnTypes[0], *&columnTypes[1])

return columnTypes, nil
}

// isPartitionSynced reports whether the QRep metadata table already contains
// at least one row for partitionID.
//
// It runs a COUNT(*) query against the metadata table and returns true when
// the count is positive. If the result cannot be scanned, the error is
// returned wrapped and the boolean is false.
func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error) {
	// NOTE(review): the database name is hard-coded to "desti" instead of
	// c.metadataSchema, and partitionID is interpolated directly into the SQL
	// text, so only trusted, internally generated IDs should be passed here.
	//nolint:gosec
	queryString := fmt.Sprintf(`
SELECT COUNT(*)
FROM %s.%s
WHERE partitionID = '%s'
`, "desti", qRepMetadataTableName, partitionID) // TODO: use c.metadataSchema

	var count int
	if err := c.database.QueryRow(queryString).Scan(&count); err != nil {
		// The error is returned to the caller; it is not also printed here.
		return false, fmt.Errorf("failed to execute query: %w", err)
	}

	return count > 0, nil
}

func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
fmt.Println("***********************SetupQRepMetadataTabless 1")
// NOTE that Snowflake does not support transactional DDL
//createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
// if err != nil {
// return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
// }
fmt.Println("***********************SetupQRepMetadataTabless 2")
// in case we return after error, ensure transaction is rolled back
// defer func() {
// deferErr := createMetadataTablesTx.Rollback()
Expand All @@ -164,19 +133,13 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
// }
// }()
//err = c.createPeerDBInternalSchema(createMetadataTablesTx)
fmt.Println("***********************SetupQRepMetadataTabless 3")
// if err != nil {
// return err
// }
err := c.createQRepMetadataTable() //(createMetadataTablesTx)
if err != nil {
fmt.Printf("\n*****************error in creating qrep metadata table: %v\n", err)
return err
}
fmt.Println("\n***********************SetupQRepMetadataTabless 4")
stageName := c.getStageNameForJob(config.FlowJobName)

fmt.Println("\n***********************Created StageName", stageName)

//not needed for clickhouse
// err = c.createStage(stageName, config)
Expand All @@ -201,7 +164,6 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig)

func (c *ClickhouseConnector) createQRepMetadataTable() error { //createMetadataTableTx *sql.Tx
// Define the schema
fmt.Printf("\n************* in createQRepMetadataTable")
schemaStatement := `
CREATE TABLE IF NOT EXISTS %s.%s (
flowJobName String,
Expand All @@ -214,47 +176,38 @@ func (c *ClickhouseConnector) createQRepMetadataTable() error { //createMetadata
`
queryString := fmt.Sprintf(schemaStatement, "desti", qRepMetadataTableName) //c.metadataSchema,

fmt.Printf("\n************* queryString to create table %s", queryString)

//_, err := createMetadataTableTx.Exec(queryString)
_, err := c.database.Exec(queryString)
//_, err := c.database.Exec("select * from tasks;")

if err != nil {
fmt.Printf("\n************************ failed to create table %s %v", err.Error(), err)
c.logger.Error(fmt.Sprintf("failed to create table %s.%s", c.metadataSchema, qRepMetadataTableName),
slog.Any("error", err))

return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err)
}
//c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
fmt.Printf("\n************* created table %s", qRepMetadataTableName)
return nil
}

func (c *ClickhouseConnector) createStage(stageName string, config *protos.QRepConfig) error {
fmt.Printf("\n***********************createStage1 %s, stageName: %s", stageName, config.StagingPath)
var createStageStmt string
if strings.HasPrefix(config.StagingPath, "s3://") {
fmt.Printf("\n***********************createStage 2")
stmt, err := c.createExternalStage(stageName, config)
if err != nil {
return err
}
createStageStmt = stmt
} else {
fmt.Printf("\n***********************createStage 3")
stageStatement := `
CREATE OR REPLACE STAGE %s
FILE_FORMAT = (TYPE = AVRO);
`
createStageStmt = fmt.Sprintf(stageStatement, stageName)
}
fmt.Printf("\n***********************createStage 4 %s", createStageStmt)
// Execute the query
_, err := c.database.Exec(createStageStmt)

fmt.Printf("\n***********************createStage 5 err: %v", err)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to create stage %s", stageName), slog.Any("error", err))
return fmt.Errorf("failed to create stage %s: %w", stageName, err)
Expand Down Expand Up @@ -416,7 +369,5 @@ func (c *ClickhouseConnector) dropStage(stagingPath string, job string) error {
}

// getStageNameForJob returns the stage name used for the given job, formatted
// as "<metadataSchema>.peerdb_stage_<job>".
func (c *ClickhouseConnector) getStageNameForJob(job string) string {
	return fmt.Sprintf("%s.peerdb_stage_%s", c.metadataSchema, job)
}
Loading

0 comments on commit e2c1294

Please sign in to comment.