Skip to content

Commit

Permalink
Run worker integration tests in parallel (#2336)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Jul 24, 2024
1 parent 3365e8d commit 43d7d2c
Show file tree
Hide file tree
Showing 2 changed files with 380 additions and 304 deletions.
181 changes: 122 additions & 59 deletions worker/pkg/workflows/datasync/workflow/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,51 @@ import (
testpg "github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/modules/redis"
"github.com/testcontainers/testcontainers-go/wait"
"golang.org/x/sync/errgroup"
)

type PostgresTestContainer struct {
type postgresTestContainer struct {
pool *pgxpool.Pool
url string
}
type PostgresTest struct {
type postgresTest struct {
pool *pgxpool.Pool
testcontainer *testpg.PostgresContainer

source *PostgresTestContainer
target *PostgresTestContainer
source *postgresTestContainer
target *postgresTestContainer

databases []string
}

type MysqlTestContainer struct {
type mysqlTestContainer struct {
pool *sql.DB
container *testmysql.MySQLContainer
url string
close func()
}

type MysqlTest struct {
source *MysqlTestContainer
target *MysqlTestContainer
type mysqlTest struct {
source *mysqlTestContainer
target *mysqlTestContainer
}

type redisTest struct {
url string
testcontainer *redis.RedisContainer
}

type IntegrationTestSuite struct {
suite.Suite

ctx context.Context

mysql *MysqlTest
postgres *PostgresTest

redisUrl string
rediscontainer *redis.RedisContainer
mysql *mysqlTest
postgres *postgresTest
redis *redisTest
}

func (s *IntegrationTestSuite) SetupPostgres() {
func (s *IntegrationTestSuite) SetupPostgres() (*postgresTest, error) {
pgcontainer, err := testpg.Run(
s.ctx,
"postgres:15",
Expand All @@ -72,80 +76,97 @@ func (s *IntegrationTestSuite) SetupPostgres() {
),
)
if err != nil {
panic(err)
return nil, err
}
s.postgres = &PostgresTest{
postgresTest := &postgresTest{
testcontainer: pgcontainer,
}
connstr, err := pgcontainer.ConnectionString(s.ctx, "sslmode=disable")
if err != nil {
panic(err)
return nil, err
}

s.postgres.databases = []string{"datasync_source", "datasync_target"}
postgresTest.databases = []string{"datasync_source", "datasync_target"}
pool, err := pgxpool.New(s.ctx, connstr)
if err != nil {
panic(err)
return nil, err
}
s.postgres.pool = pool
postgresTest.pool = pool

s.T().Logf("creating databases. %+v \n", s.postgres.databases)
for _, db := range s.postgres.databases {
_, err = s.postgres.pool.Exec(s.ctx, fmt.Sprintf("CREATE DATABASE %s;", db))
s.T().Logf("creating databases. %+v \n", postgresTest.databases)
for _, db := range postgresTest.databases {
_, err = postgresTest.pool.Exec(s.ctx, fmt.Sprintf("CREATE DATABASE %s;", db))
if err != nil {
panic(err)
return nil, err
}
}

srcUrl, err := getDbPgUrl(connstr, "datasync_source", "disable")
if err != nil {
panic(err)
return nil, err
}
s.postgres.source = &PostgresTestContainer{
postgresTest.source = &postgresTestContainer{
url: srcUrl,
}
sourceConn, err := pgxpool.New(s.ctx, s.postgres.source.url)
sourceConn, err := pgxpool.New(s.ctx, postgresTest.source.url)
if err != nil {
panic(err)
return nil, err
}
s.postgres.source.pool = sourceConn
postgresTest.source.pool = sourceConn

targetUrl, err := getDbPgUrl(connstr, "datasync_target", "disable")
if err != nil {
panic(err)
return nil, err
}
s.postgres.target = &PostgresTestContainer{
postgresTest.target = &postgresTestContainer{
url: targetUrl,
}
targetConn, err := pgxpool.New(s.ctx, s.postgres.target.url)
targetConn, err := pgxpool.New(s.ctx, postgresTest.target.url)
if err != nil {
panic(err)
return nil, err
}
s.postgres.target.pool = targetConn
postgresTest.target.pool = targetConn
return postgresTest, nil
}

func (s *IntegrationTestSuite) SetupMysql() {
s.ctx = context.Background()
func (s *IntegrationTestSuite) SetupMysql() (*mysqlTest, error) {
var source *mysqlTestContainer
var target *mysqlTestContainer

s.mysql = &MysqlTest{}
errgrp := errgroup.Group{}
errgrp.Go(func() error {
sourcecontainer, err := createMysqlTestContainer(s.ctx, "datasync", "root", "pass-source")
if err != nil {
return err
}
source = sourcecontainer
return nil
})

sourcecontainer, err := createMysqlTestContainer(s.ctx, "datasync", "root", "pass-source")
if err != nil {
panic(err)
}
s.mysql.source = sourcecontainer
errgrp.Go(func() error {
targetcontainer, err := createMysqlTestContainer(s.ctx, "datasync", "root", "pass-target")
if err != nil {
return err
}
target = targetcontainer
return nil
})

targetcontainer, err := createMysqlTestContainer(s.ctx, "datasync", "root", "pass-target")
err := errgrp.Wait()
if err != nil {
panic(err)
return nil, err
}
s.mysql.target = targetcontainer

return &mysqlTest{
source: source,
target: target,
}, nil
}

func createMysqlTestContainer(
ctx context.Context,
database, username, password string,
) (*MysqlTestContainer, error) {
) (*mysqlTestContainer, error) {
container, err := testmysql.Run(ctx,
"mysql:8.0.36",
testmysql.WithDatabase(database),
Expand Down Expand Up @@ -177,7 +198,7 @@ func createMysqlTestContainer(
}

connUrl := fmt.Sprintf("mysql://%s:%s@%s:%s/%s?multiStatements=true", username, password, containerHost, containerPort.Port(), database)
return &MysqlTestContainer{
return &mysqlTestContainer{
pool: pool,
url: connUrl,
container: container,
Expand All @@ -189,27 +210,69 @@ func createMysqlTestContainer(
}, nil
}

func (s *IntegrationTestSuite) SetupSuite() {
s.ctx = context.Background()

s.SetupPostgres()
s.SetupMysql()

// redis
func (s *IntegrationTestSuite) SetupRedis() (*redisTest, error) {
redisContainer, err := redis.Run(
s.ctx,
"docker.io/redis:7",
redis.WithSnapshotting(10, 1),
redis.WithLogLevel(redis.LogLevelVerbose),
)
if err != nil {
panic(err)
return nil, err
}
s.rediscontainer = redisContainer
s.redisUrl, err = redisContainer.ConnectionString(s.ctx)
redisUrl, err := redisContainer.ConnectionString(s.ctx)
if err != nil {
return nil, err
}
return &redisTest{
testcontainer: redisContainer,
url: redisUrl,
}, nil
}

func (s *IntegrationTestSuite) SetupSuite() {
s.ctx = context.Background()

var postgresTest *postgresTest
var mysqlTest *mysqlTest
var redisTest *redisTest

errgrp := errgroup.Group{}
errgrp.Go(func() error {
p, err := s.SetupPostgres()
if err != nil {
return err
}
postgresTest = p
return nil
})

errgrp.Go(func() error {
m, err := s.SetupMysql()
if err != nil {
return err
}
mysqlTest = m
return nil
})

errgrp.Go(func() error {
r, err := s.SetupRedis()
if err != nil {
return err
}
redisTest = r
return nil
})

err := errgrp.Wait()
if err != nil {
panic(err)
}

s.postgres = postgresTest
s.mysql = mysqlTest
s.redis = redisTest
}

func (s *IntegrationTestSuite) RunPostgresSqlFiles(pool *pgxpool.Pool, testFolder string, files []string) {
Expand Down Expand Up @@ -283,8 +346,8 @@ func (s *IntegrationTestSuite) TearDownSuite() {
}

// redis
if s.rediscontainer != nil {
if err := s.rediscontainer.Terminate(s.ctx); err != nil {
if s.redis.testcontainer != nil {
if err := s.redis.testcontainer.Terminate(s.ctx); err != nil {
panic(err)
}
}
Expand Down
Loading

0 comments on commit 43d7d2c

Please sign in to comment.