Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UI: Edit Page and Refactor Edit Mirror Route #1156

Merged
merged 5 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 49 additions & 33 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,47 +425,63 @@ func (h *FlowRequestHandler) FlowStateChange(
return nil, err
}

if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
if req.FlowConfigUpdate != nil && req.FlowConfigUpdate.GetCdcFlowConfigUpdate() != nil {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
shared.CDCDynamicPropertiesSignalName,
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState == protos.FlowStatus_STATUS_RUNNING || currState == protos.FlowStatus_STATUS_PAUSED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)

// in case we only want to update properties without changing status
if req.RequestedFlowState != protos.FlowStatus_STATUS_UNKNOWN {
if req.RequestedFlowState == protos.FlowStatus_STATUS_PAUSED &&
currState == protos.FlowStatus_STATUS_RUNNING {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_PAUSING)
if err != nil {
return nil, err
}
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.PauseSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_RUNNING &&
currState == protos.FlowStatus_STATUS_PAUSED {
err = h.temporalClient.SignalWorkflow(
ctx,
workflowID,
"",
shared.FlowSignalName,
shared.NoopSignal,
)
} else if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATED &&
(currState != protos.FlowStatus_STATUS_TERMINATED) {
err = h.updateWorkflowStatus(ctx, workflowID, protos.FlowStatus_STATUS_TERMINATING)
if err != nil {
return nil, err
}
_, err = h.ShutdownFlow(ctx, &protos.ShutdownRequest{
WorkflowId: workflowID,
FlowJobName: req.FlowJobName,
SourcePeer: req.SourcePeer,
DestinationPeer: req.DestinationPeer,
RemoveFlowEntry: false,
})
} else if req.RequestedFlowState != currState {
return nil, fmt.Errorf("illegal state change requested: %v, current state is: %v",
req.RequestedFlowState, currState)
}
if err != nil {
return nil, fmt.Errorf("unable to signal workflow: %w", err)
}
}

return &protos.FlowStateChangeResponse{
Expand Down
40 changes: 36 additions & 4 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

func (h *FlowRequestHandler) MirrorStatus(
Expand Down Expand Up @@ -81,6 +82,18 @@ func (h *FlowRequestHandler) CDCFlowStatus(
if err != nil {
return nil, err
}
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
state, err := h.getCDCWorkflowState(ctx, workflowID)
if err != nil {
return nil, err
}

// patching config to show latest values from state
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize

var initialCopyStatus *protos.SnapshotStatus

Expand Down Expand Up @@ -339,16 +352,16 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
// getWorkflowStatus runs the shared.FlowStatusQuery query against the workflow
// identified by workflowID and decodes the reply into a protos.FlowStatus.
//
// If either the query call or the decode step fails, the error is logged via
// slog.Error and the function returns protos.FlowStatus_STATUS_UNKNOWN together
// with an error wrapping the underlying cause.
func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID string) (protos.FlowStatus, error) {
// An empty run ID is passed to QueryWorkflow.
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.FlowStatusQuery)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
// Decode the query result into the status enum.
var state protos.FlowStatus
err = res.Get(&state)
if err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error()))
return protos.FlowStatus_STATUS_UNKNOWN,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err)
}
return state, nil
}
Expand All @@ -365,3 +378,22 @@ func (h *FlowRequestHandler) updateWorkflowStatus(
}
return nil
}

// getCDCWorkflowState runs the shared.CDCFlowStateQuery query against the
// workflow identified by workflowID and decodes the reply into a
// peerflow.CDCFlowWorkflowState.
//
// A failure in either the query call or the decode step is logged via
// slog.Error and reported to the caller as a nil state plus an error that
// wraps the underlying cause.
func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
	workflowID string,
) (*peerflow.CDCFlowWorkflowState, error) {
	// An empty run ID is passed to QueryWorkflow.
	encoded, queryErr := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery)
	if queryErr != nil {
		slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, queryErr.Error()))
		return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, queryErr)
	}

	// Decode straight into a freshly allocated state value.
	cdcState := new(peerflow.CDCFlowWorkflowState)
	if decodeErr := encoded.Get(cdcState); decodeErr != nil {
		slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, decodeErr.Error()))
		return nil, fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, decodeErr)
	}

	return cdcState, nil
}
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
NormalizeSyncDoneSignalName = "normalize-sync-done"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
CDCFlowStateQuery = "q-cdc-flow-state"
QRepFlowStateQuery = "q-qrep-flow-state"
FlowStatusQuery = "q-flow-status"

Expand Down
93 changes: 41 additions & 52 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type CDCFlowWorkflowState struct {
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// options passed to all NormalizeFlows
NormalizeFlowOptions *protos.NormalizeFlowOptions
}

