Skip to content

Commit

Permalink
DropFlow support for stilgar
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Mar 18, 2024
1 parent 359246b commit b3e0d72
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 46 deletions.
92 changes: 47 additions & 45 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,56 +337,58 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
}

workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, req)
if err != nil {
slog.Error("unable to start DropFlow workflow",
logs,
slog.Any("error", err))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
}, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
}, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
if req.SourcePeer.Type == protos.DBType_POSTGRES {
workflowID := fmt.Sprintf("%s-dropflow-%s", req.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
SearchAttributes: map[string]interface{}{
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
case <-time.After(5 * time.Minute):
err := h.handleCancelWorkflow(ctx, workflowID, "")
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, req)
if err != nil {
slog.Error("unable to wait for DropFlow workflow to close",
slog.Error("unable to start DropFlow workflow",
logs,
slog.Any("error", err),
)
slog.Any("error", err))
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
ErrorMessage: fmt.Sprintf("unable to start DropFlow workflow: %v", err),
}, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
slog.Error("DropFlow workflow did not execute successfully",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("DropFlow workflow did not execute successfully: %v", err),
}, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(5 * time.Minute):
err := h.handleCancelWorkflow(ctx, workflowID, "")
if err != nil {
slog.Error("unable to wait for DropFlow workflow to close",
logs,
slog.Any("error", err),
)
return &protos.ShutdownResponse{
Ok: false,
ErrorMessage: fmt.Sprintf("unable to wait for DropFlow workflow to close: %v", err),
}, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ func QRepFlowWorkflow(
if config.SourcePeer.Type == protos.DBType_SNOWFLAKE && config.WaitBetweenBatchesSeconds > 0 {
logger.Info(fmt.Sprintf("copy completed, restarting workflow in %d seconds",
config.WaitBetweenBatchesSeconds))
workflow.Sleep(ctx, time.Duration(config.WaitBetweenBatchesSeconds)*time.Second)
err := workflow.Sleep(ctx, time.Duration(config.WaitBetweenBatchesSeconds)*time.Second)
if err != nil {
return nil
}
return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state)
}
logger.Info("initial copy completed for peer flow")
Expand Down

0 comments on commit b3e0d72

Please sign in to comment.