Support DROP MIRROR for Query Replication (#481)
fixes: #476
iskakaushik authored Oct 4, 2023
1 parent ab64814 commit ed3f0d2
Showing 3 changed files with 83 additions and 21 deletions.
65 changes: 56 additions & 9 deletions flow/cmd/handler.go
@@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) {
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
@@ -142,8 +144,24 @@ func (h *FlowRequestHandler) ShutdownFlow(
return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

if err = dropFlowHandle.Get(ctx, nil); err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

return &protos.ShutdownResponse{
@@ -153,9 +171,9 @@ func (h *FlowRequestHandler) ShutdownFlow(

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute
expBackoff.InitialInterval = 3 * time.Second
expBackoff.MaxInterval = 10 * time.Second
expBackoff.MaxElapsedTime = 1 * time.Minute

// empty will terminate the latest run
runID := ""
@@ -176,10 +194,39 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
return h.handleWorkflowNotClosed(ctx, workflowID, runID)
}

return nil
}

func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error {
errChan := make(chan error, 1)

// Create a new context with timeout for CancelWorkflow
ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// Call CancelWorkflow in a goroutine
go func() {
err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID)
errChan <- err
}()

select {
case err := <-errChan:
if err != nil {
log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
case <-time.After(1 * time.Minute):
// If 1 minute has passed and we haven't received an error, terminate the workflow
log.Errorf("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.")
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
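
For context, the new shutdown path above boils down to: wait for the DropFlow run with a bounded timeout, and if it does not close in time, cancel the workflow and only terminate it as a last resort. Below is a minimal, standalone sketch of that pattern against the Temporal Go SDK; the helper name stopWorkflowWithDeadline, the one-minute windows, and the logging are illustrative assumptions rather than part of this commit, while GetWorkflow, CancelWorkflow, and TerminateWorkflow are the same client calls used in handler.go.

package example

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.temporal.io/sdk/client"
)

// stopWorkflowWithDeadline waits up to waitFor for the workflow run to close; if it
// does not, it requests cancellation and, failing that, terminates the run.
func stopWorkflowWithDeadline(ctx context.Context, c client.Client, workflowID, runID string, waitFor time.Duration) error {
	handle := c.GetWorkflow(ctx, workflowID, runID)

	errChan := make(chan error, 1)
	go func() {
		// Get blocks until the workflow run closes (or ctx is done).
		errChan <- handle.Get(ctx, nil)
	}()

	select {
	case err := <-errChan:
		if err != nil {
			return fmt.Errorf("workflow did not execute successfully: %w", err)
		}
		return nil
	case <-time.After(waitFor):
		// Graceful path first: ask Temporal to cancel the workflow.
		cancelCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()
		if err := c.CancelWorkflow(cancelCtx, workflowID, runID); err != nil {
			log.Printf("unable to cancel workflow %s: %v, terminating instead", workflowID, err)
			reason := fmt.Sprintf("workflow %s did not cancel in time", workflowID)
			if termErr := c.TerminateWorkflow(ctx, workflowID, runID, reason); termErr != nil {
				return fmt.Errorf("unable to terminate workflow: %w", termErr)
			}
		}
		return nil
	}
}

Cancelling first gives the workflow a chance to run its own cleanup before the abrupt TerminateWorkflow fallback, which is the same ordering handleWorkflowNotClosed implements.
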
25 changes: 18 additions & 7 deletions flow/connectors/snowflake/snowflake.go
@@ -74,6 +74,7 @@ const (
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
isDeletedColumnName = "_PEERDB_IS_DELETED"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

syncRecordsChunkSize = 1024
)
@@ -936,16 +937,26 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
}
}()

_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, peerDBInternalSchema)
var schemaExists bool
err = row.Scan(&schemaExists)
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
return fmt.Errorf("unable to check if internal schema exists: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)

if schemaExists {
_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)
}
}

err = syncFlowCleanupTx.Commit()
if err != nil {
return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)
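
The Snowflake change guards cleanup behind an INFORMATION_SCHEMA check so that dropping a mirror whose internal schema was never created does not fail. Below is a hedged sketch of that guard using database/sql with the gosnowflake driver; the function name cleanupIfSchemaExists, the *sql.DB parameter, and the placeholder schema/table/job arguments are assumptions for illustration, while the existence query and the conditional DROP/DELETE mirror the statements above.

package example

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/snowflakedb/gosnowflake" // registers the "snowflake" driver for database/sql
)

const checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

// cleanupIfSchemaExists drops the raw table and the job's metadata row only when
// the internal schema is present, so cleanup of a never-synced mirror is a no-op.
func cleanupIfSchemaExists(ctx context.Context, db *sql.DB, schema, rawTable, metadataTable, jobName string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("unable to begin transaction: %w", err)
	}
	defer func() { _ = tx.Rollback() }() // no-op once the transaction is committed

	var schemaExists bool
	if err := tx.QueryRowContext(ctx, checkSchemaExistsSQL, schema).Scan(&schemaExists); err != nil {
		return fmt.Errorf("unable to check if schema exists: %w", err)
	}

	if schemaExists {
		if _, err := tx.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", schema, rawTable)); err != nil {
			return fmt.Errorf("unable to drop raw table: %w", err)
		}
		if _, err := tx.ExecContext(ctx,
			fmt.Sprintf("DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?", schema, metadataTable), jobName); err != nil {
			return fmt.Errorf("unable to delete job metadata: %w", err)
		}
	}

	return tx.Commit()
}
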
14 changes: 9 additions & 5 deletions flow/workflows/qrep_flow.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
@@ -231,14 +232,17 @@ func QRepFlowWorkflow(

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, "terminate")
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
var signal string
c.Receive(ctx, &signal)
logger.Info("Received signal to terminate workflow", "Signal", signal)
terminateWorkflow = true
var signalVal shared.CDCFlowSignal
c.Receive(ctx, &signalVal)
logger.Info("received signal", "signal", signalVal)
if signalVal == shared.ShutdownSignal {
logger.Info("received shutdown signal")
terminateWorkflow = true
}
})

// register a query to get the number of partitions processed
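
The QRep workflow now listens on the shared CDC flow signal channel and only sets its terminate flag when the payload is the shutdown value, instead of reacting to any message on an ad-hoc "terminate" channel. The sketch below shows both halves of that contract under stated assumptions: the SignalName, NoopSignal, and ShutdownSignal constants are placeholders standing in for the shared package (shared.CDCFlowSignalName, shared.ShutdownSignal), RequestShutdown and ExampleFlow are hypothetical names, and a non-blocking ReceiveAsync drain stands in for the Selector wiring used in QRepFlowWorkflow.

package example

import (
	"context"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/workflow"
)

// FlowSignal mimics the typed signal value carried on the shared channel.
type FlowSignal int64

const SignalName = "peer-flow-signal" // placeholder for shared.CDCFlowSignalName

const (
	NoopSignal     FlowSignal = iota
	ShutdownSignal            // placeholder for shared.ShutdownSignal
)

// RequestShutdown is the sender side: a handler asks the running flow to stop
// by signalling it rather than terminating it outright.
func RequestShutdown(ctx context.Context, c client.Client, workflowID string) error {
	return c.SignalWorkflow(ctx, workflowID, "", SignalName, ShutdownSignal)
}

// ExampleFlow is the receiver side: it only stops when the shutdown value arrives,
// ignoring any other payloads on the same channel.
func ExampleFlow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	signalChan := workflow.GetSignalChannel(ctx, SignalName)

	terminate := false
	for !terminate {
		// Stand-in for one unit of replication work.
		if err := workflow.Sleep(ctx, 10*time.Second); err != nil {
			return err
		}

		// Drain any pending signals without blocking.
		var signalVal FlowSignal
		for signalChan.ReceiveAsync(&signalVal) {
			logger.Info("received signal", "signal", signalVal)
			if signalVal == ShutdownSignal {
				logger.Info("received shutdown signal")
				terminate = true
			}
		}
	}
	return nil
}

Draining with ReceiveAsync keeps the loop responsive without blocking on the signal channel; the workflow above uses a workflow.Selector for the same purpose.
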
