Skip to content

Commit

Permalink
Merge branch 'main' into remove-temporal-testsuite
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 3, 2024
2 parents 464f61c + 72cfd9d commit 31aa444
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 109 deletions.
6 changes: 0 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,8 +1152,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so atleast 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

if !s.t.Failed() {
addRows(1)
Expand Down Expand Up @@ -1199,10 +1197,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 3 from second insert of 18 rows in 2 tables, batch size updated
// TODO 5 to pass test, probably need to fix code losing a sync flow status
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 5)
}

env.Cancel()
Expand Down
14 changes: 1 addition & 13 deletions flow/model/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ var SyncStopSignal = TypedSignal[struct{}]{
Name: "sync-stop",
}

var SyncErrorSignal = TypedSignal[string]{
Name: "sync-error",
}

var SyncResultSignal = TypedSignal[SyncResponse]{
var SyncResultSignal = TypedSignal[*SyncResponse]{
Name: "sync-result",
}

Expand All @@ -150,14 +146,6 @@ var NormalizeSignal = TypedSignal[NormalizePayload]{
Name: "normalize",
}

var NormalizeErrorSignal = TypedSignal[string]{
Name: "normalize-error",
}

var NormalizeResultSignal = TypedSignal[NormalizeResponse]{
Name: "normalize-result",
}

var NormalizeDoneSignal = TypedSignal[struct{}]{
Name: "normalize-done",
}
83 changes: 11 additions & 72 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,8 @@ import (
)

type CDCFlowWorkflowState struct {
// Progress events for the peer flow.
Progress []string
// Accumulates status for sync flows spawned.
SyncFlowStatuses []model.SyncResponse
// Accumulates status for normalize flows spawned.
NormalizeFlowStatuses []model.NormalizeResponse
// Current signalled state of the peer flow.
ActiveSignal model.CDCFlowSignal
// Errors encountered during child sync flow executions.
SyncFlowErrors []string
// Errors encountered during child sync flow executions.
NormalizeFlowErrors []string
// Global mapping of relation IDs to RelationMessages sent as a part of logical replication.
// Needed to support schema changes.
RelationMessageMapping model.RelationMessageMapping
Expand All @@ -51,15 +41,10 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
}
return &CDCFlowWorkflowState{
Progress: []string{"started"},
// 1 more than the limit of 10
SyncFlowStatuses: make([]model.SyncResponse, 0, 11),
NormalizeFlowStatuses: make([]model.NormalizeResponse, 0, 11),
ActiveSignal: model.NoopSignal,
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdate: nil,
ActiveSignal: model.NoopSignal,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdate: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
Expand All @@ -68,32 +53,6 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
}
}

// truncate the progress and other arrays to a max of 10 elements
func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
if len(s.Progress) > 10 {
copy(s.Progress, s.Progress[len(s.Progress)-10:])
s.Progress = s.Progress[:10]
}
if len(s.SyncFlowStatuses) > 10 {
copy(s.SyncFlowStatuses, s.SyncFlowStatuses[len(s.SyncFlowStatuses)-10:])
s.SyncFlowStatuses = s.SyncFlowStatuses[:10]
}
if len(s.NormalizeFlowStatuses) > 10 {
copy(s.NormalizeFlowStatuses, s.NormalizeFlowStatuses[len(s.NormalizeFlowStatuses)-10:])
s.NormalizeFlowStatuses = s.NormalizeFlowStatuses[:10]
}

if s.SyncFlowErrors != nil {
logger.Warn("SyncFlowErrors", slog.Any("errors", s.SyncFlowErrors))
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
logger.Warn("NormalizeFlowErrors", slog.Any("errors", s.NormalizeFlowErrors))
s.NormalizeFlowErrors = nil
}
}

