diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index fac249b2cb..1e7f7a16c1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -415,11 +416,11 @@ func (h *FlowRequestHandler) FlowStateChange( } if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil { - err = h.temporalClient.SignalWorkflow( + err = model.CDCDynamicPropertiesSignal.SignalClientWorkflow( ctx, + h.temporalClient, workflowID, "", - shared.CDCDynamicPropertiesSignalName, req.FlowConfigUpdate.GetCdcFlowConfigUpdate(), ) if err != nil { @@ -435,21 +436,21 @@ func (h *FlowRequestHandler) FlowStateChange( if err != nil { return nil, err } - err = h.temporalClient.SignalWorkflow( + err = model.FlowSignal.SignalClientWorkflow( ctx, + h.temporalClient, workflowID, "", - shared.FlowSignalName, - shared.PauseSignal, + model.PauseSignal, ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && currState == protos.FlowStatus_STATUS_PAUSED { - err = h.temporalClient.SignalWorkflow( + err = model.FlowSignal.SignalClientWorkflow( ctx, + h.temporalClient, workflowID, "", - shared.FlowSignalName, - shared.NoopSignal, + model.NoopSignal, ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && (currState != protos.FlowStatus_STATUS_TERMINATED) { diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 7005e32bf9..b3c097136d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -18,6 +18,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -1192,7 +1193,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { // too short of a gap between signals also causes issues // might have something to do with how test workflows handle fast-forwarding time. env.RegisterDelayedCallback(func() { - env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal) + e2e.EnvSignalWorkflow(env, model.FlowSignal, model.PauseSignal) s.t.Log("Sent pause signal") sentPause = true }, 28*time.Second) @@ -1204,7 +1205,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { return false } isPaused = true - env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{ + e2e.EnvSignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{ IdleTimeout: 14, BatchSize: 12, AdditionalTables: []*protos.TableMapping{ @@ -1224,7 +1225,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { if !sentUpdate { return false } - env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal) + e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal) s.t.Log("Sent resume signal") return true }) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 95eba6ddbf..13e90c22a6 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -78,6 +78,10 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv }) } +func EnvSignalWorkflow[T any](env *testsuite.TestWorkflowEnvironment, signal model.TypedSignal[T], value T) { + env.SignalWorkflow(signal.Name, value) +} + // Helper function to assert errors in go routines running concurrent to workflows // This achieves two goals: // 1. cancel workflow to avoid waiting on goroutine which has failed diff --git a/flow/model/model.go b/flow/model/model.go index 7e0af0fa59..b180974d5c 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -191,7 +191,7 @@ type SyncResponse struct { RelationMessageMapping RelationMessageMapping } -type NormalizeSignal struct { +type NormalizePayload struct { Done bool SyncBatchID int64 TableNameSchemaMapping map[string]*protos.TableSchema diff --git a/flow/model/signals.go b/flow/model/signals.go new file mode 100644 index 0000000000..749caf7aa4 --- /dev/null +++ b/flow/model/signals.go @@ -0,0 +1,139 @@ +package model + +import ( + "context" + "time" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/generated/protos" +) + +// typed wrapper around temporal signals + +type TypedSignal[T any] struct { + Name string +} + +func (self TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T] { + return TypedReceiveChannel[T]{ + Chan: workflow.GetSignalChannel(ctx, self.Name), + } +} + +func (self TypedSignal[T]) SignalClientWorkflow( + ctx context.Context, + c client.Client, + workflowID string, + runID string, + value T, +) error { + return c.SignalWorkflow( + ctx, + workflowID, + runID, + self.Name, + value, + ) +} + +func (self TypedSignal[T]) SignalChildWorkflow( + ctx workflow.Context, + wf workflow.ChildWorkflowFuture, + value T, +) workflow.Future { + return wf.SignalChildWorkflow(ctx, self.Name, value) +} + +func (self TypedSignal[T]) SignalExternalWorkflow( + ctx workflow.Context, + workflowID string, + runID string, + value T, +) workflow.Future { + return workflow.SignalExternalWorkflow(ctx, workflowID, runID, self.Name, value) +} + +type TypedReceiveChannel[T any] struct { + Chan workflow.ReceiveChannel +} + +func (self TypedReceiveChannel[T]) Receive(ctx workflow.Context) (T, bool) { + var result T + more := self.Chan.Receive(ctx, &result) + return result, more +} + +func (self TypedReceiveChannel[T]) ReceiveWithTimeout(ctx workflow.Context, timeout time.Duration) (T, bool, bool) { + var result T + ok, more := self.Chan.ReceiveWithTimeout(ctx, timeout, &result) + return result, ok, more +} + +func (self TypedReceiveChannel[T]) ReceiveAsync() (T, bool) { + var result T + ok := self.Chan.ReceiveAsync(&result) + return result, ok +} + +func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool) { + var result T + ok, more := self.Chan.ReceiveAsyncWithMoreFlag(&result) + return result, ok, more +} + +func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector { + return selector.AddReceive(self.Chan, func(c workflow.ReceiveChannel, more bool) { + var result T + if !c.ReceiveAsync(&result) { + panic("AddReceive selector should not give empty channel") + } + f(result, more) + }) +} + +type CDCFlowSignal int64 + +const ( + NoopSignal CDCFlowSignal = iota + _ + PauseSignal +) + +var FlowSignal = TypedSignal[CDCFlowSignal]{ + Name: "peer-flow-signal", +} + +var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{ + Name: "cdc-dynamic-properties", +} + +var NormalizeSyncSignal = TypedSignal[NormalizePayload]{ + Name: "normalize-sync", +} + +var NormalizeSyncDoneSignal = TypedSignal[struct{}]{ + Name: "normalize-sync-done", +} + +func FlowSignalHandler(activeSignal CDCFlowSignal, + v CDCFlowSignal, logger log.Logger, +) CDCFlowSignal { + switch v { + case PauseSignal: + logger.Info("received pause signal") + if activeSignal == NoopSignal { + logger.Info("workflow was running, pausing it") + return v + } + case NoopSignal: + logger.Info("received resume signal") + if activeSignal == PauseSignal { + logger.Info("workflow was paused, resuming it") + return v + } + } + return activeSignal +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index df4ed83057..fe8320b446 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -11,12 +11,6 @@ const ( peerFlowTaskQueue = "peer-flow-task-queue" snapshotFlowTaskQueue = "snapshot-flow-task-queue" - // Signals - FlowSignalName = "peer-flow-signal" - CDCDynamicPropertiesSignalName = "cdc-dynamic-properties" - NormalizeSyncSignalName = "normalize-sync" - NormalizeSyncDoneSignalName = "normalize-sync-done" - // Queries CDCFlowStateQuery = "q-cdc-flow-state" QRepFlowStateQuery = "q-qrep-flow-state" @@ -29,15 +23,10 @@ const ( const MirrorNameSearchAttribute = "MirrorName" type ( - CDCFlowSignal int64 - ContextKey string + ContextKey string ) const ( - NoopSignal CDCFlowSignal = iota - _ - PauseSignal - FlowNameKey ContextKey = "flowName" PartitionIDKey ContextKey = "partitionId" DeploymentUIDKey ContextKey = "deploymentUid" diff --git a/flow/shared/signals.go b/flow/shared/signals.go deleted file mode 100644 index 5bf06a7bdd..0000000000 --- a/flow/shared/signals.go +++ /dev/null @@ -1,25 +0,0 @@ -package shared - -import ( - "go.temporal.io/sdk/log" -) - -func FlowSignalHandler(activeSignal CDCFlowSignal, - v CDCFlowSignal, logger log.Logger, -) CDCFlowSignal { - switch v { - case PauseSignal: - logger.Info("received pause signal") - if activeSignal == NoopSignal { - logger.Info("workflow was running, pausing it") - return v - } - case NoopSignal: - logger.Info("received resume signal") - if activeSignal == PauseSignal { - logger.Info("workflow was paused, resuming it") - return v - } - } - return activeSignal -} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 8158e40f35..6ab6c45c06 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -33,7 +33,7 @@ type CDCFlowWorkflowState struct { // Accumulates status for normalize flows spawned. NormalizeFlowStatuses []model.NormalizeResponse // Current signalled state of the peer flow. - ActiveSignal shared.CDCFlowSignal + ActiveSignal model.CDCFlowSignal // Errors encountered during child sync flow executions. SyncFlowErrors []string // Errors encountered during child sync flow executions. @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow // 1 more than the limit of 10 SyncFlowStatuses: make([]*model.SyncResponse, 0, 11), NormalizeFlowStatuses: nil, - ActiveSignal: shared.NoopSignal, + ActiveSignal: model.NoopSignal, SyncFlowErrors: nil, NormalizeFlowErrors: nil, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, @@ -423,13 +423,13 @@ func CDCFlowWorkflowWithConfig( cfg, ) - var normWaitChan workflow.ReceiveChannel + var normWaitChan model.TypedReceiveChannel[struct{}] if !peerdbenv.PeerDBEnableParallelSyncNormalize() { - normWaitChan = workflow.GetSignalChannel(ctx, shared.NormalizeSyncDoneSignalName) + normWaitChan = model.NormalizeSyncDoneSignal.GetSignalChannel(ctx) } finishNormalize := func() { - childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ + model.NormalizeSyncSignal.SignalChildWorkflow(ctx, childNormalizeFlowFuture, model.NormalizePayload{ Done: true, SyncBatchID: -1, }) @@ -448,21 +448,17 @@ func CDCFlowWorkflowWithConfig( } var canceled bool - flowSignalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) + flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) mainLoopSelector := workflow.NewNamedSelector(ctx, "Main Loop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - mainLoopSelector.AddReceive(flowSignalChan, func(c workflow.ReceiveChannel, _ bool) { - var signalVal shared.CDCFlowSignal - c.ReceiveAsync(&signalVal) - state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { + state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger) }) // add a signal to change CDC properties - cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) - mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) { - var cdcConfigUpdate *protos.CDCFlowConfigUpdate - c.Receive(ctx, &cdcConfigUpdate) + cdcPropertiesSignalChan := model.CDCDynamicPropertiesSignal.GetSignalChannel(ctx) + cdcPropertiesSignalChan.AddToSelector(mainLoopSelector, func(cdcConfigUpdate *protos.CDCFlowConfigUpdate, more bool) { // only modify for options since SyncFlow uses it if cdcConfigUpdate.BatchSize > 0 { state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize @@ -488,15 +484,15 @@ func CDCFlowWorkflowWithConfig( break } - if state.ActiveSignal == shared.PauseSignal { + if state.ActiveSignal == model.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - for state.ActiveSignal == shared.PauseSignal { + for state.ActiveSignal == model.PauseSignal { w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) - if state.ActiveSignal == shared.NoopSignal { + if state.ActiveSignal == model.NoopSignal { err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) if err != nil { return state, err @@ -532,7 +528,7 @@ func CDCFlowWorkflowWithConfig( var syncDone, syncErr bool var normalizeSignalError error - normDone := normWaitChan == nil + normDone := normWaitChan.Chan == nil mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { syncDone = true @@ -583,7 +579,7 @@ func CDCFlowWorkflowWithConfig( } } - signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ + signalFuture := model.NormalizeSyncSignal.SignalChildWorkflow(ctx, childNormalizeFlowFuture, model.NormalizePayload{ Done: false, SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping, @@ -610,7 +606,7 @@ func CDCFlowWorkflowWithConfig( return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state) } if !normDone { - normWaitChan.Receive(ctx, nil) + normWaitChan.Receive(ctx) } } diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 84005e2b47..7b19e63b9f 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -26,7 +26,7 @@ func NormalizeFlowWorkflow( results := make([]model.NormalizeResponse, 0, 4) errors := make([]string, 0) - syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName) + syncChan := model.NormalizeSyncSignal.GetSignalChannel(ctx) var stopLoop, canceled bool var lastSyncBatchID, syncBatchID int64 @@ -37,9 +37,7 @@ func NormalizeFlowWorkflow( selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - selector.AddReceive(syncChan, func(c workflow.ReceiveChannel, _ bool) { - var s model.NormalizeSignal - c.ReceiveAsync(&s) + syncChan.AddToSelector(selector, func(s model.NormalizePayload, _ bool) { if s.Done { stopLoop = true } @@ -82,11 +80,10 @@ func NormalizeFlowWorkflow( if !peerdbenv.PeerDBEnableParallelSyncNormalize() { parent := workflow.GetInfo(ctx).ParentWorkflowExecution - workflow.SignalExternalWorkflow( + model.NormalizeSyncDoneSignal.SignalExternalWorkflow( ctx, parent.ID, parent.RunID, - shared.NormalizeSyncDoneSignalName, struct{}{}, ) } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 4014efdc72..2266784e40 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -13,6 +13,7 @@ import ( "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" ) @@ -24,7 +25,7 @@ type QRepFlowExecution struct { // being tracked for future workflow signalling childPartitionWorkflows []workflow.ChildWorkflowFuture // Current signalled state of the peer flow. - activeSignal shared.CDCFlowSignal + activeSignal model.CDCFlowSignal } type QRepPartitionFlowExecution struct { @@ -68,7 +69,7 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), runUUID: runUUID, childPartitionWorkflows: nil, - activeSignal: shared.NoopSignal, + activeSignal: model.NoopSignal, } } @@ -380,11 +381,10 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } -func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan workflow.ReceiveChannel) { - var signalVal shared.CDCFlowSignal - ok := signalChan.ReceiveAsync(&signalVal) +func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan model.TypedReceiveChannel[model.CDCFlowSignal]) { + val, ok := signalChan.ReceiveAsync() if ok { - q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) } } @@ -516,19 +516,18 @@ func QRepFlowWorkflow( // here, we handle signals after the end of the flow because a new workflow does not inherit the signals // and the chance of missing a signal is much higher if the check is before the time consuming parts run - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) + signalChan := model.FlowSignal.GetSignalChannel(ctx) q.receiveAndHandleSignalAsync(signalChan) - if q.activeSignal == shared.PauseSignal { + if q.activeSignal == model.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - var signalVal shared.CDCFlowSignal - for q.activeSignal == shared.PauseSignal { + for q.activeSignal == model.PauseSignal { logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate - ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { - q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) } else if err := ctx.Err(); err != nil { return err } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 878805094d..574550ee2e 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -9,6 +9,7 @@ import ( "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" ) @@ -85,19 +86,18 @@ func XminFlowWorkflow( // here, we handle signals after the end of the flow because a new workflow does not inherit the signals // and the chance of missing a signal is much higher if the check is before the time consuming parts run - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) + signalChan := model.FlowSignal.GetSignalChannel(ctx) q.receiveAndHandleSignalAsync(signalChan) - if q.activeSignal == shared.PauseSignal { + if q.activeSignal == model.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - var signalVal shared.CDCFlowSignal - for q.activeSignal == shared.PauseSignal { + for q.activeSignal == model.PauseSignal { logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate - ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) + val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { - q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, logger) } else if err := ctx.Err(); err != nil { return err }