Skip to content

Commit

Permalink
fix the wiring
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 14, 2024
1 parent 3742039 commit 28edd92
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func CDCFlowWorkflow(
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, logger)
})
addCdcPropertiesSignalListener(ctx, logger, selector, state)
syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs)
startTime := workflow.Now(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

Expand All @@ -251,6 +250,7 @@ func CDCFlowWorkflow(
if err != nil {
return state, err
}
syncCountLimit = int(state.SyncFlowOptions.NumberOfSyncs)
logger.Info("wiping flow state after state update processing")
// finished processing, wipe it
state.FlowConfigUpdate = nil
Expand Down
3 changes: 1 addition & 2 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl FlowGrpcClient {
source_peer: Some(workflow_details.source_peer),
destination_peer: Some(workflow_details.destination_peer),
flow_config_update,
custom_number_of_syncs:0,
};
let response = self.client.flow_state_change(state_change_req).await?;
let state_change_response = response.into_inner();
Expand Down Expand Up @@ -176,7 +175,7 @@ impl FlowGrpcClient {
initial_snapshot_only: job.initial_snapshot_only,
script: job.script.clone(),
system: system as i32,
idle_timeout_seconds: job.sync_interval.unwrap_or_default()
idle_timeout_seconds: job.sync_interval.unwrap_or_default(),
};

self.start_peer_flow(flow_conn_cfg).await
Expand Down
3 changes: 2 additions & 1 deletion ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
batchSize: defaultBatchSize,
idleTimeout: defaultIdleTimeout,
additionalTables: [],
numberOfSyncs: 0,
});
const { push } = useRouter();

Expand All @@ -53,6 +54,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
(res as MirrorStatusResponse).cdcStatus?.config?.idleTimeoutSeconds ||
defaultIdleTimeout,
additionalTables: [],
numberOfSyncs: 0,
});
});
}, [mirrorId, defaultBatchSize, defaultIdleTimeout]);
Expand Down Expand Up @@ -92,7 +94,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
flowConfigUpdate: {
cdcFlowConfigUpdate: { ...config, additionalTables },
},
customNumberOfSyncs: 0,
};
const res = await fetch(`/api/mirrors/state_change`, {
method: 'POST',
Expand Down
1 change: 0 additions & 1 deletion ui/app/mirrors/[mirrorId]/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const changeFlowState = async (
sourcePeer: mirrorConfig.source,
destinationPeer: mirrorConfig.destination,
requestedFlowState: flowState,
customNumberOfSyncs: 0,
};
await fetch(`/api/mirrors/state_change`, {
method: 'POST',
Expand Down

0 comments on commit 28edd92

Please sign in to comment.