Skip to content

Commit

Permalink
Remove testify/suite from connectors tests (#916)
Browse files Browse the repository at this point in the history
Splitting up changes from #871
  • Loading branch information
serprex authored Dec 27, 2023
1 parent 6c57ce4 commit e4a71dc
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 1200s
gotestsum --format testname -- -p 16 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
61 changes: 30 additions & 31 deletions flow/connectors/utils/cdc_records/cdc_records_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package cdc_records

import (
"crypto/rand"
"testing"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
"github.com/stretchr/testify/require"
)

type CDCRecordStorageTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
}
func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) {
t.Helper()

func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) {
pkeyColVal := make([]byte, 0, 32)
pkeyColVal := make([]byte, 32)
_, err := rand.Read(pkeyColVal)
s.NoError(err)
require.NoError(t, err)

key := model.TableWithPkey{
TableName: "test_src_tbl",
Expand All @@ -39,50 +36,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R
return key, rec
}

func (s *CDCRecordStorageTestSuite) TestSingleRecord() {
func TestSingleRecord(t *testing.T) {
t.Parallel()
cdcRecordsStore := NewCDCRecordsStore("test_single_record")
cdcRecordsStore.numRecordsSwitchThreshold = 10

key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
require.NoError(t, err)
// should not spill into DB
s.Equal(1, len(cdcRecordsStore.inMemoryRecords))
s.Nil(cdcRecordsStore.pebbleDB)
require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords))
require.Nil(t, cdcRecordsStore.pebbleDB)

reck, ok, err := cdcRecordsStore.Get(key)
s.NoError(err)
s.True(ok)
s.Equal(rec, reck)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, rec, reck)

s.NoError(cdcRecordsStore.Close())
require.NoError(t, cdcRecordsStore.Close())
}

func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() {
func TestRecordsTillSpill(t *testing.T) {
t.Parallel()
cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill")
cdcRecordsStore.numRecordsSwitchThreshold = 10

// add records upto set limit
for i := 0; i < 10; i++ {
key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords))
s.Nil(cdcRecordsStore.pebbleDB)
require.NoError(t, err)
require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords))
require.Nil(t, cdcRecordsStore.pebbleDB)
}

// this record should be spilled to DB
key, rec := s.genKeyAndRec()
key, rec := genKeyAndRec(t)
err := cdcRecordsStore.Set(key, rec)
s.NoError(err)
require.NoError(t, err)
_, ok := cdcRecordsStore.inMemoryRecords[key]
s.False(ok)
s.NotNil(cdcRecordsStore.pebbleDB)
require.False(t, ok)
require.NotNil(t, cdcRecordsStore.pebbleDB)

reck, ok, err := cdcRecordsStore.Get(key)
s.NoError(err)
s.True(ok)
s.Equal(rec, reck)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, rec, reck)

s.NoError(cdcRecordsStore.Close())
require.NoError(t, cdcRecordsStore.Close())
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
return nil, fmt.Errorf("TEST_BQ_CREDS env var not set")
}

content, err := e2e.ReadFileToBytes(jsonPath)
content, err := e2eshared.ReadFileToBytes(jsonPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
39 changes: 12 additions & 27 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -30,19 +31,18 @@ type PeerFlowE2ETestSuiteBQ struct {
}

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ {
t.Helper()

g := got.New(t)
g.Parallel()

suite := setupSuite(t, g)

g.Cleanup(func() {
suite.tearDownSuite()
})
e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
if err != nil {
slog.Error("failed to tear down postgres", slog.Any("error", err))
s.FailNow()
}

return suite
err = s.bqHelper.DropDataset(s.bqHelper.datasetName)
if err != nil {
slog.Error("failed to tear down bigquery", slog.Any("error", err))
s.FailNow()
}
})
}

Expand Down Expand Up @@ -148,21 +148,6 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ {
}
}

// Implement TearDownAllSuite interface to tear down the test suite
func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
if err != nil {
slog.Error("failed to tear down postgres", slog.Any("error", err))
s.FailNow()
}

err = s.bqHelper.DropDataset(s.bqHelper.datasetName)
if err != nil {
slog.Error("failed to tear down bigquery", slog.Any("error", err))
s.FailNow()
}
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/s3_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
Expand All @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
bucketName = "peerdb_staging"
}

content, err := e2e.ReadFileToBytes(credsPath)
content, err := e2eshared.ReadFileToBytes(credsPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
53 changes: 19 additions & 34 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -33,19 +34,27 @@ type PeerFlowE2ETestSuiteSF struct {
}

func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF {
t.Helper()

g := got.New(t)
g.Parallel()
e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) {
err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
if err != nil {
slog.Error("failed to tear down Postgres", slog.Any("error", err))
s.FailNow()
}

suite := SetupSuite(t, g)
if s.sfHelper != nil {
err = s.sfHelper.Cleanup()
if err != nil {
slog.Error("failed to tear down Snowflake", slog.Any("error", err))
s.FailNow()
}
}

g.Cleanup(func() {
suite.tearDownSuite()
})
err = s.connector.Close()

return suite
if err != nil {
slog.Error("failed to close Snowflake connector", slog.Any("error", err))
s.FailNow()
}
})
}

Expand Down Expand Up @@ -101,30 +110,6 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
return suite
}

// Implement TearDownAllSuite interface to tear down the test suite
func (s PeerFlowE2ETestSuiteSF) tearDownSuite() {
err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
if err != nil {
slog.Error("failed to tear down Postgres", slog.Any("error", err))
s.FailNow()
}

if s.sfHelper != nil {
err = s.sfHelper.Cleanup()
if err != nil {
slog.Error("failed to tear down Snowflake", slog.Any("error", err))
s.FailNow()
}
}

err = s.connector.Close()

if err != nil {
slog.Error("failed to close Snowflake connector", slog.Any("error", err))
s.FailNow()
}
}

func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) {
return nil, fmt.Errorf("TEST_SF_CREDS env var not set")
}

content, err := e2e.ReadFileToBytes(jsonPath)
content, err := e2eshared.ReadFileToBytes(jsonPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
Expand Down
24 changes: 4 additions & 20 deletions flow/e2e/snowflake/snowflake_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/ysmood/got"
Expand Down Expand Up @@ -58,13 +59,6 @@ func setupSchemaDeltaSuite(
}
}

func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() {
err := suite.sfTestHelper.Cleanup()
suite.failTestError(err)
err = suite.connector.Close()
suite.failTestError(err)
}

func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName)
err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
Expand Down Expand Up @@ -222,18 +216,8 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
}

func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite {
t.Helper()

g := got.New(t)
g.Parallel()

suite := setupSchemaDeltaSuite(t, g)

g.Cleanup(func() {
suite.tearDownSuite()
})

return suite
e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite SnowflakeSchemaDeltaTestSuite) {
suite.failTestError(suite.sfTestHelper.Cleanup())
suite.failTestError(suite.connector.Close())
})
}
20 changes: 0 additions & 20 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"strings"
Expand All @@ -24,25 +23,6 @@ import (
"go.temporal.io/sdk/testsuite"
)

// ReadFileToBytes reads a file to a byte array.
func ReadFileToBytes(path string) ([]byte, error) {
var ret []byte

f, err := os.Open(path)
if err != nil {
return ret, fmt.Errorf("failed to open file: %w", err)
}

defer f.Close()

ret, err = io.ReadAll(f)
if err != nil {
return ret, fmt.Errorf("failed to read file: %w", err)
}

return ret, nil
}

func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) {
t.Helper()

Expand Down
Loading

0 comments on commit e4a71dc

Please sign in to comment.