Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update workflow integration tests to use API and run in parallel #2896

Merged
merged 33 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4772935
trying to use api
alishakawaguchi Oct 25, 2024
9708959
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Oct 31, 2024
9d0ab99
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Nov 4, 2024
2f7e13d
api working
alishakawaguchi Nov 5, 2024
7c38bf6
notes
alishakawaguchi Nov 5, 2024
25b77c3
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Dec 3, 2024
96cc037
fixed race condition
alishakawaguchi Dec 3, 2024
0b81402
test pk
alishakawaguchi Dec 3, 2024
50e07d3
passing
alishakawaguchi Dec 3, 2024
9258a70
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Dec 5, 2024
2d61cc6
redis postgres test and new edgecases test data
alishakawaguchi Dec 5, 2024
1b87527
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Dec 9, 2024
4f27ae7
Merge branch 'main' into alisha/workflow-int-tests
alishakawaguchi Dec 14, 2024
d771cb1
all types test
alishakawaguchi Dec 14, 2024
62eaab3
postgres tests working in parallel
alishakawaguchi Dec 16, 2024
866b77f
fix generator
alishakawaguchi Dec 18, 2024
6b59b1d
update jobmapping generator to set default transformer where required
alishakawaguchi Dec 19, 2024
2f3da92
more postgres tests
alishakawaguchi Dec 19, 2024
b8b87d7
move js transformer sql files
alishakawaguchi Dec 19, 2024
d109f85
postgres tests done
alishakawaguchi Dec 20, 2024
c3ff917
mssql tests
alishakawaguchi Dec 24, 2024
28cd9d6
undo changes
alishakawaguchi Dec 24, 2024
4cc9ee0
Merge branch 'develop/v5' into alisha/workflow-int-tests
alishakawaguchi Dec 24, 2024
603e6e4
mssql tests
alishakawaguchi Dec 24, 2024
76f55cb
dynamodb
alishakawaguchi Dec 24, 2024
57fd3cb
mongodb
alishakawaguchi Dec 24, 2024
c16b740
fix mssql tests
alishakawaguchi Dec 26, 2024
2763a33
fix mysql test
alishakawaguchi Dec 26, 2024
d049a76
reorg
alishakawaguchi Dec 26, 2024
faf0914
mysql job mapping generator keep same order
alishakawaguchi Dec 26, 2024
b6c18ec
clean up old tests
alishakawaguchi Dec 26, 2024
e787d08
clean up
alishakawaguchi Dec 26, 2024
f2ed2f0
fix test file names
alishakawaguchi Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions backend/pkg/integration-test/integration-test-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,34 @@ func CreateMysqlConnection(
return resp.Msg.GetConnection()
}

func CreateMssqlConnection(
ctx context.Context,
t *testing.T,
connclient mgmtv1alpha1connect.ConnectionServiceClient,
accountId string,
name string,
mssqlurl string,
) *mgmtv1alpha1.Connection {
resp, err := connclient.CreateConnection(
ctx,
connect.NewRequest(&mgmtv1alpha1.CreateConnectionRequest{
AccountId: accountId,
Name: name,
ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{
Config: &mgmtv1alpha1.ConnectionConfig_MssqlConfig{
MssqlConfig: &mgmtv1alpha1.MssqlConnectionConfig{
ConnectionConfig: &mgmtv1alpha1.MssqlConnectionConfig_Url{
Url: mssqlurl,
},
},
},
},
}),
)
RequireNoErrResp(t, resp, err)
return resp.Msg.GetConnection()
}

