Skip to content

Commit

Permalink
fix type errors
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent ae10042 commit 6b783ff
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 133 deletions.
89 changes: 36 additions & 53 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand Down Expand Up @@ -274,8 +273,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand Down Expand Up @@ -319,8 +317,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand All @@ -330,7 +327,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
Expand Down Expand Up @@ -385,8 +382,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
Expand All @@ -396,7 +392,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/*
Executing a transaction which
1. changes both toast column
Expand Down Expand Up @@ -452,8 +448,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 0,
Expand All @@ -464,7 +459,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
// and execute a transaction touching toast columns
done := make(chan struct{})
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* transaction updating no rows */
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -515,8 +510,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 11,
Expand All @@ -526,7 +520,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// complex transaction with random DMLs on a table with toast columns
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -587,8 +581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 6,
Expand All @@ -598,7 +591,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// complex transaction with random DMLs on a table with toast columns
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
BEGIN;
Expand Down Expand Up @@ -654,8 +647,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 4,
Expand All @@ -665,7 +657,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/*
transaction updating a single row
multiple times with changed/unchanged toast columns
Expand Down Expand Up @@ -720,8 +712,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
Expand All @@ -731,7 +722,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* test inserting various types*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',b'101',
Expand Down Expand Up @@ -800,8 +791,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
Expand All @@ -811,7 +801,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert 10 rows into the source table
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 4 invalid shapes and 6 valid shapes into the source table
for i := 0; i < 4; i++ {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -881,8 +871,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
Expand All @@ -892,7 +881,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* inserting across multiple tables*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
Expand Down Expand Up @@ -941,8 +930,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: -1,
Expand All @@ -953,7 +941,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// and then insert and mutate schema repeatedly.
go func() {
// insert first row.
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES (1)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
Expand Down Expand Up @@ -1032,8 +1020,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: -1,
Expand All @@ -1043,7 +1030,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
Expand Down Expand Up @@ -1098,8 +1085,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 20,
Expand All @@ -1109,7 +1095,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
e2e.EnvNoError(s.t, env, err)

Expand Down Expand Up @@ -1173,8 +1159,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: -1,
Expand All @@ -1184,7 +1169,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -1233,16 +1218,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
SoftDelete: true,
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
MaxBatchSize: 100,
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 1 row into the source table
testKey := fmt.Sprintf("test_key_%d", 1)
testValue := fmt.Sprintf("test_value_%d", 1)
Expand Down Expand Up @@ -1298,8 +1282,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
CdcStagingPath: "",
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
require.NoError(s.t, err)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
Expand All @@ -1309,7 +1292,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and execute a transaction touching toast columns
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
/* inserting across multiple tables*/
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
Expand Down Expand Up @@ -1383,7 +1366,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
Expand Down Expand Up @@ -1469,7 +1452,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
e2e.EnvNoError(s.t, env, err)
Expand Down Expand Up @@ -1553,7 +1536,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
Expand Down Expand Up @@ -1646,7 +1629,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
Expand Down
Loading

0 comments on commit 6b783ff

Please sign in to comment.