From 49e1ed6533bc7f13b1c599b02b09ea1f6bb33ef1 Mon Sep 17 00:00:00 2001 From: Ingo Date: Wed, 27 Nov 2024 11:19:01 +0100 Subject: [PATCH] test import type criteria migration --- lib/config/config.go | 1 - lib/database/mongo/mongo.go | 34 ----- lib/database/mongo/mongo_test.go | 240 ++++++++++++++++++++++++++++--- tests/main_test.go | 1 - 4 files changed, 224 insertions(+), 52 deletions(-) diff --git a/lib/config/config.go b/lib/config/config.go index 3365892..04458f1 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -34,7 +34,6 @@ type Config struct { GroupId string `json:"group_id"` DeviceRepoUrl string `json:"device_repo_url"` MongoUrl string `json:"mongo_url"` - MongoReplSet bool `json:"mongo_repl_set"` //set true if mongodb is configured as replication set or mongos and is able to handle transactions MongoTable string `json:"mongo_table"` MongoImportTypeCollection string `json:"mongo_import_type_collection"` Debug bool `json:"debug"` diff --git a/lib/database/mongo/mongo.go b/lib/database/mongo/mongo.go index df5f147..ac4753a 100644 --- a/lib/database/mongo/mongo.go +++ b/lib/database/mongo/mongo.go @@ -68,40 +68,6 @@ func (this *Mongo) CreateId() string { return uuid.NewString() } -func (this *Mongo) Transaction(ctx context.Context) (resultCtx context.Context, close func(success bool) error, err error) { - if !this.config.MongoReplSet { - return ctx, func(bool) error { return nil }, nil - } - session, err := this.client.StartSession() - if err != nil { - return nil, nil, err - } - err = session.StartTransaction() - if err != nil { - return nil, nil, err - } - - //create session context; callback is executed synchronously and the error is passed on as error of WithSession - _ = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error { - resultCtx = sessionContext - return nil - }) - - return resultCtx, func(success bool) error { - defer session.EndSession(context.Background()) - var err error - if success { - err = session.CommitTransaction(resultCtx) - } else { - err = session.AbortTransaction(resultCtx) - } - if err != nil { - log.Println("ERROR: unable to finish mongo transaction", err) - } - return err - }, nil -} - func (this *Mongo) ensureIndex(collection *mongo.Collection, indexname string, indexKey string, asc bool, unique bool) error { ctx, _ := getTimeoutContext() var direction int32 = -1 diff --git a/lib/database/mongo/mongo_test.go b/lib/database/mongo/mongo_test.go index 1bed33d..09f4795 100644 --- a/lib/database/mongo/mongo_test.go +++ b/lib/database/mongo/mongo_test.go @@ -17,26 +17,234 @@ package mongo import ( - "github.com/ory/dockertest" - "go.mongodb.org/mongo-driver/mongo" + "context" + "github.com/SENERGY-Platform/import-repository/lib/config" + "github.com/SENERGY-Platform/import-repository/lib/model" + "github.com/SENERGY-Platform/import-repository/lib/testutils/docker" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" - "log" + "reflect" + "slices" + "strings" + "sync" + "testing" + "time" ) -func MongoTestServer(pool *dockertest.Pool) (closer func(), hostPort string, ipAddress string, err error) { - log.Println("start mongodb") - repo, err := pool.Run("mongo", "4.1.11", []string{}) +func TestMigration(t *testing.T) { + wg := &sync.WaitGroup{} + defer wg.Wait() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conf, err := config.Load("../../../config.json") + if err != nil { + t.Error(err) + return + } + + _, ip, err := docker.MongoDB(ctx, wg) if err != nil { - return func() {}, "", "", err + t.Error(err) + return + } + conf.MongoUrl = "mongodb://" + ip + ":27017" + + getColorFunction := "urn:infai:ses:measuring-function:getColorFunction" + getHumidityFunction := "urn:infai:ses:measuring-function:getHumidityFunction" + + testCharacteristic := "urn:infai:ses:characteristic:test" + + deviceAspect := "urn:infai:ses:aspect:deviceAspect" + airAspect := "urn:infai:ses:aspect:airAspect" + + it1 := model.ImportType{ + Id: "it1", + Name: "it1", + Output: model.ContentVariable{ + Name: "output", + SubContentVariables: []model.ContentVariable{ + { + Name: "value", + SubContentVariables: []model.ContentVariable{ + { + Name: "value", + CharacteristicId: testCharacteristic, + AspectId: deviceAspect, + FunctionId: getColorFunction, + }, + { + Name: "foo", + AspectId: airAspect, + FunctionId: getHumidityFunction, + }, + }, + }, + }, + }, + } + + it2 := model.ImportType{ + Id: "it2", + Name: "it2", + Output: model.ContentVariable{ + Name: "output", + SubContentVariables: []model.ContentVariable{ + { + Name: "value", + CharacteristicId: testCharacteristic, + AspectId: deviceAspect, + FunctionId: getColorFunction, + }, + }, + }, } - hostPort = repo.GetPort("27017/tcp") - err = pool.Retry(func() error { - log.Println("try mongodb connection...") - ctx, _ := getTimeoutContext() - client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:"+hostPort)) - err = client.Ping(ctx, readpref.Primary()) - return err + + it3 := model.ImportType{ + Id: "it3", + Name: "it3", + Output: model.ContentVariable{ + Name: "output", + SubContentVariables: []model.ContentVariable{ + { + Name: "foo", + AspectId: airAspect, + FunctionId: getHumidityFunction, + }, + }, + }, + } + + itNone := model.ImportType{ + Id: "none", + Name: "none", + Output: model.ContentVariable{ + Name: "output", + }, + } + + t.Run("create import types without criteria", func(t *testing.T) { + subCtx, cancel := context.WithCancel(ctx) + defer time.Sleep(time.Second) + defer cancel() + defer time.Sleep(time.Second) + db, err := New(conf, subCtx, wg) + if err != nil { + t.Error(err) + return + } + for _, importType := range []model.ImportType{it1, it2, it3, itNone} { + _, err := db.importTypeCollection().ReplaceOne(ctx, bson.M{idKey: importType.Id}, importType, options.Replace().SetUpsert(true)) + if err != nil { + t.Error(err) + return + } + } + }) + + t.Run("migrate", func(t *testing.T) { + subCtx, cancel := context.WithCancel(ctx) + defer time.Sleep(time.Second) + defer cancel() + defer time.Sleep(time.Second) + db, err := New(conf, subCtx, wg) + if err != nil { + t.Error(err) + return + } + + t.Run("check migrations", func(t *testing.T) { + c, err := db.importTypeCollection().Find(ctx, bson.M{}) + if err != nil { + t.Error(err) + return + } + var list []ImportTypeWithCriteria + err = c.All(ctx, &list) + if err != nil { + t.Error(err) + return + } + expected := []ImportTypeWithCriteria{ + { + ImportType: it1, + Criteria: contentVariableToCertList(it1.Output), + }, + { + ImportType: it2, + Criteria: contentVariableToCertList(it2.Output), + }, + { + ImportType: it3, + Criteria: contentVariableToCertList(it3.Output), + }, + { + ImportType: itNone, + Criteria: contentVariableToCertList(itNone.Output), + }, + } + slices.SortFunc(expected, func(a, b ImportTypeWithCriteria) int { + return strings.Compare(a.ImportType.Id, b.ImportType.Id) + }) + slices.SortFunc(list, func(a, b ImportTypeWithCriteria) int { + return strings.Compare(a.ImportType.Id, b.ImportType.Id) + }) + if !reflect.DeepEqual(list, expected) { + t.Errorf("\na=%#v\ne=%#v\n", list, expected) + } + }) + }) + + t.Run("check migration rerun", func(t *testing.T) { + subCtx, cancel := context.WithCancel(ctx) + defer time.Sleep(time.Second) + defer cancel() + defer time.Sleep(time.Second) + db, err := New(conf, subCtx, wg) + if err != nil { + t.Error(err) + return + } + + t.Run("check migrations", func(t *testing.T) { + c, err := db.importTypeCollection().Find(ctx, bson.M{}) + if err != nil { + t.Error(err) + return + } + var list []ImportTypeWithCriteria + err = c.All(ctx, &list) + if err != nil { + t.Error(err) + return + } + expected := []ImportTypeWithCriteria{ + { + ImportType: it1, + Criteria: contentVariableToCertList(it1.Output), + }, + { + ImportType: it2, + Criteria: contentVariableToCertList(it2.Output), + }, + { + ImportType: it3, + Criteria: contentVariableToCertList(it3.Output), + }, + { + ImportType: itNone, + Criteria: contentVariableToCertList(itNone.Output), + }, + } + slices.SortFunc(expected, func(a, b ImportTypeWithCriteria) int { + return strings.Compare(a.ImportType.Id, b.ImportType.Id) + }) + slices.SortFunc(list, func(a, b ImportTypeWithCriteria) int { + return strings.Compare(a.ImportType.Id, b.ImportType.Id) + }) + if !reflect.DeepEqual(list, expected) { + t.Errorf("\na=%#v\ne=%#v\n", list, expected) + } + }) }) - return func() { repo.Close() }, hostPort, repo.Container.NetworkSettings.IPAddress, err } diff --git a/tests/main_test.go b/tests/main_test.go index 9e58139..59ae5fd 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -123,7 +123,6 @@ func createTestEnv(ctx context.Context, wg *sync.WaitGroup) (permv2Client permV2 log.Println("ERROR: unable to load config: ", err) return permv2Client, conf, err } - conf.MongoReplSet = false conf, err = NewDockerEnv(conf, ctx, wg) if err != nil { log.Println("ERROR: unable to create docker env", err)