Skip to content

Commit

Permalink
Introduce typed signal wrappers for temporal signals (#1360)
Browse files Browse the repository at this point in the history
Splitting up cdc_flow/sync_flow will be introducing more signals,
& I was having a hard enough time keeping these 4 straight
  • Loading branch information
serprex authored Feb 22, 2024
1 parent 7ba5085 commit e09ac80
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 93 deletions.
17 changes: 9 additions & 8 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)
Expand Down Expand Up @@ -415,11 +416,11 @@ func (h *FlowRequestHandler) FlowStateChange(
}

if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil {
err = h.temporalClient.SignalWorkflow(
err = model.CDCDynamicPropertiesSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
workflowID,
"",
shared.CDCDynamicPropertiesSignalName,
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
)
if err != nil {
Expand All @@ -435,21 +436,21 @@ func (h *FlowRequestHandler) FlowStateChange(
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
err = model.FlowSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
model.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
err = model.FlowSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
model.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
Expand Down
7 changes: 4 additions & 3 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -1192,7 +1193,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// too short of a gap between signals also causes issues
// might have something to do with how test workflows handle fast-forwarding time.
env.RegisterDelayedCallback(func() {
env.SignalWorkflow(shared.FlowSignalName, shared.PauseSignal)
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.PauseSignal)
s.t.Log("Sent pause signal")
sentPause = true
}, 28*time.Second)
Expand All @@ -1204,7 +1205,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return false
}
isPaused = true
env.SignalWorkflow(shared.CDCDynamicPropertiesSignalName, &protos.CDCFlowConfigUpdate{
e2e.EnvSignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
IdleTimeout: 14,
BatchSize: 12,
AdditionalTables: []*protos.TableMapping{
Expand All @@ -1224,7 +1225,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
if !sentUpdate {
return false
}
env.SignalWorkflow(shared.FlowSignalName, shared.NoopSignal)
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
s.t.Log("Sent resume signal")
return true
})
Expand Down
4 changes: 4 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv
})
}

func EnvSignalWorkflow[T any](env *testsuite.TestWorkflowEnvironment, signal model.TypedSignal[T], value T) {
env.SignalWorkflow(signal.Name, value)
}

// Helper function to assert errors in go routines running concurrent to workflows
// This achieves two goals:
// 1. cancel workflow to avoid waiting on goroutine which has failed
Expand Down
2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type SyncResponse struct {
RelationMessageMapping RelationMessageMapping
}

type NormalizeSignal struct {
type NormalizePayload struct {
Done bool
SyncBatchID int64
TableNameSchemaMapping map[string]*protos.TableSchema
Expand Down
139 changes: 139 additions & 0 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package model

import (
"context"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

// typed wrapper around temporal signals

type TypedSignal[T any] struct {
Name string
}

func (self TypedSignal[T]) GetSignalChannel(ctx workflow.Context) TypedReceiveChannel[T] {
return TypedReceiveChannel[T]{
Chan: workflow.GetSignalChannel(ctx, self.Name),
}
}

func (self TypedSignal[T]) SignalClientWorkflow(
ctx context.Context,
c client.Client,
workflowID string,
runID string,
value T,
) error {
return c.SignalWorkflow(
ctx,
workflowID,
runID,
self.Name,
value,
)
}

func (self TypedSignal[T]) SignalChildWorkflow(
ctx workflow.Context,
wf workflow.ChildWorkflowFuture,
value T,
) workflow.Future {
return wf.SignalChildWorkflow(ctx, self.Name, value)
}

func (self TypedSignal[T]) SignalExternalWorkflow(
ctx workflow.Context,
workflowID string,
runID string,
value T,
) workflow.Future {
return workflow.SignalExternalWorkflow(ctx, workflowID, runID, self.Name, value)
}

type TypedReceiveChannel[T any] struct {
Chan workflow.ReceiveChannel
}

func (self TypedReceiveChannel[T]) Receive(ctx workflow.Context) (T, bool) {
var result T
more := self.Chan.Receive(ctx, &result)
return result, more
}

func (self TypedReceiveChannel[T]) ReceiveWithTimeout(ctx workflow.Context, timeout time.Duration) (T, bool, bool) {
var result T
ok, more := self.Chan.ReceiveWithTimeout(ctx, timeout, &result)
return result, ok, more
}

func (self TypedReceiveChannel[T]) ReceiveAsync() (T, bool) {
var result T
ok := self.Chan.ReceiveAsync(&result)
return result, ok
}

func (self TypedReceiveChannel[T]) ReceiveAsyncWithMoreFlag() (T, bool, bool) {
var result T
ok, more := self.Chan.ReceiveAsyncWithMoreFlag(&result)
return result, ok, more
}

func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f func(T, bool)) workflow.Selector {
return selector.AddReceive(self.Chan, func(c workflow.ReceiveChannel, more bool) {
var result T
if !c.ReceiveAsync(&result) {
panic("AddReceive selector should not give empty channel")
}
f(result, more)
})
}

type CDCFlowSignal int64

const (
NoopSignal CDCFlowSignal = iota
_
PauseSignal
)

var FlowSignal = TypedSignal[CDCFlowSignal]{
Name: "peer-flow-signal",
}

var CDCDynamicPropertiesSignal = TypedSignal[*protos.CDCFlowConfigUpdate]{
Name: "cdc-dynamic-properties",
}

var NormalizeSyncSignal = TypedSignal[NormalizePayload]{
Name: "normalize-sync",
}

var NormalizeSyncDoneSignal = TypedSignal[struct{}]{
Name: "normalize-sync-done",
}

func FlowSignalHandler(activeSignal CDCFlowSignal,
v CDCFlowSignal, logger log.Logger,
) CDCFlowSignal {
switch v {
case PauseSignal:
logger.Info("received pause signal")
if activeSignal == NoopSignal {
logger.Info("workflow was running, pausing it")
return v
}
case NoopSignal:
logger.Info("received resume signal")
if activeSignal == PauseSignal {
logger.Info("workflow was paused, resuming it")
return v
}
}
return activeSignal
}
13 changes: 1 addition & 12 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ const (
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"

// Signals
FlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"
NormalizeSyncSignalName = "normalize-sync"
NormalizeSyncDoneSignalName = "normalize-sync-done"

// Queries
CDCFlowStateQuery = "q-cdc-flow-state"
QRepFlowStateQuery = "q-qrep-flow-state"
Expand All @@ -29,15 +23,10 @@ const (
const MirrorNameSearchAttribute = "MirrorName"

type (
CDCFlowSignal int64
ContextKey string
ContextKey string
)

const (
NoopSignal CDCFlowSignal = iota
_
PauseSignal

FlowNameKey ContextKey = "flowName"
PartitionIDKey ContextKey = "partitionId"
DeploymentUIDKey ContextKey = "deploymentUid"
Expand Down
25 changes: 0 additions & 25 deletions flow/shared/signals.go

This file was deleted.

Loading

0 comments on commit e09ac80

Please sign in to comment.