// returns a new empty PeerFlowState
Expand All @@ -75,6 +79,8 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
NormalizeFlowOptions: nil,
}
}

Expand Down Expand Up @@ -356,44 +362,17 @@ func CDCFlowWorkflowWithConfig(
}
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
IdleTimeoutSeconds: 0,
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: limits.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
}
normalizeFlowOptions := &protos.NormalizeFlowOptions{
state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
}

// add a signal to change CDC properties
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(syncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

cdcPropertiesSelector.AddDefault(func() {
w.logger.Info("no batch size signal received, batch size remains: ",
syncFlowOptions.BatchSize)
})

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

Expand All @@ -416,7 +395,7 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
normalizeFlowOptions,
state.NormalizeFlowOptions,
)

var normWaitChan workflow.ReceiveChannel
Expand Down Expand Up @@ -454,6 +433,27 @@ func CDCFlowWorkflowWithConfig(
c.ReceiveAsync(&signalVal)
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
})
// add a signal to change CDC properties
cdcPropertiesSignalChan := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
mainLoopSelector.AddReceive(cdcPropertiesSignalChan, func(c workflow.ReceiveChannel, more bool) {
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcConfigUpdate.BatchSize > 0 {
state.SyncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

for {
for !canceled && mainLoopSelector.HasPending() {
Expand All @@ -466,31 +466,22 @@ func CDCFlowWorkflowWithConfig(
if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", time.Since(startTime))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger)
// only process config updates when going from STATUS_PAUSED to STATUS_RUNNING
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, limits, mirrorNameSearch)
if err != nil {
return state, err
}
} else if err := ctx.Err(); err != nil {
return nil, err
}
}

w.logger.Info("mirror has been resumed after ", time.Since(startTime))
}

cdcPropertiesSelector.Select(ctx)
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING

// check if total sync flows have been completed
Expand Down Expand Up @@ -526,12 +517,12 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
}
syncCtx := workflow.WithChildOptions(ctx, childSyncFlowOpts)
syncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
childSyncFlowFuture := workflow.ExecuteChildWorkflow(
syncCtx,
SyncFlowWorkflow,
cfg,
syncFlowOptions,
state.SyncFlowOptions,
)

var syncDone bool
Expand All @@ -554,7 +545,6 @@ func CDCFlowWorkflowWithConfig(
if childSyncFlowRes != nil {
tableSchemaDeltasCount := len(childSyncFlowRes.TableSchemaDeltas)

var normalizeTableNameSchemaMapping map[string]*protos.TableSchema
// slightly hacky: table schema mapping is cached, so we need to manually update it if schema changes.
if tableSchemaDeltasCount != 0 {
modifiedSrcTables := make([]string, 0, tableSchemaDeltasCount)
Expand Down Expand Up @@ -583,14 +573,13 @@ func CDCFlowWorkflowWithConfig(
dstTable := modifiedDstTables[i]
state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
normalizeTableNameSchemaMapping = state.TableNameSchemaMapping
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ message QRepMirrorStatus {
// or if we are in the continuous streaming mode.
}

// to be removed eventually
message CDCSyncStatus {
int64 start_lsn = 1;
int64 end_lsn = 2;
Expand Down
23 changes: 23 additions & 0 deletions ui/app/api/mirrors/state/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {
MirrorStatusRequest,
MirrorStatusResponse,
} from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

/**
 * POST handler for the mirrors state route.
 *
 * Reads a MirrorStatusRequest from the request body, forwards it to the flow
 * service at `/v1/mirrors/{flowJobName}` (with caching disabled), and relays
 * the JSON reply to the caller.
 *
 * @param request incoming request whose JSON body is a MirrorStatusRequest
 * @returns a Response carrying the serialized MirrorStatusResponse, or a
 *          500 Response with an `error` field if the upstream call or its
 *          JSON parsing throws
 */
export async function POST(request: Request) {
  const body: MirrorStatusRequest = await request.json();
  const flowServiceAddr = GetFlowHttpAddressFromEnv();
  console.log('/mirrors/state: req:', body);
  try {
    const res: MirrorStatusResponse = await fetch(
      `${flowServiceAddr}/v1/mirrors/${body.flowJobName}`,
      { cache: 'no-store' }
    ).then((res) => {
      return res.json();
    });

    return new Response(JSON.stringify(res));
  } catch (e) {
    console.error(e);
    // Always answer with a Response: falling through here would make the
    // handler resolve to undefined, which is not a valid route result.
    const message = e instanceof Error ? e.message : String(e);
    return new Response(JSON.stringify({ error: message }), {
      status: 500,
      headers: { 'Content-Type': 'application/json' },
    });
  }
}
Loading
Loading