diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 1d96e5a84f..cbc9ed2fb3 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -425,47 +425,63 @@ func (h *FlowRequestHandler) FlowStateChange( return nil, err } - if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && - currState == protos.FlowStatus_STATUS_RUNNING { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) - if err != nil { - return nil, err - } - err = h.temporalClient.SignalWorkflow( - ctx, - workflowID, - "", - shared.FlowSignalName, - shared.PauseSignal, - ) - } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && - currState == protos.FlowStatus_STATUS_PAUSED { + if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil { err = h.temporalClient.SignalWorkflow( ctx, workflowID, "", - shared.FlowSignalName, - shared.NoopSignal, + shared.CDCDynamicPropertiesSignalName, + req.FlowConfigUpdate.GetCdcFlowConfigUpdate(), ) - } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && - (currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) { - err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to signal workflow: %w", err) } - _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ - WorkflowId: workflowID, - FlowJobName: req.FlowJobName, - SourcePeer: req.SourcePeer, - DestinationPeer: req.DestinationPeer, - RemoveFlowEntry: false, - }) - } else { - return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", - req.RequestedFlowState, currState) } - if err != nil { - return nil, fmt.Errorf("unable to signal workflow: %w", err) + + // in case we only want to update properties without changing status + if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN { + if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED && + currState == protos.FlowStatus_STATUS_RUNNING { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING) + if err != nil { + return nil, err + } + err = h.temporalClient.SignalWorkflow( + ctx, + workflowID, + "", + shared.FlowSignalName, + shared.PauseSignal, + ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING && + currState == protos.FlowStatus_STATUS_PAUSED { + err = h.temporalClient.SignalWorkflow( + ctx, + workflowID, + "", + shared.FlowSignalName, + shared.NoopSignal, + ) + } else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED && + (currState != protos.FlowStatus_STATUS_TERMINATED) { + err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING) + if err != nil { + return nil, err + } + _, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{ + WorkflowId: workflowID, + FlowJobName: req.FlowJobName, + SourcePeer: req.SourcePeer, + DestinationPeer: req.DestinationPeer, + RemoveFlowEntry: false, + }) + } else if req.RequestedFlowState != currState { + return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v", + req.RequestedFlowState, currState) + } + if err != nil { + return nil, fmt.Errorf("unable to signal workflow: %w", err) + } } return &protos.FlowStateChangeResponse{ diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index ff14f04f35..ceca719c69 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" ) func (h *FlowRequestHandler) MirrorStatus( @@ -81,6 +82,18 @@ func (h *FlowRequestHandler) CDCFlowStatus( if err != nil { return nil, err } + workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + state, err := h.getCDCWorkflowState(ctx, workflowID) + if err != nil { + return nil, err + } + + // patching config to show latest values from state + config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds + config.MaxBatchSize = state.SyncFlowOptions.BatchSize var initialCopyStatus *protos.SnapshotStatus @@ -339,16 +352,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) { res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery) if err != nil { - slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) } var state protos.FlowStatus err = res.Get(&state) if err != nil { - slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) return protos.FlowStatus_STATUS_UNKNOWN, - fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) } return state, nil } @@ -365,3 +378,22 @@ func (h *FlowRequestHandler) updateWorkflowStatus( } return nil } + +func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context, + workflowID string, +) (*peerflow.CDCFlowWorkflowState, error) { + res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery) + if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + return nil, + fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + var state peerflow.CDCFlowWorkflowState + err = res.Get(&state) + if err != nil { + slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error())) + return nil, + fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err) + } + return &state, nil +} diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 68edd3c6d3..69b4dfda20 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -18,7 +18,7 @@ const ( NormalizeSyncDoneSignalName = "normalize-sync-done" // Queries - CDCFlowStateQuery = "q-cdc-flow-status" + CDCFlowStateQuery = "q-cdc-flow-state" QRepFlowStateQuery = "q-qrep-flow-state" FlowStatusQuery = "q-flow-status" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index d0f825dfaf..60a085f33b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct { TableNameSchemaMapping map[string]*protos.TableSchema // flow config update request, set to nil after processed FlowConfigUpdates []*protos.CDCFlowConfigUpdate + // options passed to all SyncFlows + SyncFlowOptions *protos.SyncFlowOptions + // options passed to all NormalizeFlows + NormalizeFlowOptions *protos.NormalizeFlowOptions } // returns a new empty PeerFlowState @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState { SrcTableIdNameMapping: nil, TableNameSchemaMapping: nil, FlowConfigUpdates: nil, + SyncFlowOptions: nil, + NormalizeFlowOptions: nil, } } @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig( } } - syncFlowOptions := &protos.SyncFlowOptions{ - BatchSize: limits.MaxBatchSize, - IdleTimeoutSeconds: 0, + state.SyncFlowOptions = &protos.SyncFlowOptions{ + BatchSize: limits.MaxBatchSize, + // this means the env variable assignment path is never hit + IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, SrcTableIdNameMapping: state.SrcTableIdNameMapping, TableNameSchemaMapping: state.TableNameSchemaMapping, } - normalizeFlowOptions := &protos.NormalizeFlowOptions{ + state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{ TableNameSchemaMapping: state.TableNameSchemaMapping, } - // add a signal to change CDC properties - cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) - cdcPropertiesSelector := workflow.NewSelector(ctx) - cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) { - var cdcConfigUpdate *protos.CDCFlowConfigUpdate - c.Receive(ctx, &cdcConfigUpdate) - // only modify for options since SyncFlow uses it - if cdcConfigUpdate.BatchSize > 0 { - syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize - } - if cdcConfigUpdate.IdleTimeout > 0 { - syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout - } - if len(cdcConfigUpdate.AdditionalTables) > 0 { - state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) - } - - slog.Info("CDC Signal received. Parameters on signal reception:", - slog.Int("BatchSize", int(syncFlowOptions.BatchSize)), - slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)), - slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) - }) - - cdcPropertiesSelector.AddDefault(func() { - w.logger.Info("no batch size signal received, batch size remains: ", - syncFlowOptions.BatchSize) - }) - currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig( normCtx, NormalizeFlowWorkflow, cfg, - normalizeFlowOptions, + state.NormalizeFlowOptions, ) var normWaitChan workflow.ReceiveChannel @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig( c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) }) + // add a signal to change CDC properties + cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName) + mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) { + var cdcConfigUpdate *protos.CDCFlowConfigUpdate + c.Receive(ctx, &cdcConfigUpdate) + // only modify for options since SyncFlow uses it + if cdcConfigUpdate.BatchSize > 0 { + state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize + } + if cdcConfigUpdate.IdleTimeout > 0 { + state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout + } + if len(cdcConfigUpdate.AdditionalTables) > 0 { + state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate) + } + + slog.Info("CDC Signal received. Parameters on signal reception:", + slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)), + slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)), + slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables)) + }) for { for !canceled && mainLoopSelector.HasPending() { @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig( if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) - var signalVal shared.CDCFlowSignal for state.ActiveSignal == shared.PauseSignal { w.logger.Info("mirror has been paused for ", time.Since(startTime)) // only place we block on receive, so signal processing is immediate - ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) - if ok { - state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) - // only process config updates when going from STATUS_PAUSED to STATUS_RUNNING - if state.ActiveSignal == shared.NoopSignal { - err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) - if err != nil { - return state, err - } + mainLoopSelector.Select(ctx) + if state.ActiveSignal == shared.NoopSignal { + err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch) + if err != nil { + return state, err } - } else if err := ctx.Err(); err != nil { - return nil, err } } w.logger.Info("mirror has been resumed after ", time.Since(startTime)) } - cdcPropertiesSelector.Select(ctx) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING // check if total sync flows have been completed @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig( WaitForCancellation: true, } syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts) - syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping + state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping childSyncFlowFuture := workflow.ExecuteChildWorkflow( syncCtx, SyncFlowWorkflow, cfg, - syncFlowOptions, + state.SyncFlowOptions, ) var syncDone bool @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig( if childSyncFlowRes != nil { tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas) - var normalizeTableNameSchemaMapping map[string]*protos.TableSchema // slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes. if tableSchemaDeltasCount != 0 { modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount) @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig( dstTable := modifiedDstTables[i] state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable] } - normalizeTableNameSchemaMapping = state.TableNameSchemaMapping } } signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{ Done: false, SyncBatchID: childSyncFlowRes.CurrentSyncBatchID, - TableNameSchemaMapping: normalizeTableNameSchemaMapping, + TableNameSchemaMapping: state.TableNameSchemaMapping, }) normalizeSignalError = signalFuture.Get(ctx, nil) } else { diff --git a/protos/route.proto b/protos/route.proto index 48db51e019..f0d7dd0511 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -96,6 +96,7 @@ message QRepMirrorStatus { // or if we are in the continuous streaming mode. } +// to be removed eventually message CDCSyncStatus { int64 start_lsn = 1; int64 end_lsn = 2; diff --git a/ui/app/api/mirrors/state/route.ts b/ui/app/api/mirrors/state/route.ts new file mode 100644 index 0000000000..cbd0a9969d --- /dev/null +++ b/ui/app/api/mirrors/state/route.ts @@ -0,0 +1,23 @@ +import { + MirrorStatusRequest, + MirrorStatusResponse, +} from '@/grpc_generated/route'; +import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; + +export async function POST(request: Request) { + const body: MirrorStatusRequest = await request.json(); + const flowServiceAddr = GetFlowHttpAddressFromEnv(); + console.log('/mirrors/state: req:', body); + try { + const res: MirrorStatusResponse = await fetch( + `${flowServiceAddr}/v1/mirrors/${body.flowJobName}`, + { cache: 'no-store' } + ).then((res) => { + return res.json(); + }); + + return new Response(JSON.stringify(res)); + } catch (e) { + console.error(e); + } +} diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts rename to ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdc.tsx rename to ui/app/mirrors/[mirrorId]/cdc.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/[mirrorId]/cdcDetails.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx rename to ui/app/mirrors/[mirrorId]/cdcDetails.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx rename to ui/app/mirrors/[mirrorId]/cdcGraph.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/configValues.ts b/ui/app/mirrors/[mirrorId]/configValues.ts similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/configValues.ts rename to ui/app/mirrors/[mirrorId]/configValues.ts diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx new file mode 100644 index 0000000000..b86e312255 --- /dev/null +++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx @@ -0,0 +1,197 @@ +'use client'; + +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { CDCFlowConfigUpdate, FlowStatus } from '@/grpc_generated/flow'; +import { + FlowStateChangeRequest, + MirrorStatusResponse, +} from '@/grpc_generated/route'; +import { Button } from '@/lib/Button'; +import { Label } from '@/lib/Label'; +import { RowWithTextField } from '@/lib/Layout'; +import { TextField } from '@/lib/TextField'; +import { ProgressCircle } from '@tremor/react'; +import { useRouter } from 'next/navigation'; +import { useCallback, useEffect, useMemo, useState } from 'react'; +import TableMapping from '../../create/cdc/tablemapping'; +import { reformattedTableMapping } from '../../create/handlers'; +import { blankCDCSetting } from '../../create/helpers/common'; + +type EditMirrorProps = { + params: { mirrorId: string }; +}; + +const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => { + const defaultBatchSize = blankCDCSetting.maxBatchSize; + const defaultIdleTimeout = blankCDCSetting.idleTimeoutSeconds; + + const [rows, setRows] = useState([]); + const [mirrorState, setMirrorState] = useState(); + const [config, setConfig] = useState({ + batchSize: defaultBatchSize, + idleTimeout: defaultIdleTimeout, + additionalTables: [], + }); + const { push } = useRouter(); + + const fetchStateAndUpdateDeps = useCallback(async () => { + await fetch('/api/mirrors/state', { + method: 'POST', + body: JSON.stringify({ + flowJobName: mirrorId, + }), + }) + .then((res) => res.json()) + .then((res) => { + setMirrorState(res); + + setConfig({ + batchSize: + (res as MirrorStatusResponse).cdcStatus?.config?.maxBatchSize || + defaultBatchSize, + idleTimeout: + (res as MirrorStatusResponse).cdcStatus?.config + ?.idleTimeoutSeconds || defaultIdleTimeout, + additionalTables: [], + }); + }); + }, [mirrorId, defaultBatchSize, defaultIdleTimeout]); + + useEffect(() => { + fetchStateAndUpdateDeps(); + }, [fetchStateAndUpdateDeps]); + + const omitAdditionalTablesMapping: Map = useMemo(() => { + const omitAdditionalTablesMapping: Map = new Map(); + mirrorState?.cdcStatus?.config?.tableMappings.forEach((value) => { + const sourceSchema = value.sourceTableIdentifier.split('.').at(0)!; + const mapVal: string[] = + omitAdditionalTablesMapping.get(sourceSchema) || []; + // needs to be schema qualified + mapVal.push(value.sourceTableIdentifier); + omitAdditionalTablesMapping.set(sourceSchema, mapVal); + }); + return omitAdditionalTablesMapping; + }, [mirrorState]); + + const additionalTables = useMemo(() => { + return reformattedTableMapping(rows); + }, [rows]); + + if (!mirrorState) { + return ; + } + + const sendFlowStateChangeRequest = async () => { + const req: FlowStateChangeRequest = { + flowJobName: mirrorId, + sourcePeer: mirrorState.cdcStatus?.config?.source, + destinationPeer: mirrorState.cdcStatus?.config?.destination, + requestedFlowState: FlowStatus.STATUS_UNKNOWN, + flowConfigUpdate: { + cdcFlowConfigUpdate: { ...config, additionalTables }, + }, + }; + await fetch(`/api/mirrors/state_change`, { + method: 'POST', + body: JSON.stringify(req), + cache: 'no-store', + }); + push(`/mirrors/${mirrorId}`); + }; + + return ( +
+ + + {'Pull Batch Size'} } + action={ +
+ ) => + setConfig({ + ...config, + batchSize: e.target.valueAsNumber, + }) + } + defaultValue={config.batchSize} + /> +
+ } + /> + + {'Sync Interval (Seconds)'} } + action={ +
+ ) => + setConfig({ + ...config, + idleTimeout: e.target.valueAsNumber, + }) + } + defaultValue={config.idleTimeout} + /> +
+ } + /> + + +
+ + +
+
+ ); +}; + +export default EditMirror; diff --git a/ui/app/mirrors/edit/[mirrorId]/nomirror.tsx b/ui/app/mirrors/[mirrorId]/nomirror.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/nomirror.tsx rename to ui/app/mirrors/[mirrorId]/nomirror.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx similarity index 90% rename from ui/app/mirrors/edit/[mirrorId]/page.tsx rename to ui/app/mirrors/[mirrorId]/page.tsx index 90263cc807..786776d47b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -1,5 +1,6 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import prisma from '@/app/utils/prisma'; +import EditButton from '@/components/EditButton'; import { ResyncDialog } from '@/components/ResyncDialog'; import { FlowConnectionConfigs } from '@/grpc_generated/flow'; import { MirrorStatusResponse } from '@/grpc_generated/route'; @@ -74,8 +75,10 @@ export default async function ViewMirror({ return
No mirror info found
; } - let syncStatusChild = <>; - let resyncComponent = <>; + let syncStatusChild = null; + let resyncComponent = null; + let editButtonHTML = null; + if (mirrorStatus.cdcStatus) { let rowsSynced = syncs.reduce((acc, sync) => { if (sync.end_time !== null) { @@ -93,6 +96,11 @@ export default async function ViewMirror({ syncStatusChild = ( ); + editButtonHTML = ( +
+ +
+ ); } else { redirect(`/mirrors/status/qrep/${mirrorId}`); } @@ -109,6 +117,7 @@ export default async function ViewMirror({ >
{mirrorId}
+ {editButtonHTML}
{resyncComponent} diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx rename to ui/app/mirrors/[mirrorId]/syncStatus.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx rename to ui/app/mirrors/[mirrorId]/syncStatusTable.tsx diff --git a/ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx b/ui/app/mirrors/[mirrorId]/tablePairs.tsx similarity index 100% rename from ui/app/mirrors/edit/[mirrorId]/tablePairs.tsx rename to ui/app/mirrors/[mirrorId]/tablePairs.tsx diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index db26cefd51..25c15e58f4 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -113,6 +113,7 @@ export default function CDCConfigForm({ rows={rows} setRows={setRows} peerType={mirrorConfig.destination?.type} + omitAdditionalTablesMapping={new Map()} /> ); diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 6962ac4aa7..e79df0a88e 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -35,6 +35,7 @@ interface SchemaBoxProps { SetStateAction<{ tableName: string; columns: string[] }[]> >; peerType?: DBType; + omitAdditionalTables: string[] | undefined; } const SchemaBox = ({ sourcePeer, @@ -44,6 +45,7 @@ const SchemaBox = ({ setRows, tableColumns, setTableColumns, + omitAdditionalTables, }: SchemaBoxProps) => { const [tablesLoading, setTablesLoading] = useState(false); const [columnsLoading, setColumnsLoading] = useState(false); @@ -146,6 +148,11 @@ const SchemaBox = ({ if (rowsDoNotHaveSchemaTables(schemaName)) { setTablesLoading(true); fetchTables(sourcePeer, schemaName, peerType).then((newRows) => { + for (const row of newRows) { + if (omitAdditionalTables?.includes(row.source)) { + row.canMirror = false; + } + } setRows((oldRows) => [ ...oldRows.filter((oldRow) => oldRow.schema !== schema), ...newRows, diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx index 541ceaebbb..22b3336ccb 100644 --- a/ui/app/mirrors/create/cdc/tablemapping.tsx +++ b/ui/app/mirrors/create/cdc/tablemapping.tsx @@ -15,6 +15,8 @@ interface TableMappingProps { rows: TableMapRow[]; setRows: Dispatch>; peerType?: DBType; + // schema -> omitted source table mapping + omitAdditionalTablesMapping: Map; } const TableMapping = ({ @@ -22,6 +24,7 @@ const TableMapping = ({ rows, setRows, peerType, + omitAdditionalTablesMapping, }: TableMappingProps) => { const [allSchemas, setAllSchemas] = useState(); const [schemaQuery, setSchemaQuery] = useState(''); @@ -88,6 +91,7 @@ const TableMapping = ({ tableColumns={tableColumns} setTableColumns={setTableColumns} peerType={peerType} + omitAdditionalTables={omitAdditionalTablesMapping.get(schema)} /> )) ) : ( diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 8087d9b37a..b279e2178b 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -118,7 +118,9 @@ interface TableMapping { partitionKey: string; exclude: string[]; } -const reformattedTableMapping = (tableMapping: TableMapRow[]) => { +export const reformattedTableMapping = ( + tableMapping: TableMapRow[] +): TableMapping[] => { const mapping = tableMapping .filter((row) => row?.selected === true) .map((row) => ({ diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx index 510c473db7..b25e8db232 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/qrepGraph.tsx @@ -4,7 +4,7 @@ import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; -import aggregateCountsByInterval from '../../../edit/[mirrorId]/aggregatedCountsByInterval'; +import aggregateCountsByInterval from '../../../[mirrorId]/aggregatedCountsByInterval'; type QrepStatusRow = { partitionID: string; diff --git a/ui/app/peers/[peerName]/helpers.tsx b/ui/app/peers/[peerName]/helpers.tsx index 503455af7a..ed125ec1a6 100644 --- a/ui/app/peers/[peerName]/helpers.tsx +++ b/ui/app/peers/[peerName]/helpers.tsx @@ -19,7 +19,7 @@ export const SlotNameDisplay = ({ slotName }: { slotName: string }) => { fontSize: 13, fontWeight: 'bold', }} - href={`/mirrors/edit/${flowName}`} + href={`/mirrors/${flowName}`} > {slotName} diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index b1ab9c6a9f..6763761ad8 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -1,6 +1,6 @@ 'use client'; import { SlotLagPoint } from '@/app/dto/PeersDTO'; -import aggregateCountsByInterval from '@/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval'; +import aggregateCountsByInterval from '@/app/mirrors/[mirrorId]/aggregatedCountsByInterval'; import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle/ProgressCircle'; diff --git a/ui/components/EditButton.tsx b/ui/components/EditButton.tsx new file mode 100644 index 0000000000..accce73873 --- /dev/null +++ b/ui/components/EditButton.tsx @@ -0,0 +1,28 @@ +'use client'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { useRouter } from 'next/navigation'; + +const EditButton = ({ toLink }: { toLink: string }) => { + const router = useRouter(); + return ( + + ); +}; + +export default EditButton; diff --git a/ui/components/MirrorLink.tsx b/ui/components/MirrorLink.tsx index 53cc6f14a4..af73a083fa 100644 --- a/ui/components/MirrorLink.tsx +++ b/ui/components/MirrorLink.tsx @@ -11,7 +11,7 @@ const MirrorLink = ({ flowName }: { flowName: string }) => { {isLoading ? ( ) : ( - +