Skip to content

Commit

Permalink
boilerplate
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 13, 2024
1 parent e551e73 commit 3f4bec5
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 31 deletions.
102 changes: 102 additions & 0 deletions flow/cmd/custom_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package cmd

import (
"context"
"fmt"

"github.com/PeerDB-io/peer-flow/generated/protos"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

const peerdbPauseGuideDocLink = "https://docs.peerdb.io/features/pause-mirror"

func (h *FlowRequestHandler) CustomSyncFlow(
ctx context.Context, req *protos.CreateCustomFlowRequest,
) (*protos.CreateCustomFlowResponse, error) {
// ---- REQUEST VALIDATION ----
if req.FlowJobName == "" {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: "Flow job name is not provided",
Ok: false,
}, nil
}

if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > peerflow.MaxSyncsPerCdcFlow {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d",
peerflow.MaxSyncsPerCdcFlow, req.NumberOfSyncs),
Ok: false,
}, nil
}

mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName)
if err != nil {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: "Server error: unable to check if mirror " + req.FlowJobName + " exists.",
Ok: false,
}, nil
}
if !mirrorExists {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: req.FlowJobName + "does not exist. This may be because it was dropped.",
Ok: false,
}, nil
}

mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{
FlowJobName: req.FlowJobName,
})
if mirrorStatusResponse.ErrorMessage != "" {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: fmt.Sprintf("Server error: unable to check the status of mirror %s: %s",
req.FlowJobName, mirrorStatusResponse.ErrorMessage),
Ok: false,
}, nil
}