func CreateS3Connection(
ctx context.Context,
t *testing.T,
Expand Down Expand Up @@ -132,6 +160,32 @@ func CreateDynamoDBConnection(
return resp.Msg.GetConnection()
}

func CreateMongodbConnection(
ctx context.Context,
t *testing.T,
connclient mgmtv1alpha1connect.ConnectionServiceClient,
accountId, name, url string,
) *mgmtv1alpha1.Connection {
resp, err := connclient.CreateConnection(
ctx,
connect.NewRequest(&mgmtv1alpha1.CreateConnectionRequest{
AccountId: accountId,
Name: name,
ConnectionConfig: &mgmtv1alpha1.ConnectionConfig{
Config: &mgmtv1alpha1.ConnectionConfig_MongoConfig{
MongoConfig: &mgmtv1alpha1.MongoConnectionConfig{
ConnectionConfig: &mgmtv1alpha1.MongoConnectionConfig_Url{
Url: url,
},
},
},
},
}),
)
RequireNoErrResp(t, resp, err)
return resp.Msg.GetConnection()
}

func SetUser(ctx context.Context, t *testing.T, client mgmtv1alpha1connect.UserAccountServiceClient) string {
resp, err := client.SetUser(ctx, connect.NewRequest(&mgmtv1alpha1.SetUserRequest{}))
RequireNoErrResp(t, resp, err)
Expand Down
16 changes: 8 additions & 8 deletions cli/internal/cmds/neosync/sync/sync_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func Test_Sync(t *testing.T) {
// right now CLI sync and init schema takes everything in source and copies it to target since there are no job mappings defined by the user
// so it can't be scoped to specific schema
// t.Parallel()
err = postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources")
err = postgres.Source.RunCreateStmtsInSchema(ctx, testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources")
if err != nil {
t.Fatal(err)
}
alltypesSchema := "alltypes"
err = postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err = postgres.Source.RunCreateStmtsInSchema(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -155,12 +155,12 @@ func Test_Sync(t *testing.T) {
}

alltypesSchema := "alltypes_s3_pg"
err := postgres.Source.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err := postgres.Source.RunCreateStmtsInSchema(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}

err = postgres.Target.RunCreateStmtsInSchema(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err = postgres.Target.RunCreateStmtsInSchema(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -285,11 +285,11 @@ func Test_Sync(t *testing.T) {
// so it can't be scoped to specific schema
// t.Parallel()
alltypesSchema := "alltypes"
err = mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources")
err = mysql.Source.RunCreateStmtsInDatabase(ctx, testdataFolder, []string{"humanresources/create-tables.sql"}, "humanresources")
if err != nil {
t.Fatal(err)
}
err = mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err = mysql.Source.RunCreateStmtsInDatabase(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -348,12 +348,12 @@ func Test_Sync(t *testing.T) {
}

alltypesSchema := "alltypes_s3_mysql"
err := mysql.Source.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err := mysql.Source.RunCreateStmtsInDatabase(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}

err = mysql.Target.RunCreateStmtsInDatabase(ctx, &testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
err = mysql.Target.RunCreateStmtsInDatabase(ctx, testdataFolder, []string{"alltypes/create-tables.sql"}, alltypesSchema)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/testutil/testcontainers/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dynamodb
package testcontainers_dynamodb

import (
"context"
Expand Down
137 changes: 137 additions & 0 deletions internal/testutil/testcontainers/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package testcontainers_mongodb

import (
"context"
"fmt"
"testing"

"github.com/testcontainers/testcontainers-go"
testmongodb "github.com/testcontainers/testcontainers-go/modules/mongodb"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
)

type MongoDBTestSyncContainer struct {
Source *MongoDBTestContainer
Target *MongoDBTestContainer
}

func NewMongoDBTestSyncContainer(ctx context.Context, t *testing.T) (*MongoDBTestSyncContainer, error) {
tc := &MongoDBTestSyncContainer{}
errgrp := errgroup.Group{}
errgrp.Go(func() error {
d, err := NewMongoDBTestContainer(ctx, t)
if err != nil {
return err
}
tc.Source = d
return nil
})

errgrp.Go(func() error {
d, err := NewMongoDBTestContainer(ctx, t)
if err != nil {
return err
}
tc.Target = d
return nil
})

err := errgrp.Wait()
if err != nil {
return nil, err
}

return tc, nil
}

func (d *MongoDBTestSyncContainer) TearDown(ctx context.Context) error {
if d.Source != nil {
if d.Source.TestContainer != nil {
err := d.Source.TestContainer.Terminate(ctx)
if err != nil {
return err
}
}
}
if d.Target != nil {
if d.Target.TestContainer != nil {
err := d.Target.TestContainer.Terminate(ctx)
if err != nil {
return err
}
}
}
return nil
}

type MongoDBTestContainer struct {
Client *mongo.Client
URL string
TestContainer testcontainers.Container
}

func NewMongoDBTestContainer(ctx context.Context, t *testing.T) (*MongoDBTestContainer, error) {
m := &MongoDBTestContainer{}
return m.Setup(ctx, t)
}

func (m *MongoDBTestContainer) Setup(ctx context.Context, t *testing.T) (*MongoDBTestContainer, error) {
container, err := testmongodb.Run(ctx, "mongo:6")
if err != nil {
return nil, err
}

uri, err := container.ConnectionString(ctx)
if err != nil {
return nil, err
}

client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}

return &MongoDBTestContainer{
Client: client,
URL: uri,
TestContainer: container,
}, nil
}

func (m *MongoDBTestContainer) TearDown(ctx context.Context) error {
if m.Client != nil {
if err := m.Client.Disconnect(ctx); err != nil {
return err
}
}
if m.TestContainer != nil {
return m.TestContainer.Terminate(ctx)
}
return nil
}

func (m *MongoDBTestContainer) InsertMongoDbRecords(ctx context.Context, database, collection string, documents []any) (int, error) {
db := m.Client.Database(database)
col := db.Collection(collection)

result, err := col.InsertMany(ctx, documents)
if err != nil {
return 0, fmt.Errorf("failed to insert mongodb records: %v", err)
}

return len(result.InsertedIDs), nil
}

func (m *MongoDBTestContainer) DropMongoDbCollection(ctx context.Context, database, collection string) error {
db := m.Client.Database(database)
collections, err := db.ListCollectionNames(ctx, map[string]any{"name": collection})
if err != nil {
return err
}
if len(collections) == 0 {
return nil
}
return db.Collection(collection).Drop(ctx)
}
26 changes: 18 additions & 8 deletions internal/testutil/testcontainers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,18 +272,18 @@ func (m *MysqlTestContainer) RunSqlFiles(ctx context.Context, folder *string, fi
}

// Creates schema and sets USE to schema before running SQL files
func (m *MysqlTestContainer) RunCreateStmtsInDatabase(ctx context.Context, folder *string, files []string, database string) error {
func (m *MysqlTestContainer) RunCreateStmtsInDatabase(ctx context.Context, folder string, files []string, database string) error {
for _, file := range files {
filePath := file
if folder != nil && *folder != "" {
filePath = fmt.Sprintf("./%s/%s", *folder, file)
if folder != "" {
filePath = fmt.Sprintf("./%s/%s", folder, file)
}
sqlStr, err := os.ReadFile(filePath)
if err != nil {
return err
}

setSchemaSql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s; \n USE %s; \n", database, database)
setSchemaSql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`; \n USE `%s`; \n", database, database)
_, err = m.DB.ExecContext(ctx, setSchemaSql+string(sqlStr))
if err != nil {
return fmt.Errorf("unable to exec sql when running postgres sql files: %w", err)
Expand All @@ -292,11 +292,21 @@ func (m *MysqlTestContainer) RunCreateStmtsInDatabase(ctx context.Context, folde
return nil
}

func (m *MysqlTestContainer) CreateDatabases(ctx context.Context, schemas []string) error {
for _, schema := range schemas {
_, err := m.DB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", schema))
func (m *MysqlTestContainer) CreateDatabases(ctx context.Context, databases []string) error {
for _, database := range databases {
_, err := m.DB.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`;", database))
if err != nil {
return fmt.Errorf("unable to create database %s: %w", database, err)
}
}
return nil
}

func (m *MysqlTestContainer) DropDatabases(ctx context.Context, databases []string) error {
for _, database := range databases {
_, err := m.DB.ExecContext(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS `%s`;", database))
if err != nil {
return fmt.Errorf("unable to create schema %s: %w", schema, err)
return fmt.Errorf("unable to drop database %s: %w", database, err)
}
}
return nil
Expand Down
20 changes: 15 additions & 5 deletions internal/testutil/testcontainers/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,17 @@ func (p *PostgresTestContainer) RunSqlFiles(ctx context.Context, folder *string,
}

// Creates schema and sets search_path to schema before running SQL files
func (p *PostgresTestContainer) RunCreateStmtsInSchema(ctx context.Context, folder *string, files []string, schema string) error {
func (p *PostgresTestContainer) RunCreateStmtsInSchema(ctx context.Context, folder string, files []string, schema string) error {
for _, file := range files {
filePath := file
if folder != nil && *folder != "" {
filePath = fmt.Sprintf("./%s/%s", *folder, file)
if folder != "" {
filePath = fmt.Sprintf("./%s/%s", folder, file)
}
sqlStr, err := os.ReadFile(filePath)
if err != nil {
return err
}
setSchemaSql := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s; \n SET search_path TO %s; \n", schema, schema)
setSchemaSql := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %q; \n SET search_path TO %q; \n", schema, schema)
_, err = p.DB.Exec(ctx, setSchemaSql+string(sqlStr))
if err != nil {
return fmt.Errorf("unable to exec sql when running postgres sql files: %w", err)
Expand All @@ -298,14 +298,24 @@ func (p *PostgresTestContainer) RunCreateStmtsInSchema(ctx context.Context, fold

func (p *PostgresTestContainer) CreateSchemas(ctx context.Context, schemas []string) error {
for _, schema := range schemas {
_, err := p.DB.Exec(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schema))
_, err := p.DB.Exec(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %q;", schema))
if err != nil {
return fmt.Errorf("unable to create schema %s: %w", schema, err)
}
}
return nil
}

func (p *PostgresTestContainer) DropSchemas(ctx context.Context, schemas []string) error {
for _, schema := range schemas {
_, err := p.DB.Exec(ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %q CASCADE;", schema))
if err != nil {
return fmt.Errorf("unable to drop schema %s: %w", schema, err)
}
}
return nil
}

func (p *PostgresTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
rows := p.DB.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %q.%q;", schema, table))
var count int
Expand Down
Loading
Loading