Skip to content

Commit

Permalink
Temporal test workflow: don't log to os.Stdout (#1185)
Browse files Browse the repository at this point in the history
Logging to os.Stdout mixes up logging in test output,
wrap t.Log in an io.Writer interface to avoid that

This is not ideal, there are proposals to address this in go:
1. golang/go#22513
2. golang/go#59928
  • Loading branch information
serprex authored Jan 31, 2024
1 parent 4ad61a7 commit ee02c15
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 148 deletions.
66 changes: 22 additions & 44 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
limits := peerflow.CDCFlowLimits{
Expand All @@ -210,8 +209,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_no_data")
dstTableName := "test_no_data"
Expand Down Expand Up @@ -251,8 +249,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_char_coltype")
dstTableName := "test_char_coltype"
Expand Down Expand Up @@ -295,8 +292,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
// The test inserts 10 rows into the source table and verifies that the data is
// correctly synced to the destination table after sync flow completes.
func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_simple_flow_bq")
dstTableName := "test_simple_flow_bq"
Expand Down Expand Up @@ -359,8 +355,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_1")
dstTableName := "test_toast_bq_1"
Expand Down Expand Up @@ -419,8 +414,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_3")
dstTableName := "test_toast_bq_3"
Expand Down Expand Up @@ -485,8 +479,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_4")
dstTableName := "test_toast_bq_4"
Expand Down Expand Up @@ -543,8 +536,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_5")
dstTableName := "test_toast_bq_5"
Expand Down Expand Up @@ -601,8 +593,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_types_bq")
dstTableName := "test_types_bq"
Expand Down Expand Up @@ -703,8 +694,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_nans_bq")
dstTableName := "test_nans_bq"
Expand Down Expand Up @@ -755,8 +745,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc")
dstTableName := "test_invalid_geo_bq_avro_cdc"
Expand Down Expand Up @@ -836,8 +825,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTable1Name := s.attachSchemaSuffix("test1_bq")
dstTable1Name := "test1_bq"
Expand Down Expand Up @@ -895,8 +883,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {

// TODO: not checking schema exactly, add later
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tableName := "test_simple_schema_changes"
srcTableName := s.attachSchemaSuffix(tableName)
Expand Down Expand Up @@ -982,8 +969,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tableName := "test_simple_cpkey"
srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
Expand Down Expand Up @@ -1046,8 +1032,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
dstTableName := "test_cpkey_toast1"
Expand Down Expand Up @@ -1112,8 +1097,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tableName := "test_cpkey_toast2"
srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
Expand Down Expand Up @@ -1175,8 +1159,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
dstTableName := "test_peerdb_cols_dst"
Expand Down Expand Up @@ -1235,8 +1218,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTable1Name := s.attachSchemaSuffix("test1_bq")
dstTable1Name := "test1_bq"
Expand Down Expand Up @@ -1300,8 +1282,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tableName := "test_softdel"
srcName := "test_softdel_src"
Expand Down Expand Up @@ -1384,8 +1365,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
Expand Down Expand Up @@ -1466,8 +1446,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcName := "test_softdel_ud_src"
srcTableName := s.attachSchemaSuffix(srcName)
Expand Down Expand Up @@ -1555,8 +1534,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tableName := "test_softdel_iad"
srcTableName := s.attachSchemaSuffix(tableName)
Expand Down
6 changes: 2 additions & 4 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

numRows := 10

Expand Down Expand Up @@ -48,8 +47,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

numRows := 10

Expand Down
39 changes: 13 additions & 26 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func (s PeerFlowE2ETestSuitePG) WaitForSchema(
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_simple_flow")
dstTableName := s.attachSchemaSuffix("test_simple_flow_dst")
Expand Down Expand Up @@ -136,8 +135,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_geospatial_pg")
dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst")
Expand Down Expand Up @@ -190,8 +188,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_types_pg")
dstTableName := s.attachSchemaSuffix("test_types_pg_dst")
Expand Down Expand Up @@ -264,8 +261,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_enum_flow")
dstTableName := s.attachSchemaSuffix("test_enum_flow_dst")
Expand Down Expand Up @@ -321,8 +317,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst")
Expand Down Expand Up @@ -445,8 +440,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst")
Expand Down Expand Up @@ -509,8 +503,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
randomString := s.attachSchemaSuffix("random_string")
Expand Down Expand Up @@ -587,8 +580,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
randomString := s.attachSchemaSuffix("random_string")
Expand Down Expand Up @@ -658,8 +650,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")
Expand Down Expand Up @@ -719,8 +710,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
Expand Down Expand Up @@ -809,8 +799,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
Expand Down Expand Up @@ -894,8 +883,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() {
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
Expand Down Expand Up @@ -982,8 +970,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
dstTableName := s.attachSchemaSuffix("test_softdel_iad_dst")
Expand Down
6 changes: 2 additions & 4 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
}

func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

numRows := 10

Expand Down Expand Up @@ -271,8 +270,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

numRows := 10

Expand Down
3 changes: 1 addition & 2 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
}

func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
Expand Down
Loading

0 comments on commit ee02c15

Please sign in to comment.