From 2b844402f97057f9900d5f350ff835c2622c7195 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 2 Feb 2024 00:38:58 +0530 Subject: [PATCH] UI: Edit Page and Refactor Edit Mirror Route (#1156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit You get above by clicking on this button: ![Screenshot 2024-01-25 at 9 01 52 PM](https://github.com/PeerDB-io/peerdb/assets/65964360/865cdd5b-f2b4-41a1-ac10-b7c5efe6fbe8) IdleTimeoutSeconds and SyncBatchSize can be changed anytime, tables can only be added IF mirror is paused, otherwise the button will be disabled. Additional changes for this: moved CDC dynamic properties to the main signal selector and blocking on the main signal selector while paused, so we can receive mirror edits even while paused. pass TableNameSchemaMapping from state always to NormalizeFlow in case new tables were added. added a new property to the tables component to disable selecting of tables already in the mirror. There is a bug where tables added to the mirror dynamically will not reflect in the status screen, will be fixed in a follow up PR. TableMappings Needs to be stored in state as well. --------- Co-authored-by: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Co-authored-by: Kevin Biju --- flow/cmd/handler.go | 82 +++++--- flow/cmd/mirror_status.go | 40 +++- flow/shared/constants.go | 2 +- flow/workflows/cdc_flow.go | 93 ++++----- protos/route.proto | 1 + ui/app/api/mirrors/state/route.ts | 23 ++ .../[mirrorId]/aggregatedCountsByInterval.ts | 0 ui/app/mirrors/{edit => }/[mirrorId]/cdc.tsx | 0 .../{edit => }/[mirrorId]/cdcDetails.tsx | 0 .../{edit => }/[mirrorId]/cdcGraph.tsx | 0 .../{edit => }/[mirrorId]/configValues.ts | 0 ui/app/mirrors/[mirrorId]/edit/page.tsx | 197 ++++++++++++++++++ .../{edit => }/[mirrorId]/nomirror.tsx | 0 ui/app/mirrors/{edit => }/[mirrorId]/page.tsx | 13 +- .../{edit => }/[mirrorId]/syncStatus.tsx | 0 .../{edit => }/[mirrorId]/syncStatusTable.tsx | 0 .../{edit => }/[mirrorId]/tablePairs.tsx | 0 ui/app/mirrors/create/cdc/cdc.tsx | 1 + ui/app/mirrors/create/cdc/schemabox.tsx | 7 + ui/app/mirrors/create/cdc/tablemapping.tsx | 4 + ui/app/mirrors/create/handlers.ts | 4 +- .../status/qrep/[mirrorId]/qrepGraph.tsx | 2 +- ui/app/peers/[peerName]/helpers.tsx | 2 +- ui/app/peers/[peerName]/lagGraph.tsx | 2 +- ui/components/EditButton.tsx | 28 +++ ui/components/MirrorLink.tsx | 2 +- 26 files changed, 406 insertions(+), 97 deletions(-) create mode 100644 ui/app/api/mirrors/state/route.ts rename ui/app/mirrors/{edit => }/[mirrorId]/aggregatedCountsByInterval.ts (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/cdc.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/cdcDetails.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/cdcGraph.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/configValues.ts (100%) create mode 100644 ui/app/mirrors/[mirrorId]/edit/page.tsx rename ui/app/mirrors/{edit => }/[mirrorId]/nomirror.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/page.tsx (90%) rename ui/app/mirrors/{edit => }/[mirrorId]/syncStatus.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/syncStatusTable.tsx (100%) rename ui/app/mirrors/{edit => }/[mirrorId]/tablePairs.tsx (100%) create mode 100644 ui/components/EditButton.tsx diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 1d96e5a84f..cbc9ed2fb3 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -425,47 +425,63 @@ func (h *FlowRequestHandler) FlowStateChange( return nil, err } - if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && - currState == protos.FlowStatus_STATUS_RUNNING { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) - if err != nil { - return nil, err - } - err = h.temporalClient.SignalWorkflow( - ctx, - workflowID, - "", - shared.FlowSignalName, - shared.PauseSignal, - ) - } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && - currState == protos.FlowStatus_STATUS_PAUSED { + if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil { err = h.temporalClient.SignalWorkflow( ctx, workflowID, "", - shared.FlowSignalName, - shared.NoopSignal, + shared.CDCDynamicPropertiesSignalName, + req.FlowConfigUpdate.GetCdcFlowConfigUpdate(), ) - } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && - (currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to signal workflow: %w", err) } - _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ - WorkflowId: workflowID, - FlowJobName: req.FlowJobName, - SourcePeer: req.SourcePeer, - DestinationPeer: req.DestinationPeer, - RemoveFlowEntry: false, - }) - } else { - return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", - req.RequestedFlowState, currState) } - if err != nil { - return nil, fmt.Errorf("unable to signal workflow: %w", err) + + // in case we only want to update properties without changing status + if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN { + if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && + currState == protos.FlowStatus_STATUS_RUNNING { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) + if err != nil { + return nil, err + } + err = h.temporalClient.SignalWorkflow( + ctx, + workflowID, + "", + shared.FlowSignalName, + shared.PauseSignal, + ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && + currState == protos.FlowStatus_STATUS_PAUSED { + err = h.temporalClient.SignalWorkflow( + ctx, + workflowID, + "", + shared.FlowSignalName, + shared.NoopSignal, + ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && + (currState != protos.FlowStatus_STATUS_TERMINATED) { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) + if err != nil { + return nil, err + } + _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ + WorkflowId: workflowID, + FlowJobName: req.FlowJobName, + SourcePeer: req.SourcePeer, + DestinationPeer: req.DestinationPeer, + RemoveFlowEntry: false, + }) + } else if req.RequestedFlowState != currState { + return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", + req.RequestedFlowState, currState) + } + if err != nil { + return nil, fmt.Errorf("unable to signal workflow: %w", err) + } } return &protos.FlowStateChangeResponse{ diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index ff14f04f35..ceca719c69 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) func (h *FlowRequestHandler) MirrorStatus( @@ -81,6 +82,18 @@ func (h *FlowRequestHandler) CDCFlowStatus( if err != nil { return nil, err } + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + state, err := h.getCDCWorkflowState(ctx, workflowID) + if err != nil { + return nil, err + } + + // patching config to show latest values from state + config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds + config.MaxBatchSize = state.SyncFlowOptions.BatchSize var initialCopyStatus *protos.SnapshotStatus @@ -339,16 +352,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) { res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) if err != nil { - slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) } var state protos.FlowStatus err = res.Get(&state) if err != nil { - slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) } return state, nil } @@ -365,3 +378,22 @@ func (h *FlowRequestHandler) updateWorkflowStatus( } return nil } + +func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context, + workflowID string, +) (*peerflow.CDCFlowWorkflowState, error) { + res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery) + if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + return nil, + fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + var state peerflow.CDCFlowWorkflowState + err = res.Get(&state) + if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + return nil, + fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + return &state, nil +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 68edd3c6d3..69b4dfda20 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -18,7 +18,7 @@ const ( NormalizeSyncDoneSignalName = "normalize-sync-done" // Queries - CDCFlowStateQuery = "q-cdc-flow-status" + CDCFlowStateQuery = "q-cdc-flow-state" QRepFlowStateQuery = "q-qrep-flow-state" FlowStatusQuery = "q-flow-status" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d0f825dfaf..60a085f33b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct { TableNameSchemaMapping map[string]*protos.TableSchema // flow config update request, set to nil after processed FlowConfigUpdates []*protos.CDCFlowConfigUpdate + // options passed to all SyncFlows + SyncFlowOptions *protos.SyncFlowOptions + // options passed to all NormalizeFlows + NormalizeFlowOptions *protos.NormalizeFlowOptions } // returns a new empty PeerFlowState @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState { SrcTableIdNameMapping: nil, TableNameSchemaMapping: nil, FlowConfigUpdates: nil, + SyncFlowOptions: nil, + NormalizeFlowOptions: nil, } } @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig( } } - syncFlowOptions := &protos.SyncFlowOptions{ - BatchSize: limits.MaxBatchSize, - IdleTimeoutSeconds: 0, + state.SyncFlowOptions = &protos.SyncFlowOptions{ + BatchSize: limits.MaxBatchSize, + // this means the env variable assignment path is never hit + IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, SrcTableIdNameMapping: state.SrcTableIdNameMapping, TableNameSchemaMapping: state.TableNameSchemaMapping, } - normalizeFlowOptions := &protos.NormalizeFlowOptions{ + state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ TableNameSchemaMapping: state.TableNameSchemaMapping, } - // add a signal to change CDC properties - cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) - cdcPropertiesSelector := workflow.NewSelector(ctx) - cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) { - var cdcConfigUpdate *protos.CDCFlowConfigUpdate - c.Receive(ctx, &cdcConfigUpdate) - // only modify for options since SyncFlow uses it - if cdcConfigUpdate.BatchSize > 0 { - syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize - } - if cdcConfigUpdate.IdleTimeout > 0 { - syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout - } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } - - slog.Info("CDC Signal received. Parameters on signal reception:", - slog.Int("BatchSize", int(syncFlowOptions.BatchSize)), - slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) - }) - - cdcPropertiesSelector.AddDefault(func() { - w.logger.Info("no batch size signal received, batch size remains: ", - syncFlowOptions.BatchSize) - }) - currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig( normCtx, NormalizeFlowWorkflow, cfg, - normalizeFlowOptions, + state.NormalizeFlowOptions, ) var normWaitChan workflow.ReceiveChannel @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig( c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) }) + // add a signal to change CDC properties + cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) + mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) { + var cdcConfigUpdate *protos.CDCFlowConfigUpdate + c.Receive(ctx, &cdcConfigUpdate) + // only modify for options since SyncFlow uses it + if cdcConfigUpdate.BatchSize > 0 { + state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout + } + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } + + slog.Info("CDC Signal received. Parameters on signal reception:", + slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), + slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + }) for { for !canceled && mainLoopSelector.HasPending() { @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) - var signalVal shared.CDCFlowSignal for state.ActiveSignal == shared.PauseSignal { w.logger.Info("mirror has been paused for ", time.Since(startTime)) // only place we block on receive, so signal processing is immediate - ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) - if ok { - state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) - // only process config updates when going from STATUS_PAUSED to STATUS_RUNNING - if state.ActiveSignal == shared.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) - if err != nil { - return state, err - } + mainLoopSelector.Select(ctx) + if state.ActiveSignal == shared.NoopSignal { + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) + if err != nil { + return state, err } - } else if err := ctx.Err(); err != nil { - return nil, err } } w.logger.Info("mirror has been resumed after ", time.Since(startTime)) } - cdcPropertiesSelector.Select(ctx) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig( WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) - syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping + state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( syncCtx, SyncFlowWorkflow, cfg, - syncFlowOptions, + state.SyncFlowOptions, ) var syncDone bool @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig( if childSyncFlowRes != nil { tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - var normalizeTableNameSchemaMapping map[string]*protos.TableSchema // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. if tableSchemaDeltasCount != 0 { modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig( dstTable := modifiedDstTables[i] state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] } - normalizeTableNameSchemaMapping = state.TableNameSchemaMapping } } signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ Done: false, SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: normalizeTableNameSchemaMapping, + TableNameSchemaMapping: state.TableNameSchemaMapping, }) normalizeSignalError = signalFuture.Get(ctx, nil) } else { diff --git a/protos/route.proto b/protos/route.proto index 48db51e019..f0d7dd0511 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -96,6 +96,7 @@ message QRepMirrorStatus { // or if we are in the continuous streaming mode. } +// to be removed eventually message CDCSyncStatus { int64 start_lsn = 1; int64 end_lsn = 2; diff --git a/ui/app/api/mirrors/state/route.ts b/ui/app/api/mirrors/state/route.ts new file mode 100644 index 0000000000..cbd0a9969d --- /dev/null +++ b/ui/app/api/mirrors/state/route.ts @@ -0,0 +1,23 @@ +import { + MirrorStatusRequest, + MirrorStatusResponse, +} from '@/grpc_generated/route'; +import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; + +export async function POST(request: Request) { + const body: MirrorStatusRequest = await request.json(); + const flowServiceAddr = GetFlowHttpAddressFromEnv(); + console.log('/mirrors/state: req:', body); + try { + const res: MirrorStatusResponse = await fetch( + `${flowServiceAddr}/v1/mirrors/${body.flowJobName}`, + { cache: 'no-store' } + ).then((res) => { + return res.json(); + }); + + return new Response(JSON.stringify(res)); + } catch (e) { + console.error(e); + } +} diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts rename to ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdc.tsx rename to ui/app/mirrors/[mirrorId]/cdc.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/[mirrorId]/cdcDetails.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx rename to ui/app/mirrors/[mirrorId]/cdcDetails.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx rename to ui/app/mirrors/[mirrorId]/cdcGraph.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/configValues.ts b/ui/app/mirrors/[mirrorId]/configValues.ts similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/configValues.ts rename to ui/app/mirrors/[mirrorId]/configValues.ts diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx new file mode 100644 index 0000000000..b86e312255 --- /dev/null +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -0,0 +1,197 @@ +'use client'; + +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { CDCFlowConfigUpdate, FlowStatus } from '@/grpc_generated/flow'; +import { + FlowStateChangeRequest, + MirrorStatusResponse, +} from '@/grpc_generated/route'; +import { Button } from '@/lib/Button'; +import { Label } from '@/lib/Label'; +import { RowWithTextField } from '@/lib/Layout'; +import { TextField } from '@/lib/TextField'; +import { ProgressCircle } from '@tremor/react'; +import { useRouter } from 'next/navigation'; +import { useCallback, useEffect, useMemo, useState } from 'react'; +import TableMapping from '../../create/cdc/tablemapping'; +import { reformattedTableMapping } from '../../create/handlers'; +import { blankCDCSetting } from '../../create/helpers/common'; + +type EditMirrorProps = { + params: { mirrorId: string }; +}; + +const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { + const defaultBatchSize = blankCDCSetting.maxBatchSize; + const defaultIdleTimeout = blankCDCSetting.idleTimeoutSeconds; + + const [rows, setRows] = useState([]); + const [mirrorState, setMirrorState] = useState(); + const [config, setConfig] = useState({ + batchSize: defaultBatchSize, + idleTimeout: defaultIdleTimeout, + additionalTables: [], + }); + const { push } = useRouter(); + + const fetchStateAndUpdateDeps = useCallback(async () => { + await fetch('/api/mirrors/state', { + method: 'POST', + body: JSON.stringify({ + flowJobName: mirrorId, + }), + }) + .then((res) => res.json()) + .then((res) => { + setMirrorState(res); + + setConfig({ + batchSize: + (res as MirrorStatusResponse).cdcStatus?.config?.maxBatchSize || + defaultBatchSize, + idleTimeout: + (res as MirrorStatusResponse).cdcStatus?.config + ?.idleTimeoutSeconds || defaultIdleTimeout, + additionalTables: [], + }); + }); + }, [mirrorId, defaultBatchSize, defaultIdleTimeout]); + + useEffect(() => { + fetchStateAndUpdateDeps(); + }, [fetchStateAndUpdateDeps]); + + const omitAdditionalTablesMapping: Map = useMemo(() => { + const omitAdditionalTablesMapping: Map = new Map(); + mirrorState?.cdcStatus?.config?.tableMappings.forEach((value) => { + const sourceSchema = value.sourceTableIdentifier.split('.').at(0)!; + const mapVal: string[] = + omitAdditionalTablesMapping.get(sourceSchema) || []; + // needs to be schema qualified + mapVal.push(value.sourceTableIdentifier); + omitAdditionalTablesMapping.set(sourceSchema, mapVal); + }); + return omitAdditionalTablesMapping; + }, [mirrorState]); + + const additionalTables = useMemo(() => { + return reformattedTableMapping(rows); + }, [rows]); + + if (!mirrorState) { + return ; + } + + const sendFlowStateChangeRequest = async () => { + const req: FlowStateChangeRequest = { + flowJobName: mirrorId, + sourcePeer: mirrorState.cdcStatus?.config?.source, + destinationPeer: mirrorState.cdcStatus?.config?.destination, + requestedFlowState: FlowStatus.STATUS_UNKNOWN, + flowConfigUpdate: { + cdcFlowConfigUpdate: { ...config, additionalTables }, + }, + }; + await fetch(`/api/mirrors/state_change`, { + method: 'POST', + body: JSON.stringify(req), + cache: 'no-store', + }); + push(`/mirrors/${mirrorId}`); + }; + + return ( +
+ + + {'Pull Batch Size'} } + action={ +
+ ) => + setConfig({ + ...config, + batchSize: e.target.valueAsNumber, + }) + } + defaultValue={config.batchSize} + /> +
+ } + /> + + {'Sync Interval (Seconds)'} } + action={ +
+ ) => + setConfig({ + ...config, + idleTimeout: e.target.valueAsNumber, + }) + } + defaultValue={config.idleTimeout} + /> +
+ } + /> + + +
+ + +
+
+ ); +}; + +export default EditMirror; diff --git a/ui/app/mirrors/edit/[mirrorId]/nomirror.tsx b/ui/app/mirrors/[mirrorId]/nomirror.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/nomirror.tsx rename to ui/app/mirrors/[mirrorId]/nomirror.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx similarity index 90% rename from ui/app/mirrors/edit/[mirrorId]/page.tsx rename to ui/app/mirrors/[mirrorId]/page.tsx index 90263cc807..786776d47b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -1,5 +1,6 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import prisma from '@/app/utils/prisma'; +import EditButton from '@/components/EditButton'; import { ResyncDialog } from '@/components/ResyncDialog'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; import { MirrorStatusResponse } from '@/grpc_generated/route'; @@ -74,8 +75,10 @@ export default async function ViewMirror({ return
No mirror info found
; } - let syncStatusChild = <>; - let resyncComponent = <>; + let syncStatusChild = null; + let resyncComponent = null; + let editButtonHTML = null; + if (mirrorStatus.cdcStatus) { let rowsSynced = syncs.reduce((acc, sync) => { if (sync.end_time !== null) { @@ -93,6 +96,11 @@ export default async function ViewMirror({ syncStatusChild = ( ); + editButtonHTML = ( +
+ +
+ ); } else { redirect(`/mirrors/status/qrep/${mirrorId}`); } @@ -109,6 +117,7 @@ export default async function ViewMirror({ >
{mirrorId}
+ {editButtonHTML}
{resyncComponent} diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx rename to ui/app/mirrors/[mirrorId]/syncStatus.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx rename to ui/app/mirrors/[mirrorId]/syncStatusTable.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx b/ui/app/mirrors/[mirrorId]/tablePairs.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx rename to ui/app/mirrors/[mirrorId]/tablePairs.tsx diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index db26cefd51..25c15e58f4 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -113,6 +113,7 @@ export default function CDCConfigForm({ rows={rows} setRows={setRows} peerType={mirrorConfig.destination?.type} + omitAdditionalTablesMapping={new Map()} /> ); diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 6962ac4aa7..e79df0a88e 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -35,6 +35,7 @@ interface SchemaBoxProps { SetStateAction<{ tableName: string; columns: string[] }[]> >; peerType?: DBType; + omitAdditionalTables: string[] | undefined; } const SchemaBox = ({ sourcePeer, @@ -44,6 +45,7 @@ const SchemaBox = ({ setRows, tableColumns, setTableColumns, + omitAdditionalTables, }: SchemaBoxProps) => { const [tablesLoading, setTablesLoading] = useState(false); const [columnsLoading, setColumnsLoading] = useState(false); @@ -146,6 +148,11 @@ const SchemaBox = ({ if (rowsDoNotHaveSchemaTables(schemaName)) { setTablesLoading(true); fetchTables(sourcePeer, schemaName, peerType).then((newRows) => { + for (const row of newRows) { + if (omitAdditionalTables?.includes(row.source)) { + row.canMirror = false; + } + } setRows((oldRows) => [ ...oldRows.filter((oldRow) => oldRow.schema !== schema), ...newRows, diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx index 541ceaebbb..22b3336ccb 100644 --- a/ui/app/mirrors/create/cdc/tablemapping.tsx +++ b/ui/app/mirrors/create/cdc/tablemapping.tsx @@ -15,6 +15,8 @@ interface TableMappingProps { rows: TableMapRow[]; setRows: Dispatch>; peerType?: DBType; + // schema -> omitted source table mapping + omitAdditionalTablesMapping: Map; } const TableMapping = ({ @@ -22,6 +24,7 @@ const TableMapping = ({ rows, setRows, peerType, + omitAdditionalTablesMapping, }: TableMappingProps) => { const [allSchemas, setAllSchemas] = useState(); const [schemaQuery, setSchemaQuery] = useState(''); @@ -88,6 +91,7 @@ const TableMapping = ({ tableColumns={tableColumns} setTableColumns={setTableColumns} peerType={peerType} + omitAdditionalTables={omitAdditionalTablesMapping.get(schema)} /> )) ) : ( diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 8087d9b37a..b279e2178b 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -118,7 +118,9 @@ interface TableMapping { partitionKey: string; exclude: string[]; } -const reformattedTableMapping = (tableMapping: TableMapRow[]) => { +export const reformattedTableMapping = ( + tableMapping: TableMapRow[] +): TableMapping[] => { const mapping = tableMapping .filter((row) => row?.selected === true) .map((row) => ({ diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx index 510c473db7..b25e8db232 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx @@ -4,7 +4,7 @@ import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; -import aggregateCountsByInterval from '../../../edit/[mirrorId]/aggregatedCountsByInterval'; +import aggregateCountsByInterval from '../../../[mirrorId]/aggregatedCountsByInterval'; type QrepStatusRow = { partitionID: string; diff --git a/ui/app/peers/[peerName]/helpers.tsx b/ui/app/peers/[peerName]/helpers.tsx index 503455af7a..ed125ec1a6 100644 --- a/ui/app/peers/[peerName]/helpers.tsx +++ b/ui/app/peers/[peerName]/helpers.tsx @@ -19,7 +19,7 @@ export const SlotNameDisplay = ({ slotName }: { slotName: string }) => { fontSize: 13, fontWeight: 'bold', }} - href={`/mirrors/edit/${flowName}`} + href={`/mirrors/${flowName}`} > {slotName} diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index b1ab9c6a9f..6763761ad8 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -1,6 +1,6 @@ 'use client'; import { SlotLagPoint } from '@/app/dto/PeersDTO'; -import aggregateCountsByInterval from '@/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval'; +import aggregateCountsByInterval from '@/app/mirrors/[mirrorId]/aggregatedCountsByInterval'; import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle/ProgressCircle'; diff --git a/ui/components/EditButton.tsx b/ui/components/EditButton.tsx new file mode 100644 index 0000000000..accce73873 --- /dev/null +++ b/ui/components/EditButton.tsx @@ -0,0 +1,28 @@ +'use client'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { useRouter } from 'next/navigation'; + +const EditButton = ({ toLink }: { toLink: string }) => { + const router = useRouter(); + return ( + + ); +}; + +export default EditButton; diff --git a/ui/components/MirrorLink.tsx b/ui/components/MirrorLink.tsx index 53cc6f14a4..af73a083fa 100644 --- a/ui/components/MirrorLink.tsx +++ b/ui/components/MirrorLink.tsx @@ -11,7 +11,7 @@ const MirrorLink = ({ flowName }: { flowName: string }) => { {isLoading ? ( ) : ( - +