Skip to content

Commit

Permalink
committing before context switch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 7, 2023
1 parent 051584c commit b002eb7
Show file tree
Hide file tree
Showing 14 changed files with 902 additions and 582 deletions.
33 changes: 22 additions & 11 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,22 +350,33 @@ func (h *FlowRequestHandler) ShutdownFlow(
}, nil
}

func (h *FlowRequestHandler) PauseFlow(
func (h *FlowRequestHandler) FlowStateChange(
ctx context.Context,
req *protos.PauseRequest,
) (*protos.PauseResponse, error) {
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
req *protos.FlowStateChangeRequest,
) (*protos.FlowStateChangeResponse, error) {
var err error
if req.RequestedFlowState == protos.FlowState_STATE_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowState_STATE_RUNNING {
err = h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
"",
shared.CDCFlowSignalName,
shared.NoopSignal,
)
}
if err != nil {
return nil, fmt.Errorf("unable to signal PeerFlow workflow: %w", err)
}

return &protos.PauseResponse{
return &protos.FlowStateChangeResponse{
Ok: true,
}, nil
}
Expand Down
516 changes: 294 additions & 222 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

30 changes: 15 additions & 15 deletions flow/generated/protos/route_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
NoopSignal CDCFlowSignal = iota
ShutdownSignal
PauseSignal

EnableMetricsKey ContextKey = "enableMetrics"
CDCMirrorMonitorKey ContextKey = "cdcMirrorMonitor"
)
Expand Down
48 changes: 15 additions & 33 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -147,30 +148,13 @@ func GetChildWorkflowID(
// CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.
type CDCFlowWorkflowResult = CDCFlowWorkflowState

func (w *CDCFlowWorkflowExecution) signalHandler(state *CDCFlowWorkflowState, v shared.CDCFlowSignal) {
w.logger.Info("received signal - ", v)
if v == shared.ShutdownSignal {
w.logger.Info("received shutdown signal")
state.ActiveSignal = v
} else if v == shared.PauseSignal {
w.logger.Info("received pause signal")
if state.ActiveSignal == shared.NoopSignal {
w.logger.Info("workflow was running, pausing it")
state.ActiveSignal = shared.PauseSignal
} else if state.ActiveSignal == shared.PauseSignal {
w.logger.Info("workflow was paused, resuming it")
state.ActiveSignal = shared.NoopSignal
}
}
}

func (w *CDCFlowWorkflowExecution) receiveAndHandleSignal(ctx workflow.Context, state *CDCFlowWorkflowState) {
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

var signalVal shared.CDCFlowSignal
ok := signalChan.ReceiveAsync(&signalVal)
if ok {
w.signalHandler(state, signalVal)
state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
}
}

Expand Down Expand Up @@ -294,27 +278,25 @@ func CDCFlowWorkflowWithConfig(
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignal(ctx, state)

// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
w.logger.Info("peer flow has been shutdown")
return state, nil
}

if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
var signalVal shared.CDCFlowSignal

for state.ActiveSignal == shared.PauseSignal {
err = workflow.Sleep(ctx, 1*time.Minute)
if err != nil {
return state, err
}
w.logger.Info("mirror has been paused for ", time.Since(startTime))
w.receiveAndHandleSignal(ctx, state)
}
if state.ActiveSignal == shared.ShutdownSignal {
// handling going from paused to shutdown
continue
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
}
}
}
// check if the peer flow has been shutdown
if state.ActiveSignal == shared.ShutdownSignal {
w.logger.Info("peer flow has been shutdown")
return state, nil
}

// check if total sync flows have been completed
// since this happens immediately after we check for signals, the case of a signal being missed
Expand Down
48 changes: 15 additions & 33 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -342,30 +343,13 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
return nil
}

func (q *QRepFlowExecution) signalHandler(v shared.CDCFlowSignal) {
q.logger.Info("received signal - ", v)
if v == shared.ShutdownSignal {
q.logger.Info("received shutdown signal")
q.activeSignal = v
} else if v == shared.PauseSignal {
q.logger.Info("received pause signal")
if q.activeSignal == shared.NoopSignal {
q.logger.Info("workflow was running, pausing it")
q.activeSignal = shared.PauseSignal
} else if q.activeSignal == shared.PauseSignal {
q.logger.Info("workflow was paused, resuming it")
q.activeSignal = shared.NoopSignal
}
}
}

func (q *QRepFlowExecution) receiveAndHandleSignal(ctx workflow.Context) {
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

var signalVal shared.CDCFlowSignal
ok := signalChan.ReceiveAsync(&signalVal)
if ok {
q.signalHandler(signalVal)
q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger)
}
}

Expand Down Expand Up @@ -453,7 +437,7 @@ func QRepFlowWorkflow(
state.NumPartitionsProcessed += uint64(len(partitions.Partitions))

if len(partitions.Partitions) > 0 {
lastPartition = partitions.Partitions[len(partitions.Partitions)-1]
state.LastPartition = partitions.Partitions[len(partitions.Partitions)-1]
}

// sleep for a while and continue the workflow
Expand All @@ -469,26 +453,24 @@ func QRepFlowWorkflow(
// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
q.receiveAndHandleSignal(ctx)
if q.activeSignal == shared.ShutdownSignal {
q.logger.Info("terminating workflow - ", config.FlowJobName)
return nil
}
if q.activeSignal == shared.PauseSignal {
startTime := time.Now()
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
err = workflow.Sleep(ctx, 1*time.Minute)
if err != nil {
return err
}
q.logger.Info("mirror has been paused for ", time.Since(startTime))
q.receiveAndHandleSignal(ctx)
}
if q.activeSignal == shared.ShutdownSignal {
// handling going from paused to shutdown
q.logger.Info("terminating workflow - ", config.FlowJobName)
return nil
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger)
}
}
}
if q.activeSignal == shared.ShutdownSignal {
q.logger.Info("terminating workflow - ", config.FlowJobName)
return nil
}

// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
Expand Down
4 changes: 4 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ pub enum PeerDDL {
PauseMirror {
if_exists: bool,
flow_job_name: String,
},
ResumeMirror {
if_exists: bool,
flow_job_name: String,
}
}

Expand Down
15 changes: 10 additions & 5 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl FlowGrpcClient {
workflow_id: workflow_details.workflow_id,
source_peer: Some(workflow_details.source_peer),
destination_peer: Some(workflow_details.destination_peer),
remove_flow_entry:false
remove_flow_entry: false,
};
let response = self.client.shutdown_flow(shutdown_flow_req).await?;
let shutdown_response = response.into_inner();
Expand Down Expand Up @@ -159,16 +159,21 @@ impl FlowGrpcClient {
}
}

pub async fn pause_flow_job(
pub async fn flow_state_change(
&mut self,
flow_job_name: &str,
workflow_id: &str,
pause: bool,
) -> anyhow::Result<()> {
let pause_flow_req = pt::peerdb_route::PauseRequest{
let pause_flow_req = pt::peerdb_route::FlowStateChangeRequest {
flow_job_name: flow_job_name.to_owned(),
workflow_id: workflow_id.to_owned()
workflow_id: workflow_id.to_owned(),
requested_flow_state: match pause {
true => pt::peerdb_route::FlowState::StatePaused.into(),
false => pt::peerdb_route::FlowState::StateRunning.into(),
},
};
let response = self.client.pause_flow(pause_flow_req).await?;
let response = self.client.flow_state_change(pause_flow_req).await?;
let pause_response = response.into_inner();
if pause_response.ok {
Ok(())
Expand Down
Loading

0 comments on commit b002eb7

Please sign in to comment.