Skip to content

Commit

Permalink
e2e/s3: remove use of testify/suite (#947)
Browse files Browse the repository at this point in the history
Calling setupS3 after setup leaked initial s3Helper
Run test suite twice: once with s3, once with gcs
  • Loading branch information
serprex authored Jan 2, 2024
1 parent b60d327 commit fcfa74c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 161 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 16 ./... -timeout 1200s
gotestsum --format testname -- -p 24 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
115 changes: 18 additions & 97 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,17 @@ import (
"github.com/stretchr/testify/require"
)

func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName)
func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName)
}

func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, s3Suffix)
func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
return fmt.Sprintf("%s_%s", input, s.suffix)
}

func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.T(), env)

setupErr := s.setupS3("s3")
if setupErr != nil {
s.Fail("failed to setup S3", setupErr)
}
func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
Expand All @@ -37,79 +32,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
value TEXT NOT NULL
);
`, srcTableName))
s.NoError(err)
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: flowJobName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.s3Helper.GetPeer(),
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
ExitAfterRecords: 20,
MaxBatchSize: 5,
}

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
s.NoError(err)
// insert 20 rows
for i := 1; i <= 20; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
s.NoError(err)
}
s.NoError(err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.T().Logf("JobName: %s", flowJobName)
files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
require.NoError(s.T(), err)

require.Equal(s.T(), 4, len(files))

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.T(), env)
setupErr := s.setupS3("gcs")
if setupErr != nil {
s.Fail("failed to setup S3", setupErr)
}

srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop")
dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop")
flowJobName := s.attachSuffix("test_simple_flow_gcs_interop")
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))
s.NoError(err)
require.NoError(s.t, err)
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: flowJobName,
TableNameMapping: map[string]string{srcTableName: dstTableName},
Expand All @@ -118,7 +41,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
s.NoError(err)
require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
Expand All @@ -128,18 +51,17 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {

go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
s.NoError(err)
require.NoError(s.t, err)
// insert 20 rows
for i := 1; i <= 20; i++ {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
s.NoError(err)
require.NoError(s.t, err)
}
s.T().Log("Inserted 20 rows into the source table")
s.NoError(err)
require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -149,17 +71,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
err = env.GetWorkflowError()

// allow only continue as new error
s.Error(err)
s.Contains(err.Error(), "continue as new")
require.Contains(s.t, err.Error(), "continue as new")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.T().Logf("JobName: %s", flowJobName)
s.t.Logf("JobName: %s", flowJobName)
files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
require.NoError(s.T(), err)
s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
require.NoError(s.t, err)

require.Equal(s.T(), 4, len(files))
require.Equal(s.t, 4, len(files))

env.AssertExpectations(s.T())
env.AssertExpectations(s.t)
}
Loading

0 comments on commit fcfa74c

Please sign in to comment.