From ed3f0d2ad17acd908597637164366c10bb7dc5c9 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 4 Oct 2023 15:34:14 -0400 Subject: [PATCH] Support `DROP MIRROR` for Query Replication (#481) fixes: https://github.com/PeerDB-io/peerdb/issues/476 --- flow/cmd/handler.go | 65 ++++++++++++++++++++++---- flow/connectors/snowflake/snowflake.go | 25 +++++++--- flow/workflows/qrep_flow.go | 14 ++++-- 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index b3c74ab634..6a93ba3fde 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow( } func (h *FlowRequestHandler) ShutdownFlow( - ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) { + ctx context.Context, + req *protos.ShutdownRequest, +) (*protos.ShutdownResponse, error) { err := h.temporalClient.SignalWorkflow( ctx, req.WorkflowId, @@ -142,8 +144,24 @@ func (h *FlowRequestHandler) ShutdownFlow( return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err) } - if err = dropFlowHandle.Get(ctx, nil); err != nil { - return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- dropFlowHandle.Get(cancelCtx, nil) + }() + + select { + case err := <-errChan: + if err != nil { + return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + } + case <-time.After(1 * time.Minute): + err := h.handleWorkflowNotClosed(ctx, workflowID, "") + if err != nil { + return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) + } } return &protos.ShutdownResponse{ @@ -153,9 +171,9 @@ func (h *FlowRequestHandler) ShutdownFlow( func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error { expBackoff := backoff.NewExponentialBackOff() - expBackoff.InitialInterval = 5 * time.Second - expBackoff.MaxInterval = 30 * time.Second - expBackoff.MaxElapsedTime = 5 * time.Minute + expBackoff.InitialInterval = 3 * time.Second + expBackoff.MaxInterval = 10 * time.Second + expBackoff.MaxElapsedTime = 1 * time.Minute // empty will terminate the latest run runID := "" @@ -176,10 +194,39 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI err := backoff.Retry(operation, expBackoff) if err != nil { - // terminate workflow if it is still running - reason := "PeerFlow workflow did not close in time" - err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason) + return h.handleWorkflowNotClosed(ctx, workflowID, runID) + } + + return nil +} + +func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error { + errChan := make(chan error, 1) + + // Create a new context with timeout for CancelWorkflow + ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + // Call CancelWorkflow in a goroutine + go func() { + err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID) + errChan <- err + }() + + select { + case err := <-errChan: if err != nil { + log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error()) + terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID) + if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { + return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) + } + } + case <-time.After(1 * time.Minute): + // If 1 minute has passed and we haven't received an error, terminate the workflow + log.Errorf("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.") + terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID) + if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) } } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4310764333..ad1b4ac557 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -74,6 +74,7 @@ const ( dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" isDeletedColumnName = "_PEERDB_IS_DELETED" + checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" syncRecordsChunkSize = 1024 ) @@ -936,16 +937,26 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { } }() - _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema, - getRawTableIdentifier(jobName))) + row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, peerDBInternalSchema) + var schemaExists bool + err = row.Scan(&schemaExists) if err != nil { - return fmt.Errorf("unable to drop raw table: %w", err) + return fmt.Errorf("unable to check if internal schema exists: %w", err) } - _, err = syncFlowCleanupTx.ExecContext(c.ctx, - fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return fmt.Errorf("unable to delete job metadata: %w", err) + + if schemaExists { + _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema, + getRawTableIdentifier(jobName))) + if err != nil { + return fmt.Errorf("unable to drop raw table: %w", err) + } + _, err = syncFlowCleanupTx.ExecContext(c.ctx, + fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName) + if err != nil { + return fmt.Errorf("unable to delete job metadata: %w", err) + } } + err = syncFlowCleanupTx.Commit() if err != nil { return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index bceb1fa7f7..0141e4f57f 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -6,6 +6,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" @@ -231,14 +232,17 @@ func QRepFlowWorkflow( // register a signal handler to terminate the workflow terminateWorkflow := false - signalChan := workflow.GetSignalChannel(ctx, "terminate") + signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) s := workflow.NewSelector(ctx) s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) { - var signal string - c.Receive(ctx, &signal) - logger.Info("Received signal to terminate workflow", "Signal", signal) - terminateWorkflow = true + var signalVal shared.CDCFlowSignal + c.Receive(ctx, &signalVal) + logger.Info("received signal", "signal", signalVal) + if signalVal == shared.ShutdownSignal { + logger.Info("received shutdown signal") + terminateWorkflow = true + } }) // register a query to get the number of partitions processed