From 2206c02a0fcfd8237e174c4f389c875283c682ed Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 4 Oct 2023 15:12:30 -0400 Subject: [PATCH] more fixes --- flow/cmd/handler.go | 49 +++++++++++++++++++++++--- flow/connectors/snowflake/snowflake.go | 25 +++++++++---- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index ab8bb9be49..6a93ba3fde 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -144,8 +144,24 @@ func (h *FlowRequestHandler) ShutdownFlow( return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err) } - if err = dropFlowHandle.Get(ctx, nil); err != nil { - return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- dropFlowHandle.Get(cancelCtx, nil) + }() + + select { + case err := <-errChan: + if err != nil { + return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + } + case <-time.After(1 * time.Minute): + err := h.handleWorkflowNotClosed(ctx, workflowID, "") + if err != nil { + return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) + } } return &protos.ShutdownResponse{ @@ -185,13 +201,36 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI } func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error { - if err := h.temporalClient.CancelWorkflow(ctx, workflowID, runID); err != nil { - log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error()) + errChan := make(chan error, 1) + + // Create a new context with timeout for CancelWorkflow + ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + // Call CancelWorkflow in a goroutine + go func() { + err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID) + errChan <- err + }() + + select { + case err := <-errChan: + if err != nil { + log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error()) + terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID) + if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { + return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) + } + } + case <-time.After(1 * time.Minute): + // If 1 minute has passed and we haven't received an error, terminate the workflow + log.Errorf("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.") terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID) - if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { + if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil { return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err) } } + return nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 4310764333..ad1b4ac557 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -74,6 +74,7 @@ const ( dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?" isDeletedColumnName = "_PEERDB_IS_DELETED" + checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?" syncRecordsChunkSize = 1024 ) @@ -936,16 +937,26 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error { } }() - _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema, - getRawTableIdentifier(jobName))) + row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, peerDBInternalSchema) + var schemaExists bool + err = row.Scan(&schemaExists) if err != nil { - return fmt.Errorf("unable to drop raw table: %w", err) + return fmt.Errorf("unable to check if internal schema exists: %w", err) } - _, err = syncFlowCleanupTx.ExecContext(c.ctx, - fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName) - if err != nil { - return fmt.Errorf("unable to delete job metadata: %w", err) + + if schemaExists { + _, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema, + getRawTableIdentifier(jobName))) + if err != nil { + return fmt.Errorf("unable to drop raw table: %w", err) + } + _, err = syncFlowCleanupTx.ExecContext(c.ctx, + fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName) + if err != nil { + return fmt.Errorf("unable to delete job metadata: %w", err) + } } + err = syncFlowCleanupTx.Commit() if err != nil { return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)