From 3424dcaa1430ff4454eb4f7e2a22295b35dc73a0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 14 May 2024 16:47:08 +0530 Subject: [PATCH] fix the wiring --- flow/workflows/cdc_flow.go | 2 +- nexus/flow-rs/src/grpc.rs | 3 +-- ui/app/mirrors/[mirrorId]/edit/page.tsx | 3 ++- ui/app/mirrors/[mirrorId]/handlers.ts | 1 - 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index cf6719544a..31881ccd25 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -232,7 +232,6 @@ func CDCFlowWorkflow( state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger) }) addCdcPropertiesSignalListener(ctx, logger, selector, state) - syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs) startTime := workflow.Now(ctx) state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED @@ -251,6 +250,7 @@ func CDCFlowWorkflow( if err != nil { return state, err } + syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs) logger.Info("wiping flow state after state update processing") // finished processing, wipe it state.FlowConfigUpdate = nil diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index ceb9fbb043..a07115c1ec 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -114,7 +114,6 @@ impl FlowGrpcClient { source_peer: Some(workflow_details.source_peer), destination_peer: Some(workflow_details.destination_peer), flow_config_update, - custom_number_of_syncs:0, }; let response = self.client.flow_state_change(state_change_req).await?; let state_change_response = response.into_inner(); @@ -176,7 +175,7 @@ impl FlowGrpcClient { initial_snapshot_only: job.initial_snapshot_only, script: job.script.clone(), system: system as i32, - idle_timeout_seconds: job.sync_interval.unwrap_or_default() + idle_timeout_seconds: job.sync_interval.unwrap_or_default(), }; self.start_peer_flow(flow_conn_cfg).await diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx index c6aae6c010..3a2f1df7e1 100644 --- a/ui/app/mirrors/[mirrorId]/edit/page.tsx +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -38,6 +38,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { batchSize: defaultBatchSize, idleTimeout: defaultIdleTimeout, additionalTables: [], + numberOfSyncs: 0, }); const { push } = useRouter(); @@ -53,6 +54,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { (res as MirrorStatusResponse).cdcStatus?.config?.idleTimeoutSeconds || defaultIdleTimeout, additionalTables: [], + numberOfSyncs: 0, }); }); }, [mirrorId, defaultBatchSize, defaultIdleTimeout]); @@ -92,7 +94,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { flowConfigUpdate: { cdcFlowConfigUpdate: { ...config, additionalTables }, }, - customNumberOfSyncs: 0, }; const res = await fetch(`/api/mirrors/state_change`, { method: 'POST', diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index 4b271278f8..bd6e0d3e2f 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -28,7 +28,6 @@ export const changeFlowState = async ( sourcePeer: mirrorConfig.source, destinationPeer: mirrorConfig.destination, requestedFlowState: flowState, - customNumberOfSyncs: 0, }; await fetch(`/api/mirrors/state_change`, { method: 'POST',