Use Selector in BoundSelector to wait in parallel (#683)
BoundSelector is only used by table cloning.

Selector does some juggling so that Select waits on all futures;
changing BoundSelector to use Selector avoids pathological cases.

Imagine parallelism is 4 and the times to clone six tables are:
15m, 1m, 2m, 3m, 4m, 5m

Currently, cloning takes 20m: spawning the fifth table blocks on the
15m clone, so the 4m & 5m clones only start at the 15m mark.
Optimally, cloning takes 15m: the 4m clone starts as soon as the 1m
clone finishes, the 5m clone starts another minute later, and both
finish well within the 15m of the longest clone.
serprex authored Apr 5, 2024
1 parent d779847 commit 93c1adf
Showing 5 changed files with 62 additions and 35 deletions.
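
To make the commit message's arithmetic concrete, here is a small standalone sketch (editorial, not part of the commit) that simulates both waiting strategies in plain Go: fifoMakespan models the old behavior of always blocking on the oldest child, while selectorMakespan models the new Selector behavior of blocking on whichever child finishes first. All names here are invented for illustration.

package main

import "fmt"

// fifoMakespan models the old BoundSelector: when at the limit, block on
// the oldest running child (futures[0]) before starting the next one.
func fifoMakespan(durations []int, limit int) int {
	now, makespan := 0, 0
	var running []int // finish times, in start order
	for _, d := range durations {
		if len(running) >= limit {
			if running[0] > now {
				now = running[0] // wait on the oldest child, even if others are long done
			}
			running = running[1:]
		}
		finish := now + d
		running = append(running, finish)
		if finish > makespan {
			makespan = finish
		}
	}
	return makespan
}

// selectorMakespan models the new BoundSelector: when at the limit, block
// until whichever running child finishes first.
func selectorMakespan(durations []int, limit int) int {
	now, makespan := 0, 0
	var running []int // finish times of running children
	for _, d := range durations {
		if len(running) >= limit {
			earliest := 0
			for i, f := range running {
				if f < running[earliest] {
					earliest = i
				}
			}
			if running[earliest] > now {
				now = running[earliest]
			}
			running = append(running[:earliest], running[earliest+1:]...)
		}
		finish := now + d
		running = append(running, finish)
		if finish > makespan {
			makespan = finish
		}
	}
	return makespan
}

func main() {
	tables := []int{15, 1, 2, 3, 4, 5} // clone times in minutes
	fmt.Println(fifoMakespan(tables, 4))     // prints 20
	fmt.Println(selectorMakespan(tables, 4)) // prints 15
}

Run against the commit message's example with parallelism 4, the two strategies yield 20 and 15, matching the 20m vs 15m claim.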
48 changes: 22 additions & 26 deletions flow/concurrency/bound_selector.go
@@ -7,48 +7,44 @@ import (
 )
 
 type BoundSelector struct {
-	futures []workflow.Future
-	ferrors []error
-	limit   int
+	selector workflow.Selector
+	ferrors  []error
+	limit    int
+	count    int
 }
 
-func NewBoundSelector(limit int) *BoundSelector {
+func NewBoundSelector(ctx workflow.Context, limit int) *BoundSelector {
 	return &BoundSelector{
-		limit: limit,
+		limit:    limit,
+		selector: workflow.NewSelector(ctx),
 	}
 }
 
-func (s *BoundSelector) SpawnChild(chCtx workflow.Context, w interface{}, args ...interface{}) {
-	if len(s.futures) >= s.limit {
-		s.waitOne(chCtx)
+func (s *BoundSelector) SpawnChild(ctx workflow.Context, w interface{}, args ...interface{}) {
+	if s.count >= s.limit {
+		s.waitOne(ctx)
 	}
 
-	future := workflow.ExecuteChildWorkflow(chCtx, w, args...)
-	s.futures = append(s.futures, future)
+	future := workflow.ExecuteChildWorkflow(ctx, w, args...)
+	s.selector.AddFuture(future, func(f workflow.Future) {
+		if err := f.Get(ctx, nil); err != nil {
+			s.ferrors = append(s.ferrors, err)
+		}
+	})
+	s.count += 1
 }
 
 func (s *BoundSelector) waitOne(ctx workflow.Context) {
-	if len(s.futures) == 0 {
-		return
-	}
-
-	f := s.futures[0]
-	s.futures = s.futures[1:]
-
-	err := f.Get(ctx, nil)
-	if err != nil {
-		s.ferrors = append(s.ferrors, err)
+	if s.count > 0 {
+		s.selector.Select(ctx)
+		s.count -= 1
 	}
 }
 
 func (s *BoundSelector) Wait(ctx workflow.Context) error {
-	for len(s.futures) > 0 {
+	for s.count > 0 {
 		s.waitOne(ctx)
 	}
 
-	if len(s.ferrors) > 0 {
-		return errors.Join(s.ferrors...)
-	}
-
-	return nil
+	return errors.Join(s.ferrors...)
 }
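
For orientation, a minimal sketch of how a workflow might drive the reworked BoundSelector. This is an editorial illustration, not code from the commit: the package import path, the cloneTable child workflow, and the cloneAllTables wrapper are assumptions; only NewBoundSelector, SpawnChild, and Wait are the real API from the diff above.

package example

import (
	"github.com/PeerDB-io/peer-flow/concurrency"
	"go.temporal.io/sdk/workflow"
)

// cloneTable stands in for a real child workflow definition.
func cloneTable(ctx workflow.Context, tableName string) error {
	return nil
}

// cloneAllTables spawns one child workflow per table, never exceeding the
// parallelism bound, then waits for the stragglers.
func cloneAllTables(ctx workflow.Context, tableNames []string) error {
	sel := concurrency.NewBoundSelector(ctx, 4) // at most 4 clones in flight
	for _, name := range tableNames {
		// With the Selector-based implementation, this unblocks as soon
		// as *any* running child finishes, not just the oldest one.
		sel.SpawnChild(ctx, cloneTable, name)
	}
	// Drain the remaining children; errors from all children are joined.
	return sel.Wait(ctx)
}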
36 changes: 36 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
@@ -804,6 +804,42 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
 	e2e.RequireEnvCanceled(s.t, env)
 }
 
+func (s PeerFlowE2ETestSuitePG) Test_Multiple_Parallel_Initial() {
+	tableMapping := make([]*protos.TableMapping, 5)
+	for i := range tableMapping {
+		srcTable := fmt.Sprintf("test_multi_init_%d", i)
+		dstTable := srcTable + "_dst"
+		s.setupSourceTable(srcTable, (i+1)*101)
+		tableMapping[i] = &protos.TableMapping{
+			SourceTableIdentifier:      s.attachSchemaSuffix(srcTable),
+			DestinationTableIdentifier: s.attachSchemaSuffix(dstTable),
+		}
+	}
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName: s.attachSuffix("test_multi_init"),
+	}
+	config := &protos.FlowConnectionConfigs{
+		DoInitialSnapshot:           true,
+		InitialSnapshotOnly:         true,
+		FlowJobName:                 connectionGen.FlowJobName,
+		Destination:                 s.peer,
+		TableMappings:               tableMapping,
+		Source:                      e2e.GeneratePostgresPeer(),
+		CdcStagingPath:              "",
+		SnapshotMaxParallelWorkers:  4,
+		SnapshotNumTablesInParallel: 3,
+	}
+
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, config, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
+	e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
+	for _, tm := range config.TableMappings {
+		require.NoError(s.t, s.comparePGTables(tm.SourceTableIdentifier, tm.DestinationTableIdentifier, "id,address,asset_id"))
+	}
+}
+
 func (s PeerFlowE2ETestSuitePG) Test_ContinueAsNew() {
 	srcTableName := s.attachSchemaSuffix("test_continueasnew")
 	dstTableName := s.attachSchemaSuffix("test_continueasnew_dst")
6 changes: 2 additions & 4 deletions flow/e2e/postgres/qrep_flow_pg_test.go
@@ -25,12 +25,10 @@ func TestPeerFlowE2ETestSuitePG(t *testing.T) {
 }
 
 func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
-	err := e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName)
-	require.NoError(s.t, err)
+	require.NoError(s.t, e2e.CreateTableForQRep(s.Conn(), s.suffix, tableName))
 
 	if rowCount > 0 {
-		err = e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount)
-		require.NoError(s.t, err)
+		require.NoError(s.t, e2e.PopulateSourceTable(s.Conn(), s.suffix, tableName, rowCount))
 	}
 }
 
5 changes: 1 addition & 4 deletions flow/e2e/test_utils.go
@@ -286,11 +286,8 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error {
 	CREATE TABLE e2e_test_%s.%s (
 		%s
 	);`, suffix, tableName, tblFieldStr))
-	if err != nil {
-		return err
-	}
 
-	return nil
+	return err
 }
 
 func generate20MBJson() ([]byte, error) {
2 changes: 1 addition & 1 deletion flow/workflows/snapshot_flow.go
@@ -209,7 +209,7 @@ func (s *SnapshotFlowExecution) cloneTables(
 		cloneTablesInput.snapshotName)
 	}
 
-	boundSelector := concurrency.NewBoundSelector(cloneTablesInput.maxParallelClones)
+	boundSelector := concurrency.NewBoundSelector(ctx, cloneTablesInput.maxParallelClones)
 
 	defaultPartitionCol := "ctid"
 	if !cloneTablesInput.supportsTIDScans {
