Skip to content

Commit

Permalink
adjust timeouts, refactor WaitFuncSchema into WaitForSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent 6b783ff commit c95180e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 122 deletions.
86 changes: 40 additions & 46 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1379,19 +1379,17 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete",
func(ctx context.Context) bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t")
if err != nil {
return false
}
rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
},
)
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize delete", func() bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t")
if err != nil {
return false
}
rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
})

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -1556,27 +1554,25 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
e2e.EnvNoError(s.t, env, err)
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction",
func(ctx context.Context) bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t")
e2e.EnvNoError(s.t, env, err)
rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
if !e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) {
return false
}
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize transaction", func() bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t")
e2e.EnvNoError(s.t, env, err)
rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
if !e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) {
return false
}

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.Config.DatasetId, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
e2e.EnvNoError(s.t, env, err)
s.t.Log("waiting on _PEERDB_IS_DELETED to be 1, currently", numNewRows)
return err == nil && numNewRows == 1
},
)
newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.Config.DatasetId, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
e2e.EnvNoError(s.t, env, err)
s.t.Log("waiting on _PEERDB_IS_DELETED to be 1, currently", numNewRows)
return err == nil && numNewRows == 1
})

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -1638,19 +1634,17 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete",
func(ctx context.Context) bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, "id,c1,c2,t")
if err != nil {
return false
}
rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
},
)
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize delete", func() bool {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, "id,c1,c2,t")
if err != nil {
return false
}
rows, err := s.GetRowsWhere(tableName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
Expand Down
136 changes: 66 additions & 70 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string {
Expand Down Expand Up @@ -101,14 +102,17 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
require.NoError(s.t, err)
}

func WaitFuncSchema(
s PeerFlowE2ETestSuitePG,
func (s PeerFlowE2ETestSuitePG) WaitForSchema(
env *testsuite.TestWorkflowEnvironment,
reason string,
srcTableName string,
dstTableName string,
cols string,
expectedSchema *protos.TableSchema,
) func(context.Context) bool {
return func(ctx context.Context) bool {
) {
s.t.Helper()
e2e.EnvWaitFor(s.t, env, 2*time.Minute, reason, func() bool {
s.t.Helper()
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
})
Expand All @@ -121,7 +125,7 @@ func WaitFuncSchema(
return false
}
return s.comparePGTables(srcTableName, dstTableName, cols) == nil
}
})
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
Expand Down Expand Up @@ -163,18 +167,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted initial row in the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalizing first row",
WaitFuncSchema(s, srcTableName, dstTableName, "id,c1", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
},
PrimaryKeyColumns: []string{"id"},
}),
)
s.WaitForSchema(env, "normalizing first row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
},
PrimaryKeyColumns: []string{"id"},
})

// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -186,19 +188,17 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c2 in the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalizing altered row",
WaitFuncSchema(s, srcTableName, dstTableName, "id,c1,c2", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
}),
)
s.WaitForSchema(env, "normalizing altered row", srcTableName, dstTableName, "id,c1,c2", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
})

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -210,20 +210,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalizing dropped column row",
WaitFuncSchema(s, srcTableName, dstTableName, "id,c1,c3", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
}),
)
s.WaitForSchema(env, "normalizing dropped column row", srcTableName, dstTableName, "id,c1,c3", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
})

// alter source table, drop column c3 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -235,20 +233,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row after dropping all columns in the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalizing 2nd dropped column row",
WaitFuncSchema(s, srcTableName, dstTableName, "id,c1", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
}),
)
s.WaitForSchema(env, "normalizing 2nd dropped column row", srcTableName, dstTableName, "id,c1", &protos.TableSchema{
TableIdentifier: dstTableName,
ColumnNames: []string{"id", "c1", "_PEERDB_SYNCED_AT", "c2", "c3"},
ColumnTypes: []string{
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindInt64),
string(qvalue.QValueKindInt64),
},
PrimaryKeyColumns: []string{"id"},
})

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -302,7 +298,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
}
s.t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})

Expand All @@ -311,7 +307,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize modifications", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize modifications", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
env.CancelWorkflow()
Expand Down Expand Up @@ -450,7 +446,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
}
s.t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize 10 rows", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil
})
_, err = s.pool.Exec(context.Background(),
Expand All @@ -459,7 +455,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
e2e.EnvNoError(s.t, env, err)

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") == nil
})

Expand Down Expand Up @@ -581,13 +577,13 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize update", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
// since we delete stuff, create another table to compare with
Expand All @@ -598,7 +594,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func() bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})

Expand Down Expand Up @@ -756,7 +752,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})

Expand All @@ -773,7 +769,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
e2e.EnvNoError(s.t, env, err)
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize transaction", func() bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})

Expand Down Expand Up @@ -843,19 +839,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize row", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize delete", func() bool {
return s.comparePGTables(srcTableName, dstTableName+` WHERE NOT "_PEERDB_IS_DELETED"`, "id,c1,c2,t") == nil
})
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize reinsert", func(ctx context.Context) bool {
e2e.EnvWaitFor(s.t, env, time.Minute, "normalize reinsert", func() bool {
return s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") == nil
})

Expand Down
Loading

0 comments on commit c95180e

Please sign in to comment.