Skip to content

Commit

Permalink
making UI for edit mirror simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 20, 2024
1 parent d75eb8b commit 4ab024c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
22 changes: 12 additions & 10 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type CDCFlowWorkflowState struct {
SrcTableIdNameMapping map[uint32]string
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
FlowConfigUpdate *protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// initially copied from config, all changes are made here though
Expand Down Expand Up @@ -76,7 +76,7 @@ func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWo
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
FlowConfigUpdate: nil,
SyncFlowOptions: nil,
TableMappings: tableMappings,
}
Expand Down Expand Up @@ -143,13 +143,14 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
flowConfigUpdate := state.FlowConfigUpdate
if flowConfigUpdate != nil {
if len(flowConfigUpdate.AdditionalTables) == 0 {
continue
return nil
}
if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
continue
return nil
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand Down Expand Up @@ -206,7 +207,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
state.SyncFlowOptions.TableMappings = state.TableMappings
// finished processing, wipe it
state.FlowConfigUpdates = nil
state.FlowConfigUpdate = nil
}
return nil
}
Expand Down Expand Up @@ -444,9 +445,8 @@ func CDCFlowWorkflowWithConfig(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

w.logger.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
Expand All @@ -470,11 +470,13 @@ func CDCFlowWorkflowWithConfig(
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
// explicitly unpause
state.ActiveSignal = shared.NoopSignal
}
}

Expand Down
16 changes: 10 additions & 6 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
};

const isNotPaused =
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>
Expand Down Expand Up @@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
)}
<div style={{ display: 'flex' }}>
<Button
style={{
Expand All @@ -187,12 +196,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
height: '2.5rem',
}}
variant='normalSolid'
disabled={
loading ||
(additionalTables.length > 0 &&
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED])
}
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
>
{loading ? (
Expand Down
10 changes: 8 additions & 2 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
import { LayoutMain } from '@/lib/Layout';
Expand Down Expand Up @@ -96,9 +96,15 @@ export default async function ViewMirror({
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton toLink={`/mirrors/${mirrorId}/edit`} />
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
</div>
);
} else {
Expand Down
8 changes: 5 additions & 3 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
import { useState } from 'react';

const EditButton = ({ toLink }: { toLink: string }) => {
const EditButton = ({ toLink, disabled }: { toLink: string, disabled: boolean }) => {
const [loading, setLoading] = useState(false);
const router = useRouter();

Expand All @@ -14,7 +15,7 @@ const EditButton = ({ toLink }: { toLink: string }) => {
router.push(toLink);
};
return (
<button
<Button
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
Expand All @@ -26,14 +27,15 @@ const EditButton = ({ toLink }: { toLink: string }) => {
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
</button>
</Button>
);
};

Expand Down

0 comments on commit 4ab024c

Please sign in to comment.