Skip to content

Commit

Permalink
include consolidate partition step
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 4, 2023
1 parent 55bf5df commit 68d7647
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 18 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.recordHeartbeat("#%d fetched %d rows", numFetchOpsComplete, numRows)
}

log.WithFields(log.Fields{
"flowName": qe.flowJobName,
}).Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
stream.Records <- &model.QRecordOrError{
Expand Down
59 changes: 41 additions & 18 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file corresponds to query based replication.
// This file corresponds to xmin based replication.
package peerflow

import (
Expand All @@ -11,9 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/google/uuid"
// "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
// "go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

Expand Down Expand Up @@ -96,20 +94,6 @@ func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
return nil
}

// getPartitionWorkflowID returns the child workflow ID for a new sync flow.
func (q *XminFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string, error) {
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, uuid.New().String())
})

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
return "", fmt.Errorf("failed to get child workflow ID: %w", err)
}

return childWorkflowID, nil
}

func (q *XminFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error {
if state.NeedsResync && q.config.DstTableFullResync {
renamedTableIdentifier := fmt.Sprintf("%s_peerdb_resync", q.config.DestinationTableIdentifier)
Expand Down Expand Up @@ -170,6 +154,34 @@ func (q *XminFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) {
}
}

// For some targets we need to consolidate all the partitions from stages before
// we proceed to next batch.
func (q *XminFlowExecution) consolidatePartitions(ctx workflow.Context) error {
q.logger.Info("consolidating partitions")

// only an operation for Snowflake currently.
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: 10 * time.Minute,
})

if err := workflow.ExecuteActivity(ctx, flowable.ConsolidateQRepPartitions, q.config,
q.runUUID).Get(ctx, nil); err != nil {
return fmt.Errorf("failed to consolidate partitions: %w", err)
}

q.logger.Info("partitions consolidated")

// clean up qrep flow as well
if err := workflow.ExecuteActivity(ctx, flowable.CleanupQRepFlow, q.config).Get(ctx, nil); err != nil {
return fmt.Errorf("failed to cleanup qrep flow: %w", err)
}

q.logger.Info("qrep flow cleaned up")

return nil
}

func XminFlowWorkflow(
ctx workflow.Context,
config *protos.QRepConfig,
Expand Down Expand Up @@ -216,10 +228,17 @@ func XminFlowWorkflow(
StartToCloseTimeout: 24 * 5 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})
err = workflow.ExecuteActivity(replicateXminPartitionCtx, flowable.ReplicateXminPartition, q.config, state.LastPartition, q.runUUID).Get(ctx, &lastPartition)
err = workflow.ExecuteActivity(
replicateXminPartitionCtx,
flowable.ReplicateXminPartition,
q.config,
state.LastPartition,
q.runUUID,
).Get(ctx, &lastPartition)
if err != nil {
return fmt.Errorf("xmin replication failed: %w", err)
}

state.LastPartition = &protos.QRepPartition{PartitionId: strconv.FormatInt(lastPartition&0xffffffff, 10)}

if config.InitialCopyOnly {
Expand All @@ -232,6 +251,10 @@ func XminFlowWorkflow(
return err
}

if err = q.consolidatePartitions(ctx); err != nil {
return err
}

workflow.GetLogger(ctx).Info("Continuing as new workflow",
"Last Partition", state.LastPartition,
"Number of Partitions Processed", state.NumPartitionsProcessed)
Expand Down

0 comments on commit 68d7647

Please sign in to comment.