Skip to content

Commit

Permalink
UI: Edit Page and Refactor Edit Mirror Route (#1156)
Browse files Browse the repository at this point in the history
You reach the edit page shown above by clicking this button:
![Screenshot 2024-01-25 at 9 01 52 PM](https://github.com/PeerDB-io/peerdb/assets/65964360/865cdd5b-f2b4-41a1-ac10-b7c5efe6fbe8)



IdleTimeoutSeconds and SyncBatchSize can be changed at any time. Tables can only be added if the mirror is paused; otherwise the button is disabled.

Additional changes for this:

    moved CDC dynamic properties handling to the main signal selector, and the workflow now blocks on the main signal selector while paused, so mirror edits can be received even while paused.
    pass TableNameSchemaMapping from state always to NormalizeFlow in case new tables were added.
    added a new property to the tables component to disable selecting of tables already in the mirror.

There is a known bug where tables added to the mirror dynamically are not reflected in the status screen; this will be fixed in a follow-up PR. TableMappings needs to be stored in state as well.


---------

Co-authored-by: Kevin Biju <[email protected]>
Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
3 people authored Feb 1, 2024
1 parent 2a0842e commit 6969882
Show file tree
Hide file tree
Showing 26 changed files with 406 additions and 97 deletions.
82 changes: 49 additions & 33 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,47 +425,63 @@ func (h *FlowRequestHandler) FlowStateChange(
return nil, err
}

if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
shared.CDCDynamicPropertiesSignalName,
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)

// in case we only want to update properties without changing status
if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN {
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else if req.RequestedFlowState != currState {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
}

return &protos.FlowStateChangeResponse{
Expand Down
40 changes: 36 additions & 4 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

func (h *FlowRequestHandler) MirrorStatus(
Expand Down Expand Up @@ -81,6 +82,18 @@ func (h *FlowRequestHandler) CDCFlowStatus(
if err != nil {
return nil, err
}
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
state, err := h.getCDCWorkflowState(ctx, workflowID)
if err != nil {
return nil, err
}

// patching config to show latest values from state
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize

var initialCopyStatus *protos.SnapshotStatus

Expand Down Expand Up @@ -339,16 +352,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) {
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}
Expand All @@ -365,3 +378,22 @@ func (h *FlowRequestHandler) updateWorkflowStatus(
}
return nil
}

// getCDCWorkflowState runs the shared.CDCFlowStateQuery query against the
// workflow identified by workflowID and decodes the query result into a
// CDCFlowWorkflowState.
//
// If either the query or the decode step fails, the failure is logged via
// slog and returned wrapped with the workflow ID; the returned state is nil
// in that case.
func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
	workflowID string,
) (*peerflow.CDCFlowWorkflowState, error) {
	// Both failure paths log and wrap the error in exactly the same way.
	fail := func(cause error) (*peerflow.CDCFlowWorkflowState, error) {
		slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, cause.Error()))
		return nil,
			fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, cause)
	}

	queryResult, queryErr := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery)
	if queryErr != nil {
		return fail(queryErr)
	}

	cdcState := new(peerflow.CDCFlowWorkflowState)
	if decodeErr := queryResult.Get(cdcState); decodeErr != nil {
		return fail(decodeErr)
	}
	return cdcState, nil
}
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
NormalizeSyncDoneSignalName = "normalize-sync-done"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
CDCFlowStateQuery = "q-cdc-flow-state"
QRepFlowStateQuery = "q-qrep-flow-state"
FlowStatusQuery = "q-flow-status"

Expand Down
93 changes: 41 additions & 52 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct {
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
}

// returns a new empty PeerFlowState
Expand All @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
}
}

Expand Down Expand Up @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig(
}
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
IdleTimeoutSeconds: 0,
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
}
normalizeFlowOptions := &protos.NormalizeFlowOptions{
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
}

// add a signal to change CDC properties
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(syncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

cdcPropertiesSelector.AddDefault(func() {
w.logger.Info("no batch size signal received, batch size remains: ",
syncFlowOptions.BatchSize)
})

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

Expand All @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
normalizeFlowOptions,
state.NormalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
Expand Down Expand Up @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig(
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
// add a signal to change CDC properties
cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

for {
for !canceled && mainLoopSelector.HasPending() {
Expand All @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig(
if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", time.Since(startTime))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
// only process config updates when going from STATUS_PAUSED to STATUS_RUNNING
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
} else if err := ctx.Err(); err != nil {
return nil, err
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

cdcPropertiesSelector.Select(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING

// check if total sync flows have been completed
Expand Down Expand Up @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
state.SyncFlowOptions,
)

var syncDone bool
Expand All @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig(
if childSyncFlowRes != nil {
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
Expand Down Expand Up @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig(
dstTable := modifiedDstTables[i]
state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
normalizeTableNameSchemaMapping = state.TableNameSchemaMapping
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ message QRepMirrorStatus {
// or if we are in the continuous streaming mode.
}

// to be removed eventually
message CDCSyncStatus {
int64 start_lsn = 1;
int64 end_lsn = 2;
Expand Down
23 changes: 23 additions & 0 deletions ui/app/api/mirrors/state/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {
MirrorStatusRequest,
MirrorStatusResponse,
} from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

/**
 * POST handler for the mirrors/state API route.
 *
 * Reads a MirrorStatusRequest from the request body, fetches
 * `/v1/mirrors/{flowJobName}` from the flow service (with caching disabled)
 * and relays the JSON payload back to the caller.
 *
 * @param request incoming request whose JSON body is a MirrorStatusRequest
 * @returns a Response carrying the serialized MirrorStatusResponse, or a
 *          500 Response with an `error` field if the upstream call fails
 */
export async function POST(request: Request) {
  const body: MirrorStatusRequest = await request.json();
  const flowServiceAddr = GetFlowHttpAddressFromEnv();
  console.log('/mirrors/state: req:', body);
  try {
    const res: MirrorStatusResponse = await fetch(
      `${flowServiceAddr}/v1/mirrors/${body.flowJobName}`,
      { cache: 'no-store' }
    ).then((res) => {
      return res.json();
    });

    return new Response(JSON.stringify(res));
  } catch (e) {
    console.error(e);
    // Previously this branch fell through and the handler resolved to
    // undefined; always hand back an explicit Response so the caller gets a
    // well-formed error instead.
    return new Response(
      JSON.stringify({ error: 'failed to fetch mirror state' }),
      { status: 500 }
    );
  }
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 6969882

Please sign in to comment.