Skip to content

Commit

Permalink
Revert "making UX for edit mirror simpler (#1338)"
Browse files Browse the repository at this point in the history
This reverts commit b6eb5d1.
  • Loading branch information
heavycrystal authored Feb 29, 2024
1 parent b6eb5d1 commit 0efc334
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 47 deletions.
10 changes: 10 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
sentUpdate = true
return true
})
}, 28*time.Second)
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
if !sentUpdate {
return false
}
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
s.t.Log("Sent resume signal")
return true
})
}, 56*time.Second)

go func() {
Expand Down
33 changes: 15 additions & 18 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
CurrentFlowStatus protos.FlowStatus
// flow config update request, set to nil after processed
FlowConfigUpdate *protos.CDCFlowConfigUpdate
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
}
Expand All @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdate: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
Expand Down Expand Up @@ -144,18 +144,17 @@ const (
maxSyncsPerCdcFlow = 60
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
flowConfigUpdate := state.FlowConfigUpdate
if flowConfigUpdate != nil {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
if len(flowConfigUpdate.AdditionalTables) == 0 {
return nil
continue
}
if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
return nil
continue
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand Down Expand Up @@ -201,12 +200,12 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte

maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping)
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
// finished processing, wipe it
state.FlowConfigUpdates = nil
return nil
}

Expand All @@ -224,8 +223,9 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
Expand Down Expand Up @@ -307,12 +307,9 @@ func CDCFlowWorkflow(
return state, err
}

if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
state.ActiveSignal = model.NoopSignal
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
}

Expand Down
16 changes: 6 additions & 10 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
};

const isNotPaused =
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>
Expand Down Expand Up @@ -182,11 +178,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
)}
<div style={{ display: 'flex' }}>
<Button
style={{
Expand All @@ -196,7 +187,12 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
height: '2.5rem',
}}
variant='normalSolid'
disabled={loading || isNotPaused}
disabled={
loading ||
(additionalTables.length > 0 &&
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED])
}
onClick={sendFlowStateChangeRequest}
>
{loading ? (
Expand Down
10 changes: 2 additions & 8 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
Expand Down Expand Up @@ -104,15 +104,9 @@ export default async function ViewMirror({
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
<EditButton toLink={`/mirrors/${mirrorId}/edit`} />
</div>
);
} else {
Expand Down
14 changes: 3 additions & 11 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
import { useState } from 'react';

const EditButton = ({
toLink,
disabled,
}: {
toLink: string;
disabled: boolean;
}) => {
const EditButton = ({ toLink }: { toLink: string }) => {
const [loading, setLoading] = useState(false);
const router = useRouter();

Expand All @@ -21,7 +14,7 @@ const EditButton = ({
router.push(toLink);
};
return (
<Button
<button
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
Expand All @@ -33,15 +26,14 @@ const EditButton = ({
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
</Button>
</button>
);
};

Expand Down

0 comments on commit 0efc334

Please sign in to comment.