
Commit 7f61ec0

e2e/s3: remove use of testify/suite
Remove setupS3: calling it after setup leaked the initial s3Helper. Instead, the test creates its own helper and defers cleanup within the test.
serprex committed Jan 1, 2024
1 parent 529079b commit 7f61ec0
Showing 2 changed files with 101 additions and 109 deletions.
flow/e2e/s3/cdc_s3_test.go (90 changes: 45 additions, 45 deletions)
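
The shape the diff moves toward, in isolation: rather than a suite-level setupS3 call whose helper can outlive (and leak across) tests, each test builds its own helper up front and immediately defers the cleanup. A minimal sketch of that pattern, using placeholder helper names (bucketHelper, newBucketHelper) rather than the repository's actual API; only NewS3TestHelper in the diff below is the real constructor:

package s3_test

import "testing"

// bucketHelper stands in for a per-test storage helper (placeholder type).
type bucketHelper struct{ bucket string }

// newBucketHelper stands in for a helper constructor such as the
// NewS3TestHelper used in the diff below; the gcs flag picks the backing store.
func newBucketHelper(gcs bool) (*bucketHelper, error) {
	return &bucketHelper{bucket: "peerdb-test"}, nil
}

// CleanUp removes whatever the helper created (a no-op in this sketch).
func (h *bucketHelper) CleanUp() error { return nil }

// TestWithOwnHelper shows the per-test pattern: create the helper, defer its
// cleanup right away, and never stash it on shared suite state.
func TestWithOwnHelper(t *testing.T) {
	helper, err := newBucketHelper(true)
	if err != nil {
		t.Fatalf("failed to set up helper: %v", err)
	}
	defer func() {
		if cleanupErr := helper.CleanUp(); cleanupErr != nil {
			t.Errorf("failed to clean up: %v", cleanupErr)
		}
	}()

	_ = helper // ... run the flow against the helper's bucket here ...
}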
@@ -10,22 +10,17 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
-	return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName)
+func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
+	return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
-	return fmt.Sprintf("%s_%s", input, s3Suffix)
+func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
+	return fmt.Sprintf("%s_%s", input, s.suffix)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
-
-	setupErr := s.setupS3("s3")
-	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
-	}
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
 
 	srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
@@ -37,7 +32,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 		value TEXT NOT NULL
 	);
 	`, srcTableName))
-	s.NoError(err)
+	require.NoError(s.t, err)
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      flowJobName,
 		TableNameMapping: map[string]string{srcTableName: dstTableName},
@@ -46,7 +41,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	}
 
 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
+	require.NoError(s.t, err)
 
 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows: 4,
@@ -56,17 +51,17 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
+		require.NoError(s.t, err)
 		// insert 20 rows
 		for i := 1; i <= 20; i++ {
 			testKey := fmt.Sprintf("test_key_%d", i)
 			testValue := fmt.Sprintf("test_value_%d", i)
 			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
 			INSERT INTO %s (key, value) VALUES ($1, $2)
 		`, srcTableName), testKey, testValue)
-			s.NoError(err)
+			require.NoError(s.t, err)
 		}
-		s.NoError(err)
+		require.NoError(s.t, err)
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -77,48 +72,55 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 
 	// allow only continue as new error
 	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
+	require.Contains(s.t, err.Error(), "continue as new")
 
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	s.T().Logf("JobName: %s", flowJobName)
+	s.t.Logf("JobName: %s", flowJobName)
 	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
-	require.NoError(s.T(), err)
+	s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
+	require.NoError(s.t, err)
 
-	require.Equal(s.T(), 4, len(files))
+	require.Equal(s.t, 4, len(files))
 
-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
-	setupErr := s.setupS3("gcs")
-	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
+
+	helper, err := NewS3TestHelper(true)
+	if err != nil {
+		require.Fail(s.t, "failed to setup S3", err)
 	}
+	defer func() {
+		err = s.s3Helper.CleanUp()
+		if err != nil {
+			require.Fail(s.t, "failed to clean up s3", err)
+		}
+	}()
 
 	srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop")
 	flowJobName := s.attachSuffix("test_simple_flow_gcs_interop")
-	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+	_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
 	CREATE TABLE %s (
 		id SERIAL PRIMARY KEY,
 		key TEXT NOT NULL,
 		value TEXT NOT NULL
 	);
 	`, srcTableName))
-	s.NoError(err)
+	require.NoError(s.t, err)
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      flowJobName,
 		TableNameMapping: map[string]string{srcTableName: dstTableName},
 		PostgresPort:     e2e.PostgresPort,
-		Destination:      s.s3Helper.GetPeer(),
+		Destination:      helper.GetPeer(),
 	}
 
 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
+	require.NoError(s.t, err)
 
 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows: 4,
@@ -128,18 +130,17 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
+		require.NoError(s.t, err)
 		// insert 20 rows
 		for i := 1; i <= 20; i++ {
 			testKey := fmt.Sprintf("test_key_%d", i)
 			testValue := fmt.Sprintf("test_value_%d", i)
 			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
 			INSERT INTO %s (key, value) VALUES ($1, $2)
 		`, srcTableName), testKey, testValue)
-			s.NoError(err)
+			require.NoError(s.t, err)
 		}
-		s.T().Log("Inserted 20 rows into the source table")
-		s.NoError(err)
+		require.NoError(s.t, err)
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -149,17 +150,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
-	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
+	require.Contains(s.t, err.Error(), "continue as new")
 
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	s.T().Logf("JobName: %s", flowJobName)
-	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
-	require.NoError(s.T(), err)
+	s.t.Logf("JobName: %s", flowJobName)
+	files, err := helper.ListAllFiles(ctx, flowJobName)
+	s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
+	require.NoError(s.t, err)
 
-	require.Equal(s.T(), 4, len(files))
+	require.Equal(s.t, 4, len(files))
 
-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
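
The second changed file is not expanded on this page, so the suite plumbing that drives these value-receiver methods is not shown. Purely as an illustration of the general technique the commit title names, and with hypothetical names rather than anything taken from this commit, shared state can be carried on a plain struct and each test method dispatched as a standard-library subtest:

package s3_test

import "testing"

// exampleSuite is a hypothetical stand-in for a suite struct carrying
// shared per-test state (a *testing.T, a schema suffix, connections, ...).
type exampleSuite struct {
	t      *testing.T
	suffix string
}

// TestExampleSuite dispatches each value-receiver test method as a subtest,
// one common replacement for testify/suite's runner.
func TestExampleSuite(t *testing.T) {
	run := func(name string, method func(exampleSuite)) {
		t.Run(name, func(t *testing.T) {
			s := exampleSuite{t: t, suffix: "abc123"} // fresh state per subtest
			method(s)
		})
	}

	run("SimpleFlow", exampleSuite.testSimpleFlow)
}

// testSimpleFlow is a hypothetical test body; a real one would exercise a flow.
func (s exampleSuite) testSimpleFlow() {
	if s.suffix == "" {
		s.t.Fatal("suffix not set")
	}
}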