Skip to content

Commit

Permalink
pr review
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 24, 2024
1 parent 5075598 commit bcea38b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
2 changes: 2 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
// Signals
FlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"
NormalizeSyncSignalName = "normalize-sync"
NormalizeSyncDoneSignalName = "normalize-sync-done"

// Queries
CDCFlowStateQuery = "q-cdc-flow-status"
Expand Down
6 changes: 3 additions & 3 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,11 @@ func CDCFlowWorkflowWithConfig(

var normWaitChan workflow.ReceiveChannel
if !peerdbenv.PeerDBEnableParallelSyncNormalize() {
normWaitChan = workflow.GetSignalChannel(ctx, "SyncDone")
normWaitChan = workflow.GetSignalChannel(ctx, shared.NormalizeSyncDoneSignalName)
}

finishNormalize := func() {
childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{
childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: true,
SyncBatchID: -1,
})
Expand Down Expand Up @@ -597,7 +597,7 @@ func CDCFlowWorkflowWithConfig(
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, "Sync", model.NormalizeSignal{
signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: normalizeTableNameSchemaMapping,
Expand Down
10 changes: 8 additions & 2 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

func NormalizeFlowWorkflow(ctx workflow.Context,
Expand All @@ -25,7 +26,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,

results := make([]model.NormalizeResponse, 0, 4)
errors := make([]string, 0)
syncChan := workflow.GetSignalChannel(ctx, "Sync")
syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName)

var stopLoop, canceled bool
var lastSyncBatchID, syncBatchID int64
Expand Down Expand Up @@ -54,6 +55,11 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
selector.Select(ctx)
}
if canceled || (stopLoop && lastSyncBatchID == syncBatchID) {
if canceled {
logger.Info("normalize canceled - ", config.FlowJobName)
} else {
logger.Info("normalize finished - ", config.FlowJobName)
}
break
}
if lastSyncBatchID != syncBatchID {
Expand Down Expand Up @@ -81,7 +87,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
ctx,
parent.ID,
parent.RunID,
"SyncDone",
shared.NormalizeSyncDoneSignalName,
struct{}{},
)
}
Expand Down
8 changes: 3 additions & 5 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
return nil
}

func (q *QRepFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) {
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)

func (q *QRepFlowExecution) receiveAndHandleSignalAsync(signalChan workflow.ReceiveChannel) {
var signalVal shared.CDCFlowSignal
ok := signalChan.ReceiveAsync(&signalVal)
if ok {
Expand Down Expand Up @@ -513,11 +511,11 @@ func QRepFlowWorkflow(

// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
q.receiveAndHandleSignalAsync(ctx)
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
q.receiveAndHandleSignalAsync(signalChan)
if q.activeSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
Expand Down
4 changes: 2 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func XminFlowWorkflow(

// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
q.receiveAndHandleSignalAsync(ctx)
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
q.receiveAndHandleSignalAsync(signalChan)
if x.activeSignal == shared.PauseSignal {
startTime := time.Now()
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED
signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName)
var signalVal shared.CDCFlowSignal

for x.activeSignal == shared.PauseSignal {
Expand Down

0 comments on commit bcea38b

Please sign in to comment.