Skip to content

Commit

Permalink
use flow update config instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 14, 2024
1 parent 779a439 commit 81335c5
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 60 deletions.
13 changes: 9 additions & 4 deletions flow/cmd/custom_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,15 @@ func (h *FlowRequestHandler) CustomSyncFlow(

// Resume mirror with custom sync number
_, err = h.FlowStateChange(ctx, &protos.FlowStateChangeRequest{
FlowJobName: req.FlowJobName,
RequestedFlowState: protos.FlowStatus_STATUS_RUNNING,
FlowConfigUpdate: nil,
CustomNumberOfSyncs: req.NumberOfSyncs,
FlowJobName: req.FlowJobName,
RequestedFlowState: protos.FlowStatus_STATUS_RUNNING,
FlowConfigUpdate: &protos.FlowConfigUpdate{
Update: &protos.FlowConfigUpdate_CdcFlowConfigUpdate{
CdcFlowConfigUpdate: &protos.CDCFlowConfigUpdate{
NumberOfSyncs: req.NumberOfSyncs,
},
},
},
})
if err != nil {
return &protos.CreateCustomFlowResponse{
Expand Down
9 changes: 2 additions & 7 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,7 @@ func (h *FlowRequestHandler) FlowStateChange(
h.temporalClient,
workflowID,
"",
model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
},
model.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
Expand All @@ -440,10 +438,7 @@ func (h *FlowRequestHandler) FlowStateChange(
h.temporalClient,
workflowID,
"",
model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
CustomNumberOfSyncs: int(req.CustomNumberOfSyncs),
},
model.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
Expand Down
8 changes: 2 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,9 +978,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {

if !s.t.Failed() {
addRows(1)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
})
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
addRows(1)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - finishing another sync
Expand All @@ -1004,9 +1002,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// add rows to both tables before resuming - should handle
addRows(18)

e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
})
e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
Expand Down
8 changes: 2 additions & 6 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQRepFlowWorkflow(tc, config)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
})
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
Expand All @@ -391,9 +389,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {
}
return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED
})
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
})
e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)
e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
if err != nil {
Expand Down
21 changes: 5 additions & 16 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f f

type CDCFlowSignal int64

type CDCFlowSignalProperties struct {
Signal CDCFlowSignal
CustomNumberOfSyncs int
}

const (
NoopSignal CDCFlowSignal = iota
_
Expand All @@ -114,31 +109,25 @@ const (

func FlowSignalHandler(activeSignal CDCFlowSignal,
v CDCFlowSignal, logger log.Logger,
) CDCFlowSignalProperties {
) CDCFlowSignal {
switch v {
case PauseSignal:
logger.Info("received pause signal")
if activeSignal == NoopSignal {
logger.Info("workflow was running, pausing it")
return CDCFlowSignalProperties{
Signal: v,
}
return v
}
case NoopSignal:
logger.Info("received resume signal")
if activeSignal == PauseSignal {
logger.Info("workflow was paused, resuming it")
return CDCFlowSignalProperties{
Signal: v,
}
return v
}
}
return CDCFlowSignalProperties{
Signal: activeSignal,
}
return activeSignal
}

var FlowSignal = TypedSignal[CDCFlowSignalProperties]{
var FlowSignal = TypedSignal[CDCFlowSignal]{
Name: "peer-flow-signal",
}

Expand Down
24 changes: 11 additions & 13 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
TableMappings: tableMappings,
NumberOfSyncs: 0,
},
}
}
Expand Down Expand Up @@ -171,13 +172,16 @@ func addCdcPropertiesSignalListener(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if cdcConfigUpdate.NumberOfSyncs > 0 {
state.SyncFlowOptions.NumberOfSyncs = cdcConfigUpdate.NumberOfSyncs
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

logger.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables),
slog.Int("NumberOfSyncs", int(state.SyncFlowOptions.NumberOfSyncs)))
})
}

Expand Down Expand Up @@ -224,16 +228,11 @@ func CDCFlowWorkflow(
if state.ActiveSignal == model.PauseSignal {
selector := workflow.NewNamedSelector(ctx, "PauseLoop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignalProperties, _ bool) {
cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger)
state.ActiveSignal = cdcFlowData.Signal
syncCountLimit = val.CustomNumberOfSyncs
if syncCountLimit <= 0 {
syncCountLimit = MaxSyncsPerCdcFlow
}
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
})
addCdcPropertiesSignalListener(ctx, logger, selector, state)

syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs)
startTime := workflow.Now(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

Expand Down Expand Up @@ -453,9 +452,8 @@ func CDCFlowWorkflow(
finished = true
})

flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignalProperties, _ bool) {
cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger)
state.ActiveSignal = cdcFlowData.Signal
flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
})

syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx)
Expand Down
10 changes: 5 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error {

func (q *QRepFlowExecution) waitForNewRows(
ctx workflow.Context,
signalChan model.TypedReceiveChannel[model.CDCFlowSignalProperties],
signalChan model.TypedReceiveChannel[model.CDCFlowSignal],
lastPartition *protos.QRepPartition,
) error {
ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
Expand All @@ -336,8 +336,8 @@ func (q *QRepFlowExecution) waitForNewRows(
var newRows bool
var waitErr error
waitSelector := workflow.NewNamedSelector(ctx, "WaitForRows")
signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignalProperties, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignal, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
})
waitSelector.AddFuture(future, func(f workflow.Future) {
newRows = true
Expand Down Expand Up @@ -541,7 +541,7 @@ func QRepFlowWorkflow(
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
} else if err := ctx.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func QRepFlowWorkflow(
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}

logger.Info("Continuing as new workflow",
Expand Down
4 changes: 2 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func XminFlowWorkflow(
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, logger).Signal
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, logger)
} else if err := ctx.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func XminFlowWorkflow(
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
}

logger.Info("Continuing as new workflow",
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ message SyncFlowOptions {
map<uint32, string> src_table_id_name_mapping = 4;
map<string, TableSchema> table_name_schema_mapping = 5;
repeated TableMapping table_mappings = 6;
int32 number_of_syncs = 7;
}

message StartNormalizeInput {
Expand Down Expand Up @@ -371,6 +372,7 @@ message CDCFlowConfigUpdate {
repeated TableMapping additional_tables = 1;
uint32 batch_size = 2;
uint64 idle_timeout = 3;
int32 number_of_syncs = 4;
}

message QRepFlowConfigUpdate {
Expand Down
1 change: 0 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ message FlowStateChangeRequest {
peerdb_peers.Peer destination_peer = 4;
// only can be sent in certain situations
optional peerdb_flow.FlowConfigUpdate flow_config_update = 5;
int32 customNumberOfSyncs = 6;
}

message FlowStateChangeResponse {
Expand Down

0 comments on commit 81335c5

Please sign in to comment.