Skip to content

Commit

Permalink
Replace slog with t.Log in e2e tests (#1188)
Browse files Browse the repository at this point in the history
Global logger should not be used with parallel testing
  • Loading branch information
serprex authored Jan 31, 2024
1 parent fddae78 commit 31f2641
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 55 deletions.
17 changes: 6 additions & 11 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -67,8 +66,7 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {

err := s.bqHelper.DropDataset(s.bqHelper.Config.DatasetId)
if err != nil {
slog.Error("failed to tear down bigquery", slog.Any("error", err))
s.t.FailNow()
s.t.Fatalf("failed to tear down bigquery: %v", err)
}
})
}
Expand Down Expand Up @@ -146,14 +144,12 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper {

bqHelper, err := NewBigQueryTestHelper()
if err != nil {
slog.Error("Error in test", slog.Any("error", err))
t.FailNow()
t.Fatalf("Failed to create helper: %v", err)
}

err = bqHelper.RecreateDataset()
if err != nil {
slog.Error("Error in test", slog.Any("error", err))
t.FailNow()
t.Fatalf("Failed to recreate dataset: %v", err)
}

return bqHelper
Expand All @@ -167,16 +163,15 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
t.Log("Unable to load .env file, using default values from env")
}

suffix := shared.RandomString(8)
tsSuffix := time.Now().Format("20060102150405")
bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix)
conn, err := e2e.SetupPostgres(bqSuffix)
conn, err := e2e.SetupPostgres(t, bqSuffix)
if err != nil || conn == nil {
slog.Error("failed to setup postgres", slog.Any("error", err))
t.FailNow()
t.Fatalf("failed to setup postgres: %v", err)
}

