Skip to content

Commit

Permalink
UI for editing mirror parameters (#1190)
Browse files Browse the repository at this point in the history
`IdleTimeoutSeconds` and `SyncBatchSize` can be changed anytime, tables
can only be added IF mirror is paused, otherwise the button will be
disabled.

Additional changes for this:
1) moved CDC dynamic properties to the main signal selector and blocking
on the main signal selector while paused, so we can receive mirror edits
even while paused.
2) pass `TableNameSchemaMapping` from state always to `NormalizeFlow` in
case new tables were added.
3) added a new property to the tables component to disable selecting of
tables already in the mirror.

There is a bug where tables added to the mirror dynamically will not
reflect in the status screen, will be fixed in a follow up PR.
`TableMappings` Needs to be stored in state as well.
  • Loading branch information
heavycrystal committed Feb 1, 2024
1 parent 4fee2a3 commit cb68fc4
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 110 deletions.
82 changes: 49 additions & 33 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,47 +425,63 @@ func (h *FlowRequestHandler) FlowStateChange(
return nil, err
}

if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
shared.CDCDynamicPropertiesSignalName,
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)

// in case we only want to update properties without changing status
if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN {
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else if req.RequestedFlowState != currState {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
}

return &protos.FlowStateChangeResponse{
Expand Down
40 changes: 36 additions & 4 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

func (h *FlowRequestHandler) MirrorStatus(
Expand Down Expand Up @@ -81,6 +82,18 @@ func (h *FlowRequestHandler) CDCFlowStatus(
if err != nil {
return nil, err
}
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
state, err := h.getCDCWorkflowState(ctx, workflowID)
if err != nil {
return nil, err
}

// patching config to show latest values from state
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize

var initialCopyStatus *protos.SnapshotStatus

Expand Down Expand Up @@ -339,16 +352,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}
Expand All @@ -365,3 +378,22 @@ func (h *FlowRequestHandler) updateWorkflowStatus(
}
return nil
}

func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
workflowID string,
) (*peerflow.CDCFlowWorkflowState, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state peerflow.CDCFlowWorkflowState
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
return &state, nil
}
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
NormalizeSyncDoneSignalName = "normalize-sync-done"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
CDCFlowStateQuery = "q-cdc-flow-state"
QRepFlowStateQuery = "q-qrep-flow-state"
FlowStatusQuery = "q-flow-status"

Expand Down
93 changes: 41 additions & 52 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct {
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
}

// returns a new empty PeerFlowState
Expand All @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
}
}

Expand Down Expand Up @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig(
}
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
IdleTimeoutSeconds: 0,
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
}
normalizeFlowOptions := &protos.NormalizeFlowOptions{
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
}

// add a signal to change CDC properties
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(syncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

cdcPropertiesSelector.AddDefault(func() {
w.logger.Info("no batch size signal received, batch size remains: ",
syncFlowOptions.BatchSize)
})

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

Expand All @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
normalizeFlowOptions,
state.NormalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
Expand Down Expand Up @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig(
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
// add a signal to change CDC properties
cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

for {
for !canceled && mainLoopSelector.HasPending() {
Expand All @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig(
if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", time.Since(startTime))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
// only process config updates when going from STATUS_PAUSED to STATUS_RUNNING
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
} else if err := ctx.Err(); err != nil {
return nil, err
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

cdcPropertiesSelector.Select(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING

// check if total sync flows have been completed
Expand Down Expand Up @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
state.SyncFlowOptions,
)

var syncDone bool
Expand All @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig(
if childSyncFlowRes != nil {
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
Expand Down Expand Up @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig(
dstTable := modifiedDstTables[i]
state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
normalizeTableNameSchemaMapping = state.TableNameSchemaMapping
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ message QRepMirrorStatus {
// or if we are in the continuous streaming mode.
}

// to be removed eventually
message CDCSyncStatus {
int64 start_lsn = 1;
int64 end_lsn = 2;
Expand Down
23 changes: 23 additions & 0 deletions ui/app/api/mirrors/state/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {
MirrorStatusRequest,
MirrorStatusResponse,
} from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body: MirrorStatusRequest = await request.json();
const flowServiceAddr = GetFlowHttpAddressFromEnv();
console.log('/mirrors/state: req:', body);
try {
const res: MirrorStatusResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/${body.flowJobName}`,
{ cache: 'no-store' }
).then((res) => {
return res.json();
});

return new Response(JSON.stringify(res));
} catch (e) {
console.error(e);
}
}
Loading

0 comments on commit cb68fc4

Please sign in to comment.