Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed handling of selector for CDC dynamic properties #1405

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,16 +1255,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
sentUpdate = true
return true
})
}, 28*time.Second)
env.RegisterDelayedCallback(func() {
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "send resume signal after update confirmed", func() bool {
if !sentUpdate {
return false
}
e2e.EnvSignalWorkflow(env, model.FlowSignal, model.NoopSignal)
s.t.Log("Sent resume signal")
return true
})
}, 56*time.Second)

go func() {
Expand Down
39 changes: 23 additions & 16 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
CurrentFlowStatus protos.FlowStatus
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
FlowConfigUpdate *protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
}
Expand All @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdates: nil,
FlowConfigUpdate: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
Expand Down Expand Up @@ -144,17 +144,18 @@ const (
maxSyncsPerCdcFlow = 60
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
flowConfigUpdate := state.FlowConfigUpdate
if flowConfigUpdate != nil {
if len(flowConfigUpdate.AdditionalTables) == 0 {
continue
return nil
}
if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
continue
return nil
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
Expand Down Expand Up @@ -200,12 +201,16 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont

maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping)
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
}

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
// finished processing, wipe it
state.FlowConfigUpdates = nil
return nil
}

Expand All @@ -223,9 +228,8 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
Expand Down Expand Up @@ -299,17 +303,20 @@ func CDCFlowWorkflow(

for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil {
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
selector.Select(ctx)
}
if err := ctx.Err(); err != nil {
return state, err
}

err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
state.ActiveSignal = model.NoopSignal
}
}

Expand Down
16 changes: 10 additions & 6 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
};

const isNotPaused =
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>
Expand Down Expand Up @@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
)}
<div style={{ display: 'flex' }}>
<Button
style={{
Expand All @@ -187,12 +196,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
height: '2.5rem',
}}
variant='normalSolid'
disabled={
loading ||
(additionalTables.length > 0 &&
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED])
}
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
>
{loading ? (
Expand Down
10 changes: 8 additions & 2 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
Expand Down Expand Up @@ -104,9 +104,15 @@ export default async function ViewMirror({
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton toLink={`/mirrors/${mirrorId}/edit`} />
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
</div>
);
} else {
Expand Down
14 changes: 11 additions & 3 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
import { useState } from 'react';

const EditButton = ({ toLink }: { toLink: string }) => {
const EditButton = ({
toLink,
disabled,
}: {
toLink: string;
disabled: boolean;
}) => {
const [loading, setLoading] = useState(false);
const router = useRouter();

Expand All @@ -14,7 +21,7 @@ const EditButton = ({ toLink }: { toLink: string }) => {
router.push(toLink);
};
return (
<button
<Button
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
Expand All @@ -26,14 +33,15 @@ const EditButton = ({ toLink }: { toLink: string }) => {
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
</button>
</Button>
);
};

Expand Down
Loading