bq := setupBigQuery(t)
Expand Down
16 changes: 10 additions & 6 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package e2e
import (
"context"
"fmt"
"log/slog"
"testing"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -72,7 +72,9 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error {
return nil
}

func setupPostgresSchema(conn *pgx.Conn, suffix string) error {
func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error {
t.Helper()

setupTx, err := conn.Begin(context.Background())
if err != nil {
return fmt.Errorf("failed to start setup transaction")
Expand All @@ -86,7 +88,7 @@ func setupPostgresSchema(conn *pgx.Conn, suffix string) error {
defer func() {
deferErr := setupTx.Rollback(context.Background())
if deferErr != pgx.ErrTxClosed && deferErr != nil {
slog.Error("error rolling back setup transaction", slog.Any("error", err))
t.Errorf("error rolling back setup transaction: %v", err)
}
}()

Expand Down Expand Up @@ -117,8 +119,10 @@ func setupPostgresSchema(conn *pgx.Conn, suffix string) error {
return setupTx.Commit(context.Background())
}

// setupPostgres sets up the postgres connection.
func SetupPostgres(suffix string) (*pgx.Conn, error) {
// SetupPostgres sets up the postgres connection.
func SetupPostgres(t *testing.T, suffix string) (*pgx.Conn, error) {
t.Helper()

conn, err := pgx.Connect(context.Background(), utils.GetPGConnectionString(GetTestPostgresConf()))
if err != nil {
return nil, fmt.Errorf("failed to create postgres connection: %w", err)
Expand All @@ -130,7 +134,7 @@ func SetupPostgres(suffix string) (*pgx.Conn, error) {
return nil, err
}

err = setupPostgresSchema(conn, suffix)
err = setupPostgresSchema(t, conn, suffix)
if err != nil {
conn.Close(context.Background())
return nil, err
Expand Down
14 changes: 5 additions & 9 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e_postgres
import (
"context"
"fmt"
"log/slog"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,11 +53,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
t.Log("Unable to load .env file, using default values from env")
}

suffix := "pg_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(suffix)
conn, err := e2e.SetupPostgres(t, suffix)
if err != nil {
require.Fail(t, "failed to setup postgres", err)
}
Expand Down Expand Up @@ -197,7 +196,6 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
require.NoError(s.t, err)

flowJobName := "test_simple_slot_creation"
flowLog := slog.String(string(shared.FlowNameKey), flowJobName)
setupReplicationInput := &protos.SetupReplicationInput{
FlowJobName: flowJobName,
TableNameMapping: map[string]string{
Expand All @@ -212,16 +210,14 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
setupError <- s.connector.SetupReplication(signal, setupReplicationInput)
}()

slog.Info("waiting for slot creation to complete", flowLog)
s.t.Log("waiting for slot creation to complete: ", flowJobName)
slotInfo := <-signal.SlotCreated
slog.Info(fmt.Sprintf("slot creation complete: %v", slotInfo), flowLog)

slog.Info("signaling clone complete after waiting for 2 seconds", flowLog)
s.t.Logf("slot creation complete: %v. Signaling clone complete in 2 seconds", slotInfo)
time.Sleep(2 * time.Second)
close(signal.CloneComplete)

require.NoError(s.t, <-setupError)
slog.Info("successfully setup replication", flowLog)
s.t.Logf("successfully setup replication: %s", flowJobName)
}

func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
Expand Down
5 changes: 2 additions & 3 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e_s3
import (
"context"
"fmt"
"log/slog"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -68,11 +67,11 @@ func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 {
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
t.Log("Unable to load .env file, using default values from env")
}

suffix := "s3_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(suffix)
conn, err := e2e.SetupPostgres(t, suffix)
if err != nil || conn == nil {
require.Fail(t, "failed to setup postgres", err)
}
Expand Down
5 changes: 0 additions & 5 deletions flow/e2e/s3/s3_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"time"

Expand Down Expand Up @@ -99,10 +98,8 @@ func (h *S3TestHelper) ListAllFiles(
Prefix: &Prefix,
})
if err != nil {
slog.Error("failed to list bucket files", slog.Any("error", err))
return nil, err
}
slog.Info(fmt.Sprintf("Files in ListAllFiles in S3 test: %v", files))
return files.Contents, nil
}

Expand All @@ -115,7 +112,6 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error {
Prefix: &Prefix,
})
if err != nil {
slog.Error("failed to list bucket files", slog.Any("error", err))
return err
}

Expand All @@ -132,6 +128,5 @@ func (h *S3TestHelper) CleanUp(ctx context.Context) error {
}
}

slog.Info("Deletion completed.")
return nil
}
17 changes: 6 additions & 11 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -62,15 +61,13 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
if s.sfHelper != nil {
err := s.sfHelper.Cleanup()
if err != nil {
slog.Error("failed to tear down Snowflake", slog.Any("error", err))
s.t.FailNow()
s.t.Fatalf("failed to tear down Snowflake: %v", err)
}
}

err := s.connector.Close()
if err != nil {
slog.Error("failed to close Snowflake connector", slog.Any("error", err))
s.t.FailNow()
s.t.Fatalf("failed to close Snowflake connector: %v", err)
}
})
}
Expand All @@ -90,23 +87,21 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF {
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
t.Log("Unable to load .env file, using default values from env")
}

suffix := shared.RandomString(8)
tsSuffix := time.Now().Format("20060102150405")
pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix)

conn, err := e2e.SetupPostgres(pgSuffix)
conn, err := e2e.SetupPostgres(t, pgSuffix)
if err != nil || conn == nil {
slog.Error("failed to setup Postgres", slog.Any("error", err))
t.FailNow()
t.Fatalf("failed to setup Postgres: %v", err)
}

sfHelper, err := NewSnowflakeTestHelper()
if err != nil {
slog.Error("failed to setup Snowflake", slog.Any("error", err))
t.FailNow()
t.Fatalf("failed to setup Snowflake: %v", err)
}

connector, err := connsnowflake.NewSnowflakeConnector(
Expand Down
7 changes: 2 additions & 5 deletions flow/e2e/snowflake/snowflake_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e_snowflake
import (
"context"
"fmt"
"log/slog"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -28,17 +27,15 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite {

sfTestHelper, err := NewSnowflakeTestHelper()
if err != nil {
slog.Error("Error in test", slog.Any("error", err))
t.FailNow()
t.Fatalf("Error in test: %v", err)
}

connector, err := connsnowflake.NewSnowflakeConnector(
context.Background(),
sfTestHelper.Config,
)
if err != nil {
slog.Error("Error in test", slog.Any("error", err))
t.FailNow()
t.Fatalf("Error in test: %v", err)
}

return SnowflakeSchemaDeltaTestSuite{
Expand Down
5 changes: 2 additions & 3 deletions flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e_sqlserver
import (
"context"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -61,11 +60,11 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer {
if err != nil {
// it's okay if the .env file is not present
// we will use the default values
slog.Info("Unable to load .env file, using default values from env")
t.Log("Unable to load .env file, using default values from env")
}

suffix := "sqls_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(suffix)
conn, err := e2e.SetupPostgres(t, suffix)
if err != nil {
require.NoError(t, err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
var state peerflow.CDCFlowWorkflowState
err = response.Get(&state)
if err != nil {
slog.Error(err.Error())
t.Log(err.Error())
} else if state.CurrentFlowStatus == protos.FlowStatus_STATUS_RUNNING {
return
}
Expand All @@ -191,7 +191,7 @@ func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironmen
runtime.Goexit()
} else if counter > 5 {
// log the error for informational purposes
slog.Error(err.Error())
t.Log(err.Error())
}
}
}
Expand Down

0 comments on commit 31f2641

Please sign in to comment.