Skip to content

Commit

Permalink
Convert rest of e2e/postgres to WaitFor
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent d399397 commit cf0b228
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 56 deletions.
122 changes: 67 additions & 55 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {

err = s.comparePGTables(srcTableName, dstTableName, "id,key,value")
require.NoError(s.t, err)

env.AssertExpectations(s.t)
}

func WaitFuncSchema(
Expand Down Expand Up @@ -268,7 +266,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()
env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
Expand Down Expand Up @@ -337,9 +334,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
time.Sleep(10 * time.Second)
wg.Wait()
env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
Expand Down Expand Up @@ -419,8 +414,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
// verify our updates and delete happened
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
require.NoError(s.t, err)

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
Expand Down Expand Up @@ -457,13 +450,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
ExitAfterRecords: -1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

// insert 10 rows into the source table
Expand All @@ -476,28 +472,24 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
}
s.t.Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil
})
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil
})

// verify our updates and delete happened
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
require.NoError(s.t, err)
env.CancelWorkflow()
}()

env.AssertExpectations(s.t)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
wg.Wait()
}

func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
Expand Down Expand Up @@ -560,7 +552,6 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
require.Contains(s.t, err.Error(), "continue as new")
checkErr := s.checkPeerdbColumns(dstTableName, 1)
require.NoError(s.t, checkErr)
env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
Expand Down Expand Up @@ -602,7 +593,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
ExitAfterRecords: -1,
MaxBatchSize: 100,
}

Expand All @@ -618,35 +609,42 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()

// verify our updates and delete happened
err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t")
require.NoError(s.t, err)

softDeleteQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`,
dstTableName)
numRows, err := s.countRowsInQuery(softDeleteQuery)
softDeleteQuery := fmt.Sprintf(
`SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`,
dstTableName,
)
numRows, err := s.RunInt64Query(softDeleteQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(1), numRows)
}
Expand Down Expand Up @@ -729,9 +727,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() {
require.NoError(s.t, err)

softDeleteQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`,
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`,
dstTableName)
numRows, err := s.countRowsInQuery(softDeleteQuery)
numRows, err := s.RunInt64Query(softDeleteQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(1), numRows)
}
Expand Down Expand Up @@ -781,13 +779,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})

insertTx, err := s.pool.Begin(context.Background())
e2e.EnvNoError(s.t, env, err)
Expand All @@ -806,21 +809,24 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
e2e.EnvNoError(s.t, env, err)

e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")
wg.Wait()

// verify our updates and delete happened
err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t")
require.NoError(s.t, err)

softDeleteQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`,
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`,
dstTableName)
numRows, err := s.countRowsInQuery(softDeleteQuery)
numRows, err := s.RunInt64Query(softDeleteQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(1), numRows)
}
Expand Down Expand Up @@ -863,41 +869,47 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
}

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 3,
ExitAfterRecords: -1,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and delete rows in the table.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize reinsert", func(ctx context.Context) bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t")
require.NoError(s.t, err)
wg.Wait()

softDeleteQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`,
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`,
dstTableName)
numRows, err := s.countRowsInQuery(softDeleteQuery)
numRows, err := s.RunInt64Query(softDeleteQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(0), numRows)
}
2 changes: 1 addition & 1 deletion flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
return rows.Err()
}

func (s PeerFlowE2ETestSuitePG) countRowsInQuery(query string) (int64, error) {
func (s PeerFlowE2ETestSuitePG) RunInt64Query(query string) (int64, error) {
var count pgtype.Int8
err := s.pool.QueryRow(context.Background(), query).Scan(&count)
return count.Int64, err
Expand Down

0 comments on commit cf0b228

Please sign in to comment.