Skip to content

Commit

Permalink
Revert "Revert "making UX for edit mirror simpler" (#1403)"
Browse files Browse the repository at this point in the history
This reverts commit 03982d5.
  • Loading branch information
heavycrystal authored Feb 29, 2024
1 parent 03982d5 commit 2cc4d7a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 36 deletions.
10 changes: 0 additions & 10 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,16 +1255,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
sentUpdate = true
return true
})
}, 28*time.Second)
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
if !sentUpdate {
return false
}
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
s.t.Log("Sent resume signal")
return true
})
}, 56*time.Second)

go func() {
Expand Down
33 changes: 18 additions & 15 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
CurrentFlowStatus protos.FlowStatus
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
FlowConfigUpdate *protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
}
Expand All @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdates: nil,
FlowConfigUpdate: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
Expand Down Expand Up @@ -144,17 +144,18 @@ const (
maxSyncsPerCdcFlow = 60
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
flowConfigUpdate := state.FlowConfigUpdate
if flowConfigUpdate != nil {
if len(flowConfigUpdate.AdditionalTables) == 0 {
continue
return nil
}
if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
continue
return nil
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand Down Expand Up @@ -200,12 +201,12 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont

maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping)
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
// finished processing, wipe it
state.FlowConfigUpdates = nil
return nil
}

Expand All @@ -223,9 +224,8 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
Expand Down Expand Up @@ -307,9 +307,12 @@ func CDCFlowWorkflow(
return state, err
}

err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
state.ActiveSignal = model.NoopSignal
}
}

Expand Down
16 changes: 10 additions & 6 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
};

const isNotPaused =
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>
Expand Down Expand Up @@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
)}
<div style={{ display: 'flex' }}>
<Button
style={{
Expand All @@ -187,12 +196,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
height: '2.5rem',
}}
variant='normalSolid'
disabled={
loading ||
(additionalTables.length > 0 &&
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED])
}
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
>
{loading ? (
Expand Down
10 changes: 8 additions & 2 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
Expand Down Expand Up @@ -104,9 +104,15 @@ export default async function ViewMirror({
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton toLink={`/mirrors/${mirrorId}/edit`} />
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
</div>
);
} else {
Expand Down
14 changes: 11 additions & 3 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
import { useState } from 'react';

const EditButton = ({ toLink }: { toLink: string }) => {
const EditButton = ({
toLink,
disabled,
}: {
toLink: string;
disabled: boolean;
}) => {
const [loading, setLoading] = useState(false);
const router = useRouter();

Expand All @@ -14,7 +21,7 @@ const EditButton = ({ toLink }: { toLink: string }) => {
router.push(toLink);
};
return (
<button
<Button
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
Expand All @@ -26,14 +33,15 @@ const EditButton = ({ toLink }: { toLink: string }) => {
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
</button>
</Button>
);
};

Expand Down

0 comments on commit 2cc4d7a

Please sign in to comment.