if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement.
The mirror can be paused via PeerDB UI. Please follow %s`,
req.FlowJobName, peerdbPauseGuideDocLink),
Ok: false,
}, nil
}
// ---- REQUEST VALIDATED ----

// Resume mirror with custom sync number
_, err = h.FlowStateChange(ctx, &protos.FlowStateChangeRequest{
FlowJobName: req.FlowJobName,
RequestedFlowState: protos.FlowStatus_STATUS_RUNNING,
FlowConfigUpdate: nil,
CustomNumberOfSyncs: req.NumberOfSyncs,
})
if err != nil {
return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: 0,
ErrorMessage: fmt.Sprintf("Unable to kick off sync for mirror %s:%s",
req.FlowJobName, err.Error()),
Ok: false,
}, nil
}

return &protos.CreateCustomFlowResponse{
FlowJobName: req.FlowJobName,
NumberOfSyncs: req.NumberOfSyncs,
ErrorMessage: "",
Ok: true,
}, nil
}
10 changes: 8 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,16 +429,22 @@ func (h *FlowRequestHandler) FlowStateChange(
h.temporalClient,
workflowID,
"",
model.PauseSignal,
model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
},
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
slog.Info("Resume handler", slog.Int("customNumberOfSyncs", int(req.CustomNumberOfSyncs)))
err = model.FlowSignal.SignalClientWorkflow(
ctx,
h.temporalClient,
workflowID,
"",
model.NoopSignal,
model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
CustomNumberOfSyncs: int(req.CustomNumberOfSyncs),
},
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
Expand Down
6 changes: 4 additions & 2 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ func (h *FlowRequestHandler) MirrorStatus(

workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to get workflow ID " + err.Error(),
}, nil
}

currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: "unable to get flow state: " + err.Error(),
ErrorMessage: "unable to get workflow status " + err.Error(),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirror
var nameExists pgtype.Bool
err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT * FROM flows WHERE name = $1)", mirrorName).Scan(&nameExists)
if err != nil {
return true, fmt.Errorf("failed to check if mirror name exists: %v", err)
return false, fmt.Errorf("failed to check if mirror name exists: %v", err)
}

return nameExists.Bool, nil
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ type RenameTablesConnector interface {
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
if config == nil {
return nil, errors.ErrUnsupported
}
switch inner := config.Config.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig)
Expand Down
8 changes: 6 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {

if !s.t.Failed() {
addRows(1)
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
})
addRows(1)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
// keep adding 1 more row - finishing another sync
Expand All @@ -1002,7 +1004,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
// add rows to both tables before resuming - should handle
addRows(18)

e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
})

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
Expand Down
8 changes: 6 additions & 2 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQRepFlowWorkflow(tc, config)
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.PauseSignal,
})

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
Expand All @@ -389,7 +391,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() {
}
return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED
})
e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal)
e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{
Signal: model.NoopSignal,
})
e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool {
response, err := env.Query(shared.QRepFlowStateQuery)
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f f
}

type CDCFlowSignal int64
type CDCFlowSignalProperties struct {
Signal CDCFlowSignal
CustomNumberOfSyncs int
}

const (
NoopSignal CDCFlowSignal = iota
Expand All @@ -109,25 +113,31 @@ const (

func FlowSignalHandler(activeSignal CDCFlowSignal,
v CDCFlowSignal, logger log.Logger,
) CDCFlowSignal {
) CDCFlowSignalProperties {
switch v {
case PauseSignal:
logger.Info("received pause signal")
if activeSignal == NoopSignal {
logger.Info("workflow was running, pausing it")
return v
return CDCFlowSignalProperties{
Signal: v,
}
}
case NoopSignal:
logger.Info("received resume signal")
if activeSignal == PauseSignal {
logger.Info("workflow was paused, resuming it")
return v
return CDCFlowSignalProperties{
Signal: v,
}
}
}
return activeSignal
return CDCFlowSignalProperties{
Signal: activeSignal,
}
}

var FlowSignal = TypedSignal[CDCFlowSignal]{
var FlowSignal = TypedSignal[CDCFlowSignalProperties]{
Name: "peer-flow-signal",
}

Expand Down
30 changes: 22 additions & 8 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func GetChildWorkflowID(
type CDCFlowWorkflowResult = CDCFlowWorkflowState

const (
maxSyncsPerCdcFlow = 32
MaxSyncsPerCdcFlow = 32
)

func processCDCFlowConfigUpdate(
Expand Down Expand Up @@ -196,7 +196,6 @@ func CDCFlowWorkflow(

logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName))
flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)

err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
return *state, nil
})
Expand All @@ -221,11 +220,20 @@ func CDCFlowWorkflow(
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

var syncCountLimit int
if state.ActiveSignal == model.PauseSignal {
selector := workflow.NewNamedSelector(ctx, "PauseLoop")
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignalProperties, _ bool) {
cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger)
slog.Info("value of signal", slog.Any("signal", cdcFlowData.Signal))
slog.Info("cdc signal val", slog.Any("val", val))
state.ActiveSignal = cdcFlowData.Signal
syncCountLimit = val.CustomNumberOfSyncs
if syncCountLimit <= 0 {
syncCountLimit = MaxSyncsPerCdcFlow
}
slog.Info("sync limit reception inside pause", slog.Int("limit", syncCountLimit))
})
addCdcPropertiesSignalListener(ctx, logger, selector, state)

Expand Down Expand Up @@ -375,7 +383,6 @@ func CDCFlowWorkflow(

var restart, finished bool
syncCount := 0

syncFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: syncFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
Expand Down Expand Up @@ -444,13 +451,20 @@ func CDCFlowWorkflow(
}

logger.Info("normalize finished, finishing")
if syncCount == int(syncCountLimit) {
logger.Info("sync count limit reached, pausing",
slog.Int("limit", syncCountLimit),
slog.Int("count", syncCount))
state.ActiveSignal = model.PauseSignal
}
normFlowFuture = nil
restart = true
finished = true
})

flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) {
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignalProperties, _ bool) {
cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger)
state.ActiveSignal = cdcFlowData.Signal
})

syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx)
Expand Down Expand Up @@ -492,7 +506,7 @@ func CDCFlowWorkflow(
return state, err
}

if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow {
if state.ActiveSignal == model.PauseSignal || syncCount >= MaxSyncsPerCdcFlow {
restart = true
if syncFlowFuture != nil {
err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil)
Expand Down
10 changes: 5 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error {

func (q *QRepFlowExecution) waitForNewRows(
ctx workflow.Context,
signalChan model.TypedReceiveChannel[model.CDCFlowSignal],
signalChan model.TypedReceiveChannel[model.CDCFlowSignalProperties],
lastPartition *protos.QRepPartition,
) error {
ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
Expand All @@ -336,8 +336,8 @@ func (q *QRepFlowExecution) waitForNewRows(
var newRows bool
var waitErr error
waitSelector := workflow.NewNamedSelector(ctx, "WaitForRows")
signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignal, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignalProperties, _ bool) {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
})
waitSelector.AddFuture(future, func(f workflow.Future) {
newRows = true
Expand Down Expand Up @@ -541,7 +541,7 @@ func QRepFlowWorkflow(
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
} else if err := ctx.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func QRepFlowWorkflow(
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
}

logger.Info("Continuing as new workflow",
Expand Down
4 changes: 2 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func XminFlowWorkflow(
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, logger)
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, logger).Signal
} else if err := ctx.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func XminFlowWorkflow(
if !ok {
break
}
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger)
q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal
}

logger.Info("Continuing as new workflow",
Expand Down
Loading

0 comments on commit 3f4bec5

Please sign in to comment.