Skip to content

Commit

Permalink
Try reverting e2e changes, only updating normalize status query to qu…
Browse files Browse the repository at this point in the history
…ery normalize flow instead of cdc flow
  • Loading branch information
serprex committed Jan 3, 2024
1 parent e1976c4 commit 97c309a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 44 deletions.
23 changes: 3 additions & 20 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
wg := sync.WaitGroup{}
wg.Add(1)

done := make(chan struct{})
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
/* transaction updating no rows */
Expand All @@ -442,7 +440,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
`, srcTableName, srcTableName))
require.NoError(s.t, err)
s.t.Log("Executed a transaction touching toast columns")
wg.Done()
done <- struct{}{}
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -456,7 +454,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
wg.Wait()
<-done
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
Expand Down Expand Up @@ -923,9 +921,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and mutate schema repeatedly.
go func() {
Expand Down Expand Up @@ -981,8 +976,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -994,7 +987,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -1034,8 +1026,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
// insert 10 rows into the source table
Expand All @@ -1057,11 +1047,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
Expand Down Expand Up @@ -1110,9 +1098,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
Expand All @@ -1138,11 +1123,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {

err = rowsTx.Commit(context.Background())
require.NoError(s.t, err)
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
Expand Down
20 changes: 0 additions & 20 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and mutate schema repeatedly.
go func() {
Expand Down Expand Up @@ -990,8 +987,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
require.NoError(s.t, err)
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
s.compareTableContentsSF("test_simple_schema_changes", "id,c1")

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -1003,7 +998,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -1040,9 +1034,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
Expand All @@ -1066,11 +1057,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
Expand Down Expand Up @@ -1197,8 +1186,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

Expand All @@ -1218,11 +1205,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
Expand Down Expand Up @@ -1282,9 +1267,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

Expand All @@ -1304,11 +1286,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
wg.Wait()
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
time.Sleep(5 * time.Second)
for {
response, err := env.QueryWorkflow(
peerflow.CDCFlowStatusQuery,
peerflow.CDCNormFlowStatusQuery,
connectionGen.FlowJobName,
)
if err == nil {
var state peerflow.CDCFlowWorkflowState
var state int
err = response.Get(&state)
if err != nil {
slog.Error(err.Error())
}

if len(state.NormalizeFlowStatuses) >= minCount {
if state >= minCount {
break
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

const (
CDCFlowStatusQuery = "q-cdc-flow-status"
CDCNormFlowStatusQuery = "q-norm-flow-status"
maxSyncFlowsPerCDCFlow = 32
)

Expand Down Expand Up @@ -169,7 +170,6 @@ func CDCFlowWorkflowWithConfig(
limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow
}

// Support a Query for the current state of the peer flow.
err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) {
return *state, nil
})
Expand Down
7 changes: 7 additions & 0 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
errors := make([]string, 0)
syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync")

err := workflow.SetQueryHandler(ctx, CDCNormFlowStatusQuery, func(jobName string) (int, error) {
return len(results), nil
})
if err != nil {
errors = append(errors, err.Error())
}

stopLoop := false
needSync := true
for {
Expand Down

0 comments on commit 97c309a

Please sign in to comment.