From 31f26415411f6e2e47e4924e94d89d1206b31c9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 20:19:37 +0000 Subject: [PATCH] Replace slog with t.Log in e2e tests (#1188) Global logger should not be used with parallel testing --- flow/e2e/bigquery/peer_flow_bq_test.go | 17 ++++++----------- flow/e2e/congen.go | 16 ++++++++++------ flow/e2e/postgres/qrep_flow_pg_test.go | 14 +++++--------- flow/e2e/s3/qrep_flow_s3_test.go | 5 ++--- flow/e2e/s3/s3_helper.go | 5 ----- flow/e2e/snowflake/peer_flow_sf_test.go | 17 ++++++----------- .../snowflake/snowflake_schema_delta_test.go | 7 ++----- flow/e2e/sqlserver/qrep_flow_sqlserver_test.go | 5 ++--- flow/e2e/test_utils.go | 4 ++-- 9 files changed, 35 insertions(+), 55 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 17cd3ecbea..3005385335 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log/slog" "strings" "testing" "time" @@ -67,8 +66,7 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId) if err != nil { - slog.Error("failed to tear down bigquery", slog.Any("error", err)) - s.t.FailNow() + s.t.Fatalf("failed to tear down bigquery: %v", err) } }) } @@ -146,14 +144,12 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper { bqHelper, err := NewBigQueryTestHelper() if err != nil { - slog.Error("Error in test", slog.Any("error", err)) - t.FailNow() + t.Fatalf("Failed to create helper: %v", err) } err = bqHelper.RecreateDataset() if err != nil { - slog.Error("Error in test", slog.Any("error", err)) - t.FailNow() + t.Fatalf("Failed to recreate dataset: %v", err) } return bqHelper @@ -167,16 +163,15 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { if err != nil { // it's okay if the .env file is not present // we will use the default values - slog.Info("Unable to load .env file, using default values from env") + t.Log("Unable to load .env file, using default values from env") } suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - conn, err := e2e.SetupPostgres(bqSuffix) + conn, err := e2e.SetupPostgres(t, bqSuffix) if err != nil || conn == nil { - slog.Error("failed to setup postgres", slog.Any("error", err)) - t.FailNow() + t.Fatalf("failed to setup postgres: %v", err) } bq := setupBigQuery(t) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index c6b02138d1..73450d130d 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -3,7 +3,7 @@ package e2e import ( "context" "fmt" - "log/slog" + "testing" "time" "github.com/jackc/pgx/v5" @@ -72,7 +72,9 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { return nil } -func setupPostgresSchema(conn *pgx.Conn, suffix string) error { +func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { + t.Helper() + setupTx, err := conn.Begin(context.Background()) if err != nil { return fmt.Errorf("failed to start setup transaction") @@ -86,7 +88,7 @@ func setupPostgresSchema(conn *pgx.Conn, suffix string) error { defer func() { deferErr := setupTx.Rollback(context.Background()) if deferErr != pgx.ErrTxClosed && deferErr != nil { - slog.Error("error rolling back setup transaction", slog.Any("error", err)) + t.Errorf("error rolling back setup transaction: %v", err) } }() @@ -117,8 +119,10 @@ func setupPostgresSchema(conn *pgx.Conn, suffix string) error { return setupTx.Commit(context.Background()) } -// setupPostgres sets up the postgres connection. -func SetupPostgres(suffix string) (*pgx.Conn, error) { +// SetupPostgres sets up the postgres connection. +func SetupPostgres(t *testing.T, suffix string) (*pgx.Conn, error) { + t.Helper() + conn, err := pgx.Connect(context.Background(), utils.GetPGConnectionString(GetTestPostgresConf())) if err != nil { return nil, fmt.Errorf("failed to create postgres connection: %w", err) @@ -130,7 +134,7 @@ func SetupPostgres(suffix string) (*pgx.Conn, error) { return nil, err } - err = setupPostgresSchema(conn, suffix) + err = setupPostgresSchema(t, conn, suffix) if err != nil { conn.Close(context.Background()) return nil, err diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index f1e94aa76b..d7f04b7807 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -3,7 +3,6 @@ package e2e_postgres import ( "context" "fmt" - "log/slog" "strings" "testing" "time" @@ -54,11 +53,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { if err != nil { // it's okay if the .env file is not present // we will use the default values - slog.Info("Unable to load .env file, using default values from env") + t.Log("Unable to load .env file, using default values from env") } suffix := "pg_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(suffix) + conn, err := e2e.SetupPostgres(t, suffix) if err != nil { require.Fail(t, "failed to setup postgres", err) } @@ -197,7 +196,6 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { require.NoError(s.t, err) flowJobName := "test_simple_slot_creation" - flowLog := slog.String(string(shared.FlowNameKey), flowJobName) setupReplicationInput := &protos.SetupReplicationInput{ FlowJobName: flowJobName, TableNameMapping: map[string]string{ @@ -212,16 +210,14 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { setupError <- s.connector.SetupReplication(signal, setupReplicationInput) }() - slog.Info("waiting for slot creation to complete", flowLog) + s.t.Log("waiting for slot creation to complete: ", flowJobName) slotInfo := <-signal.SlotCreated - slog.Info(fmt.Sprintf("slot creation complete: %v", slotInfo), flowLog) - - slog.Info("signaling clone complete after waiting for 2 seconds", flowLog) + s.t.Logf("slot creation complete: %v. Signaling clone complete in 2 seconds", slotInfo) time.Sleep(2 * time.Second) close(signal.CloneComplete) require.NoError(s.t, <-setupError) - slog.Info("successfully setup replication", flowLog) + s.t.Logf("successfully setup replication: %s", flowJobName) } func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 30543ebdc8..a27d52171f 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -3,7 +3,6 @@ package e2e_s3 import ( "context" "fmt" - "log/slog" "strings" "testing" "time" @@ -68,11 +67,11 @@ func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 { if err != nil { // it's okay if the .env file is not present // we will use the default values - slog.Info("Unable to load .env file, using default values from env") + t.Log("Unable to load .env file, using default values from env") } suffix := "s3_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(suffix) + conn, err := e2e.SetupPostgres(t, suffix) if err != nil || conn == nil { require.Fail(t, "failed to setup postgres", err) } diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 1be1765927..2faa57aedc 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log/slog" "os" "time" @@ -99,10 +98,8 @@ func (h *S3TestHelper) ListAllFiles( Prefix: &Prefix, }) if err != nil { - slog.Error("failed to list bucket files", slog.Any("error", err)) return nil, err } - slog.Info(fmt.Sprintf("Files in ListAllFiles in S3 test: %v", files)) return files.Contents, nil } @@ -115,7 +112,6 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error { Prefix: &Prefix, }) if err != nil { - slog.Error("failed to list bucket files", slog.Any("error", err)) return err } @@ -132,6 +128,5 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error { } } - slog.Info("Deletion completed.") return nil } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 6189119399..585439e843 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log/slog" "strings" "testing" "time" @@ -62,15 +61,13 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { if s.sfHelper != nil { err := s.sfHelper.Cleanup() if err != nil { - slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - s.t.FailNow() + s.t.Fatalf("failed to tear down Snowflake: %v", err) } } err := s.connector.Close() if err != nil { - slog.Error("failed to close Snowflake connector", slog.Any("error", err)) - s.t.FailNow() + s.t.Fatalf("failed to close Snowflake connector: %v", err) } }) } @@ -90,23 +87,21 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { if err != nil { // it's okay if the .env file is not present // we will use the default values - slog.Info("Unable to load .env file, using default values from env") + t.Log("Unable to load .env file, using default values from env") } suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - conn, err := e2e.SetupPostgres(pgSuffix) + conn, err := e2e.SetupPostgres(t, pgSuffix) if err != nil || conn == nil { - slog.Error("failed to setup Postgres", slog.Any("error", err)) - t.FailNow() + t.Fatalf("failed to setup Postgres: %v", err) } sfHelper, err := NewSnowflakeTestHelper() if err != nil { - slog.Error("failed to setup Snowflake", slog.Any("error", err)) - t.FailNow() + t.Fatalf("failed to setup Snowflake: %v", err) } connector, err := connsnowflake.NewSnowflakeConnector( diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index f83d1ac679..6d72b584ad 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -3,7 +3,6 @@ package e2e_snowflake import ( "context" "fmt" - "log/slog" "testing" "github.com/stretchr/testify/require" @@ -28,8 +27,7 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { sfTestHelper, err := NewSnowflakeTestHelper() if err != nil { - slog.Error("Error in test", slog.Any("error", err)) - t.FailNow() + t.Fatalf("Error in test: %v", err) } connector, err := connsnowflake.NewSnowflakeConnector( @@ -37,8 +35,7 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { sfTestHelper.Config, ) if err != nil { - slog.Error("Error in test", slog.Any("error", err)) - t.FailNow() + t.Fatalf("Error in test: %v", err) } return SnowflakeSchemaDeltaTestSuite{ diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 0d95ab0de8..f16128d9f5 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -3,7 +3,6 @@ package e2e_sqlserver import ( "context" "fmt" - "log/slog" "os" "strconv" "strings" @@ -61,11 +60,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { if err != nil { // it's okay if the .env file is not present // we will use the default values - slog.Info("Unable to load .env file, using default values from env") + t.Log("Unable to load .env file, using default values from env") } suffix := "sqls_" + strings.ToLower(shared.RandomString(8)) - conn, err := e2e.SetupPostgres(suffix) + conn, err := e2e.SetupPostgres(t, suffix) if err != nil { require.NoError(t, err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index a066895b0c..9bacd8c2eb 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -181,7 +181,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen var state peerflow.CDCFlowWorkflowState err = response.Get(&state) if err != nil { - slog.Error(err.Error()) + t.Log(err.Error()) } else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING { return } @@ -191,7 +191,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen runtime.Goexit() } else if counter > 5 { // log the error for informational purposes - slog.Error(err.Error()) + t.Log(err.Error()) } } }