From 3f4bec591d3400502a29943dec1a824a89507469 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 13 May 2024 11:37:50 +0530 Subject: [PATCH] boilerplate --- flow/cmd/custom_sync.go | 102 ++++++++++++++++++++++++ flow/cmd/handler.go | 10 ++- flow/cmd/mirror_status.go | 6 +- flow/cmd/validate_mirror.go | 2 +- flow/connectors/core.go | 3 + flow/e2e/postgres/peer_flow_pg_test.go | 8 +- flow/e2e/postgres/qrep_flow_pg_test.go | 8 +- flow/model/signals.go | 20 +++-- flow/workflows/cdc_flow.go | 30 +++++-- flow/workflows/qrep_flow.go | 10 +-- flow/workflows/xmin_flow.go | 4 +- nexus/flow-rs/src/grpc.rs | 3 +- protos/route.proto | 23 +++++- ui/app/mirrors/[mirrorId]/edit/page.tsx | 1 + ui/app/mirrors/[mirrorId]/handlers.ts | 1 + 15 files changed, 200 insertions(+), 31 deletions(-) create mode 100644 flow/cmd/custom_sync.go diff --git a/flow/cmd/custom_sync.go b/flow/cmd/custom_sync.go new file mode 100644 index 0000000000..25c16fb910 --- /dev/null +++ b/flow/cmd/custom_sync.go @@ -0,0 +1,102 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/PeerDB-io/peer-flow/generated/protos" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +const peerdbPauseGuideDocLink = "https://docs.peerdb.io/features/pause-mirror" + +func (h *FlowRequestHandler) CustomSyncFlow( + ctx context.Context, req *protos.CreateCustomFlowRequest, +) (*protos.CreateCustomFlowResponse, error) { + // ---- REQUEST VALIDATION ---- + if req.FlowJobName == "" { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: "Flow job name is not provided", + Ok: false, + }, nil + } + + if req.NumberOfSyncs <= 0 || req.NumberOfSyncs > peerflow.MaxSyncsPerCdcFlow { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: fmt.Sprintf("Sync number request must be between 1 and %d (inclusive). Requested number: %d", + peerflow.MaxSyncsPerCdcFlow, req.NumberOfSyncs), + Ok: false, + }, nil + } + + mirrorExists, err := h.CheckIfMirrorNameExists(ctx, req.FlowJobName) + if err != nil { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: "Server error: unable to check if mirror " + req.FlowJobName + " exists.", + Ok: false, + }, nil + } + if !mirrorExists { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: req.FlowJobName + "does not exist. This may be because it was dropped.", + Ok: false, + }, nil + } + + mirrorStatusResponse, _ := h.MirrorStatus(ctx, &protos.MirrorStatusRequest{ + FlowJobName: req.FlowJobName, + }) + if mirrorStatusResponse.ErrorMessage != "" { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: fmt.Sprintf("Server error: unable to check the status of mirror %s: %s", + req.FlowJobName, mirrorStatusResponse.ErrorMessage), + Ok: false, + }, nil + } + + if mirrorStatusResponse.CurrentFlowState != protos.FlowStatus_STATUS_PAUSED { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: fmt.Sprintf(`Requested mirror %s is not paused. This is a requirement. + The mirror can be paused via PeerDB UI. Please follow %s`, + req.FlowJobName, peerdbPauseGuideDocLink), + Ok: false, + }, nil + } + // ---- REQUEST VALIDATED ---- + + // Resume mirror with custom sync number + _, err = h.FlowStateChange(ctx, &protos.FlowStateChangeRequest{ + FlowJobName: req.FlowJobName, + RequestedFlowState: protos.FlowStatus_STATUS_RUNNING, + FlowConfigUpdate: nil, + CustomNumberOfSyncs: req.NumberOfSyncs, + }) + if err != nil { + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: 0, + ErrorMessage: fmt.Sprintf("Unable to kick off sync for mirror %s:%s", + req.FlowJobName, err.Error()), + Ok: false, + }, nil + } + + return &protos.CreateCustomFlowResponse{ + FlowJobName: req.FlowJobName, + NumberOfSyncs: req.NumberOfSyncs, + ErrorMessage: "", + Ok: true, + }, nil +} diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 41dd04b4e7..c6d4445fe4 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -429,16 +429,22 @@ func (h *FlowRequestHandler) FlowStateChange( h.temporalClient, workflowID, "", - model.PauseSignal, + model.CDCFlowSignalProperties{ + Signal: model.PauseSignal, + }, ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && currState == protos.FlowStatus_STATUS_PAUSED { + slog.Info("Resume handler", slog.Int("customNumberOfSyncs", int(req.CustomNumberOfSyncs))) err = model.FlowSignal.SignalClientWorkflow( ctx, h.temporalClient, workflowID, "", - model.NoopSignal, + model.CDCFlowSignalProperties{ + Signal: model.NoopSignal, + CustomNumberOfSyncs: int(req.CustomNumberOfSyncs), + }, ) } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && (currState != protos.FlowStatus_STATUS_TERMINATED) { diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 06f64d34a9..9e72ce6466 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -30,13 +30,15 @@ func (h *FlowRequestHandler) MirrorStatus( workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) if err != nil { - return nil, err + return &protos.MirrorStatusResponse{ + ErrorMessage: "unable to get workflow ID " + err.Error(), + }, nil } currState, err := h.getWorkflowStatus(ctx, workflowID) if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: "unable to get flow state: " + err.Error(), + ErrorMessage: "unable to get workflow status " + err.Error(), }, nil } diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index b4304b0504..f67a4f1d40 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -127,7 +127,7 @@ func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirror var nameExists pgtype.Bool err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT * FROM flows WHERE name = $1)", mirrorName).Scan(&nameExists) if err != nil { - return true, fmt.Errorf("failed to check if mirror name exists: %v", err) + return false, fmt.Errorf("failed to check if mirror name exists: %v", err) } return nameExists.Bool, nil diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 0814feabd9..15b7c45bed 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -219,6 +219,9 @@ type RenameTablesConnector interface { } func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { + if config == nil { + return nil, errors.ErrUnsupported + } switch inner := config.Config.(type) { case *protos.Peer_PostgresConfig: return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index b3dd3a2dc5..2996142ec1 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -978,7 +978,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { if !s.t.Failed() { addRows(1) - e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) + e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{ + Signal: model.PauseSignal, + }) addRows(1) e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool { // keep adding 1 more row - finishing another sync @@ -1002,7 +1004,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { // add rows to both tables before resuming - should handle addRows(18) - e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal) + e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{ + Signal: model.NoopSignal, + }) e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool { return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 63a226ae13..c1ebf7ff99 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -374,7 +374,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { tc := e2e.NewTemporalClient(s.t) env := e2e.RunQRepFlowWorkflow(tc, config) - e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal) + e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{ + Signal: model.PauseSignal, + }) e2e.EnvWaitFor(s.t, env, 3*time.Minute, "pausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) @@ -389,7 +391,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { } return state.CurrentFlowStatus == protos.FlowStatus_STATUS_PAUSED }) - e2e.SignalWorkflow(env, model.FlowSignal, model.NoopSignal) + e2e.SignalWorkflow(env, model.FlowSignal, model.CDCFlowSignalProperties{ + Signal: model.NoopSignal, + }) e2e.EnvWaitFor(s.t, env, time.Minute, "unpausing", func() bool { response, err := env.Query(shared.QRepFlowStateQuery) if err != nil { diff --git a/flow/model/signals.go b/flow/model/signals.go index 53fcf7e1e5..e74285f01a 100644 --- a/flow/model/signals.go +++ b/flow/model/signals.go @@ -100,6 +100,10 @@ func (self TypedReceiveChannel[T]) AddToSelector(selector workflow.Selector, f f } type CDCFlowSignal int64 +type CDCFlowSignalProperties struct { + Signal CDCFlowSignal + CustomNumberOfSyncs int +} const ( NoopSignal CDCFlowSignal = iota @@ -109,25 +113,31 @@ const ( func FlowSignalHandler(activeSignal CDCFlowSignal, v CDCFlowSignal, logger log.Logger, -) CDCFlowSignal { +) CDCFlowSignalProperties { switch v { case PauseSignal: logger.Info("received pause signal") if activeSignal == NoopSignal { logger.Info("workflow was running, pausing it") - return v + return CDCFlowSignalProperties{ + Signal: v, + } } case NoopSignal: logger.Info("received resume signal") if activeSignal == PauseSignal { logger.Info("workflow was paused, resuming it") - return v + return CDCFlowSignalProperties{ + Signal: v, + } } } - return activeSignal + return CDCFlowSignalProperties{ + Signal: activeSignal, + } } -var FlowSignal = TypedSignal[CDCFlowSignal]{ +var FlowSignal = TypedSignal[CDCFlowSignalProperties]{ Name: "peer-flow-signal", } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 8144085fd8..73bc4f87d9 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -82,7 +82,7 @@ func GetChildWorkflowID( type CDCFlowWorkflowResult = CDCFlowWorkflowState const ( - maxSyncsPerCdcFlow = 32 + MaxSyncsPerCdcFlow = 32 ) func processCDCFlowConfigUpdate( @@ -196,7 +196,6 @@ func CDCFlowWorkflow( logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName)) flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) - err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil }) @@ -221,11 +220,20 @@ func CDCFlowWorkflow( shared.MirrorNameSearchAttribute: cfg.FlowJobName, } + var syncCountLimit int if state.ActiveSignal == model.PauseSignal { selector := workflow.NewNamedSelector(ctx, "PauseLoop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) - flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignal, _ bool) { - state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger) + flowSignalChan.AddToSelector(selector, func(val model.CDCFlowSignalProperties, _ bool) { + cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger) + slog.Info("value of signal", slog.Any("signal", cdcFlowData.Signal)) + slog.Info("cdc signal val", slog.Any("val", val)) + state.ActiveSignal = cdcFlowData.Signal + syncCountLimit = val.CustomNumberOfSyncs + if syncCountLimit <= 0 { + syncCountLimit = MaxSyncsPerCdcFlow + } + slog.Info("sync limit reception inside pause", slog.Int("limit", syncCountLimit)) }) addCdcPropertiesSignalListener(ctx, logger, selector, state) @@ -375,7 +383,6 @@ func CDCFlowWorkflow( var restart, finished bool syncCount := 0 - syncFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: syncFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -444,13 +451,20 @@ func CDCFlowWorkflow( } logger.Info("normalize finished, finishing") + if syncCount == int(syncCountLimit) { + logger.Info("sync count limit reached, pausing", + slog.Int("limit", syncCountLimit), + slog.Int("count", syncCount)) + state.ActiveSignal = model.PauseSignal + } normFlowFuture = nil restart = true finished = true }) - flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignal, _ bool) { - state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger) + flowSignalChan.AddToSelector(mainLoopSelector, func(val model.CDCFlowSignalProperties, _ bool) { + cdcFlowData := model.FlowSignalHandler(state.ActiveSignal, val.Signal, logger) + state.ActiveSignal = cdcFlowData.Signal }) syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx) @@ -492,7 +506,7 @@ func CDCFlowWorkflow( return state, err } - if state.ActiveSignal == model.PauseSignal || syncCount >= maxSyncsPerCdcFlow { + if state.ActiveSignal == model.PauseSignal || syncCount >= MaxSyncsPerCdcFlow { restart = true if syncFlowFuture != nil { err := model.SyncStopSignal.SignalChildWorkflow(ctx, syncFlowFuture, struct{}{}).Get(ctx, nil) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 8be2619866..5202d7bd04 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -322,7 +322,7 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { func (q *QRepFlowExecution) waitForNewRows( ctx workflow.Context, - signalChan model.TypedReceiveChannel[model.CDCFlowSignal], + signalChan model.TypedReceiveChannel[model.CDCFlowSignalProperties], lastPartition *protos.QRepPartition, ) error { ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ @@ -336,8 +336,8 @@ func (q *QRepFlowExecution) waitForNewRows( var newRows bool var waitErr error waitSelector := workflow.NewNamedSelector(ctx, "WaitForRows") - signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignal, _ bool) { - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + signalChan.AddToSelector(waitSelector, func(val model.CDCFlowSignalProperties, _ bool) { + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal }) waitSelector.AddFuture(future, func(f workflow.Future) { newRows = true @@ -541,7 +541,7 @@ func QRepFlowWorkflow( // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal } else if err := ctx.Err(); err != nil { return err } @@ -617,7 +617,7 @@ func QRepFlowWorkflow( if !ok { break } - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal } logger.Info("Continuing as new workflow", diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 4cd6deece7..f24d02c68b 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -42,7 +42,7 @@ func XminFlowWorkflow( // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, logger).Signal } else if err := ctx.Err(); err != nil { return err } @@ -109,7 +109,7 @@ func XminFlowWorkflow( if !ok { break } - q.activeSignal = model.FlowSignalHandler(q.activeSignal, val, q.logger) + q.activeSignal = model.FlowSignalHandler(q.activeSignal, val.Signal, q.logger).Signal } logger.Info("Continuing as new workflow", diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index a07115c1ec..ceb9fbb043 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -114,6 +114,7 @@ impl FlowGrpcClient { source_peer: Some(workflow_details.source_peer), destination_peer: Some(workflow_details.destination_peer), flow_config_update, + custom_number_of_syncs:0, }; let response = self.client.flow_state_change(state_change_req).await?; let state_change_response = response.into_inner(); @@ -175,7 +176,7 @@ impl FlowGrpcClient { initial_snapshot_only: job.initial_snapshot_only, script: job.script.clone(), system: system as i32, - idle_timeout_seconds: job.sync_interval.unwrap_or_default(), + idle_timeout_seconds: job.sync_interval.unwrap_or_default() }; self.start_peer_flow(flow_conn_cfg).await diff --git a/protos/route.proto b/protos/route.proto index 61a3b89b75..c4602b499a 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -25,6 +25,18 @@ message CreateQRepFlowResponse { string workflow_id = 1; } +message CreateCustomFlowRequest { + string flow_job_name = 1; + int32 number_of_syncs = 2; +} + +message CreateCustomFlowResponse { + string flow_job_name = 1; + int32 number_of_syncs = 2; + string error_message = 3; + bool ok = 4; +} + message ShutdownRequest { string workflow_id = 1; string flow_job_name = 2; @@ -216,6 +228,7 @@ message FlowStateChangeRequest { peerdb_peers.Peer destination_peer = 4; // only can be sent in certain situations optional peerdb_flow.FlowConfigUpdate flow_config_update = 5; + int32 customNumberOfSyncs = 6; } message FlowStateChangeResponse { @@ -237,7 +250,7 @@ service FlowService { body: "*" }; } - rpc ValidateCDCMirror(CreateCDCFlowRequest) returns (ValidateCDCMirrorResponse) { + rpc ValidateCDCMirror(CreateCDCFlowRequest) returns (ValidateCDCMirrorResponse) { option (google.api.http) = { post: "/v1/mirrors/cdc/validate", body: "*" @@ -255,6 +268,7 @@ service FlowService { body: "*" }; } + rpc CreateCDCFlow(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) { option (google.api.http) = { post: "/v1/flows/cdc/create", @@ -267,6 +281,12 @@ service FlowService { body: "*" }; } + rpc CustomSyncFlow(CreateCustomFlowRequest) returns (CreateCustomFlowResponse) { + option (google.api.http) = { + post: "/v1/flows/cdc/create/custom", + body: "*" + }; + } rpc GetSchemas(PostgresPeerActivityInfoRequest) returns (PeerSchemasResponse) { option (google.api.http) = { get: "/v1/peers/schemas" }; @@ -294,6 +314,7 @@ service FlowService { rpc GetStatInfo(PostgresPeerActivityInfoRequest) returns (PeerStatResponse) { option (google.api.http) = { get: "/v1/peers/stats/{peer_name}" }; } + rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) { option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" }; } diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index 9c738fb712..027e6dafa1 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -92,6 +92,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { flowConfigUpdate: { cdcFlowConfigUpdate: { ...config, additionalTables }, }, + customNumberOfSyncs:0 }; const res = await fetch(`/api/mirrors/state_change`, { method: 'POST', diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index bd6e0d3e2f..b486ea81f7 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -28,6 +28,7 @@ export const changeFlowState = async ( sourcePeer: mirrorConfig.source, destinationPeer: mirrorConfig.destination, requestedFlowState: flowState, + customNumberOfSyncs:0 }; await fetch(`/api/mirrors/state_change`, { method: 'POST',