Skip to content

Commit

Permalink
Redirect workflow logs to testing logs
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 22, 2023
1 parent 1fe80ac commit 3cf0e97
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 70 deletions.
32 changes: 16 additions & 16 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

// TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores.
Expand All @@ -167,7 +167,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_no_data")
Expand Down Expand Up @@ -211,7 +211,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_char_coltype")
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
// The test inserts 10 rows into the source table and verifies that the data is
// correctly synced to the destination table after sync flow completes.
func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_flow_bq")
Expand Down Expand Up @@ -325,7 +325,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_1")
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_2")
Expand Down Expand Up @@ -457,7 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_3")
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_4")
Expand Down Expand Up @@ -598,7 +598,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_toast_bq_5")
Expand Down Expand Up @@ -665,7 +665,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_types_bq")
Expand Down Expand Up @@ -744,7 +744,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTable1Name := s.attachSchemaSuffix("test1_bq")
Expand Down Expand Up @@ -806,7 +806,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {

// TODO: not checking schema exactly, add later
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
Expand Down Expand Up @@ -906,7 +906,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
Expand Down Expand Up @@ -979,7 +979,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
Expand Down Expand Up @@ -1129,7 +1129,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

numRows := 10
Expand Down Expand Up @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

numRows := 10
Expand Down
12 changes: 6 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_flow")
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
Expand Down Expand Up @@ -341,7 +341,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
Expand Down Expand Up @@ -422,7 +422,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
Expand Down Expand Up @@ -499,7 +499,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
}

func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

numRows := 10
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
}

func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

numRows := 10
Expand Down
24 changes: 11 additions & 13 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
}

func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

helper, setupErr := setupS3("s3")
Expand Down Expand Up @@ -62,9 +62,8 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
for i := 1; i <= 20; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue)
require.NoError(s.t, err)
}
require.NoError(s.t, err)
Expand All @@ -82,9 +81,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fmt.Println("JobName: ", flowJobName)
s.t.Log("JobName", flowJobName)
files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files))
s.t.Log("Files in Test_Complete_Simple_Flow_S3: ", len(files))
require.NoError(s.t, err)

require.Equal(s.t, 4, len(files))
Expand All @@ -93,8 +92,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
}

func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

helper, setupErr := setupS3("gcs")
if setupErr != nil {
require.Fail(s.t, "failed to setup S3", setupErr)
Expand Down Expand Up @@ -135,12 +135,10 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
for i := 1; i <= 20; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue)
require.NoError(s.t, err)
}
fmt.Println("Inserted 20 rows into the source table")
require.NoError(s.t, err)
}()

Expand All @@ -156,9 +154,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fmt.Println("JobName: ", flowJobName)
s.t.Log("JobName", flowJobName)
files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files))
s.t.Log("Files in Test_Complete_Simple_Flow_GCS: ", len(files))
require.NoError(s.t, err)

require.Equal(s.t, 4, len(files))
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
s.t.Skip("Skipping S3 test")
}

env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

jobName := "test_complete_flow_s3"
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
s.t.Skip("Skipping S3 test")
}

env := e2e.NewTemporalTestWorkflowEnvironment()
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(env, s.t)

jobName := "test_complete_flow_s3_ctid"
Expand Down
Loading

0 comments on commit 3cf0e97

Please sign in to comment.