// CDCFlowWorkflowExecution represents the state for execution of a peer flow.
type CDCFlowWorkflowExecution struct {
flowExecutionID string
Expand Down Expand Up @@ -429,7 +388,7 @@ func CDCFlowWorkflow(
}

state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
w.logger.Info("executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialSnapshotOnly {
Expand Down Expand Up @@ -492,7 +451,6 @@ func CDCFlowWorkflow(
err := f.Get(ctx, nil)
if err != nil {
handleError("sync", err)
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
}

if restart {
Expand All @@ -504,7 +462,6 @@ func CDCFlowWorkflow(
}).Get(ctx, nil)
} else {
w.logger.Warn("sync flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
w.startSyncFlow(syncCtx, cfg, state.SyncFlowOptions)
mainLoopSelector.AddFuture(w.syncFlowFuture, handleSyncFlow)
}
Expand All @@ -513,7 +470,6 @@ func CDCFlowWorkflow(
err := f.Get(ctx, nil)
if err != nil {
handleError("normalize", err)
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err.Error())
}

if restart {
Expand All @@ -522,7 +478,6 @@ func CDCFlowWorkflow(
finished = true
} else {
w.logger.Warn("normalize flow ended, restarting", slog.Any("error", err))
state.TruncateProgress(w.logger)
w.startNormFlow(normCtx, cfg)
mainLoopSelector.AddFuture(w.normFlowFuture, handleNormFlow)
}
Expand All @@ -538,30 +493,16 @@ func CDCFlowWorkflow(
state.ActiveSignal = model.FlowSignalHandler(state.ActiveSignal, val, w.logger)
})

syncErrorChan := model.SyncErrorSignal.GetSignalChannel(ctx)
syncErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) {
syncCount += 1
state.SyncFlowErrors = append(state.SyncFlowErrors, err)
})
syncResultChan := model.SyncResultSignal.GetSignalChannel(ctx)
syncResultChan.AddToSelector(mainLoopSelector, func(result model.SyncResponse, _ bool) {
syncResultChan.AddToSelector(mainLoopSelector, func(result *model.SyncResponse, _ bool) {
syncCount += 1
if state.SyncFlowOptions.RelationMessageMapping == nil {
state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping
} else {
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping)
if result != nil {
if state.SyncFlowOptions.RelationMessageMapping == nil {
state.SyncFlowOptions.RelationMessageMapping = result.RelationMessageMapping
} else {
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, result.RelationMessageMapping)
}
}
state.SyncFlowStatuses = append(state.SyncFlowStatuses, result)
})

normErrorChan := model.NormalizeErrorSignal.GetSignalChannel(ctx)
normErrorChan.AddToSelector(mainLoopSelector, func(err string, _ bool) {
state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, err)
})

normResultChan := model.NormalizeResultSignal.GetSignalChannel(ctx)
normResultChan.AddToSelector(mainLoopSelector, func(result model.NormalizeResponse, _ bool) {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, result)
})

normChan := model.NormalizeSignal.GetSignalChannel(ctx)
Expand Down Expand Up @@ -613,8 +554,6 @@ func CDCFlowWorkflow(
return nil, err
}

// important to control the size of inputs.
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
}
}
Expand Down
14 changes: 2 additions & 12 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,9 @@ func NormalizeFlowWorkflow(

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
_ = model.NormalizeErrorSignal.SignalExternalWorkflow(
ctx,
parent.ID,
"",
err.Error(),
).Get(ctx, nil)
logger.Info("Normalize errored", slog.Any("error", err))
} else if normalizeResponse != nil {
_ = model.NormalizeResultSignal.SignalExternalWorkflow(
ctx,
parent.ID,
"",
*normalizeResponse,
).Get(ctx, nil)
logger.Info("Normalize finished", slog.Any("result", normalizeResponse))
}
}

Expand Down
15 changes: 9 additions & 6 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

// For now cdc restarts sync flow whenever it itself restarts,
// set this value high enough to never be met, relying on cdc restarts.
// In the future cdc flow restarts could be decoupled from sync flow restarts.
const (
maxSyncsPerSyncFlow = 64
)
Expand Down Expand Up @@ -103,19 +106,19 @@ func SyncFlowWorkflow(
var childSyncFlowRes *model.SyncResponse
if err := f.Get(ctx, &childSyncFlowRes); err != nil {
logger.Error("failed to execute sync flow", slog.Any("error", err))
_ = model.SyncErrorSignal.SignalExternalWorkflow(
_ = model.SyncResultSignal.SignalExternalWorkflow(
ctx,
parent.ID,
"",
err.Error(),
nil,
).Get(ctx, nil)
syncErr = true
} else if childSyncFlowRes != nil {
_ = model.SyncResultSignal.SignalExternalWorkflow(
ctx,
parent.ID,
"",
*childSyncFlowRes,
childSyncFlowRes,
).Get(ctx, nil)
options.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
Expand Down Expand Up @@ -145,12 +148,12 @@ func SyncFlowWorkflow(

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
logger.Error("failed to execute schema update at source: ", err)
_ = model.SyncErrorSignal.SignalExternalWorkflow(
logger.Error("failed to execute schema update at source", slog.Any("error", err))
_ = model.SyncResultSignal.SignalExternalWorkflow(
ctx,
parent.ID,
"",
err.Error(),
nil,
).Get(ctx, nil)
} else {
for i, srcTable := range modifiedSrcTables {
Expand Down

0 comments on commit 31aa444

Please sign in to comment.