Skip to content

Commit

Permalink
test import type criteria migration
Browse files Browse the repository at this point in the history
  • Loading branch information
IngoRoessner committed Nov 27, 2024
1 parent b50b247 commit 49e1ed6
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 52 deletions.
1 change: 0 additions & 1 deletion lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type Config struct {
GroupId string `json:"group_id"`
DeviceRepoUrl string `json:"device_repo_url"`
MongoUrl string `json:"mongo_url"`
MongoReplSet bool `json:"mongo_repl_set"` //set true if mongodb is configured as replication set or mongos and is able to handle transactions
MongoTable string `json:"mongo_table"`
MongoImportTypeCollection string `json:"mongo_import_type_collection"`
Debug bool `json:"debug"`
Expand Down
34 changes: 0 additions & 34 deletions lib/database/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,40 +68,6 @@ func (this *Mongo) CreateId() string {
return uuid.NewString()
}

func (this *Mongo) Transaction(ctx context.Context) (resultCtx context.Context, close func(success bool) error, err error) {
if !this.config.MongoReplSet {
return ctx, func(bool) error { return nil }, nil
}
session, err := this.client.StartSession()
if err != nil {
return nil, nil, err
}
err = session.StartTransaction()
if err != nil {
return nil, nil, err
}

//create session context; callback is executed synchronously and the error is passed on as error of WithSession
_ = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
resultCtx = sessionContext
return nil
})

return resultCtx, func(success bool) error {
defer session.EndSession(context.Background())
var err error
if success {
err = session.CommitTransaction(resultCtx)
} else {
err = session.AbortTransaction(resultCtx)
}
if err != nil {
log.Println("ERROR: unable to finish mongo transaction", err)
}
return err
}, nil
}

func (this *Mongo) ensureIndex(collection *mongo.Collection, indexname string, indexKey string, asc bool, unique bool) error {
ctx, _ := getTimeoutContext()
var direction int32 = -1
Expand Down
240 changes: 224 additions & 16 deletions lib/database/mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,234 @@
package mongo

import (
"github.com/ory/dockertest"
"go.mongodb.org/mongo-driver/mongo"
"context"
"github.com/SENERGY-Platform/import-repository/lib/config"
"github.com/SENERGY-Platform/import-repository/lib/model"
"github.com/SENERGY-Platform/import-repository/lib/testutils/docker"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"log"
"reflect"
"slices"
"strings"
"sync"
"testing"
"time"
)

func MongoTestServer(pool *dockertest.Pool) (closer func(), hostPort string, ipAddress string, err error) {
log.Println("start mongodb")
repo, err := pool.Run("mongo", "4.1.11", []string{})
func TestMigration(t *testing.T) {
wg := &sync.WaitGroup{}
defer wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf, err := config.Load("../../../config.json")
if err != nil {
t.Error(err)
return
}

_, ip, err := docker.MongoDB(ctx, wg)
if err != nil {
return func() {}, "", "", err
t.Error(err)
return
}
conf.MongoUrl = "mongodb://" + ip + ":27017"

getColorFunction := "urn:infai:ses:measuring-function:getColorFunction"
getHumidityFunction := "urn:infai:ses:measuring-function:getHumidityFunction"

testCharacteristic := "urn:infai:ses:characteristic:test"

deviceAspect := "urn:infai:ses:aspect:deviceAspect"
airAspect := "urn:infai:ses:aspect:airAspect"

it1 := model.ImportType{
Id: "it1",
Name: "it1",
Output: model.ContentVariable{
Name: "output",
SubContentVariables: []model.ContentVariable{
{
Name: "value",
SubContentVariables: []model.ContentVariable{
{
Name: "value",
CharacteristicId: testCharacteristic,
AspectId: deviceAspect,
FunctionId: getColorFunction,
},
{
Name: "foo",
AspectId: airAspect,
FunctionId: getHumidityFunction,
},
},
},
},
},
}

it2 := model.ImportType{
Id: "it2",
Name: "it2",
Output: model.ContentVariable{
Name: "output",
SubContentVariables: []model.ContentVariable{
{
Name: "value",
CharacteristicId: testCharacteristic,
AspectId: deviceAspect,
FunctionId: getColorFunction,
},
},
},
}
hostPort = repo.GetPort("27017/tcp")
err = pool.Retry(func() error {
log.Println("try mongodb connection...")
ctx, _ := getTimeoutContext()
client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:"+hostPort))
err = client.Ping(ctx, readpref.Primary())
return err

it3 := model.ImportType{
Id: "it3",
Name: "it3",
Output: model.ContentVariable{
Name: "output",
SubContentVariables: []model.ContentVariable{
{
Name: "foo",
AspectId: airAspect,
FunctionId: getHumidityFunction,
},
},
},
}

itNone := model.ImportType{
Id: "none",
Name: "none",
Output: model.ContentVariable{
Name: "output",
},
}

t.Run("create import types without criteria", func(t *testing.T) {
subCtx, cancel := context.WithCancel(ctx)
defer time.Sleep(time.Second)
defer cancel()
defer time.Sleep(time.Second)
db, err := New(conf, subCtx, wg)
if err != nil {
t.Error(err)
return
}
for _, importType := range []model.ImportType{it1, it2, it3, itNone} {
_, err := db.importTypeCollection().ReplaceOne(ctx, bson.M{idKey: importType.Id}, importType, options.Replace().SetUpsert(true))
if err != nil {
t.Error(err)
return
}
}
})

t.Run("migrate", func(t *testing.T) {
subCtx, cancel := context.WithCancel(ctx)
defer time.Sleep(time.Second)
defer cancel()
defer time.Sleep(time.Second)
db, err := New(conf, subCtx, wg)
if err != nil {
t.Error(err)
return
}

t.Run("check migrations", func(t *testing.T) {
c, err := db.importTypeCollection().Find(ctx, bson.M{})
if err != nil {
t.Error(err)
return
}
var list []ImportTypeWithCriteria
err = c.All(ctx, &list)
if err != nil {
t.Error(err)
return
}
expected := []ImportTypeWithCriteria{
{
ImportType: it1,
Criteria: contentVariableToCertList(it1.Output),
},
{
ImportType: it2,
Criteria: contentVariableToCertList(it2.Output),
},
{
ImportType: it3,
Criteria: contentVariableToCertList(it3.Output),
},
{
ImportType: itNone,
Criteria: contentVariableToCertList(itNone.Output),
},
}
slices.SortFunc(expected, func(a, b ImportTypeWithCriteria) int {
return strings.Compare(a.ImportType.Id, b.ImportType.Id)
})
slices.SortFunc(list, func(a, b ImportTypeWithCriteria) int {
return strings.Compare(a.ImportType.Id, b.ImportType.Id)
})
if !reflect.DeepEqual(list, expected) {
t.Errorf("\na=%#v\ne=%#v\n", list, expected)
}
})
})

t.Run("check migration rerun", func(t *testing.T) {
subCtx, cancel := context.WithCancel(ctx)
defer time.Sleep(time.Second)
defer cancel()
defer time.Sleep(time.Second)
db, err := New(conf, subCtx, wg)
if err != nil {
t.Error(err)
return
}

t.Run("check migrations", func(t *testing.T) {
c, err := db.importTypeCollection().Find(ctx, bson.M{})
if err != nil {
t.Error(err)
return
}
var list []ImportTypeWithCriteria
err = c.All(ctx, &list)
if err != nil {
t.Error(err)
return
}
expected := []ImportTypeWithCriteria{
{
ImportType: it1,
Criteria: contentVariableToCertList(it1.Output),
},
{
ImportType: it2,
Criteria: contentVariableToCertList(it2.Output),
},
{
ImportType: it3,
Criteria: contentVariableToCertList(it3.Output),
},
{
ImportType: itNone,
Criteria: contentVariableToCertList(itNone.Output),
},
}
slices.SortFunc(expected, func(a, b ImportTypeWithCriteria) int {
return strings.Compare(a.ImportType.Id, b.ImportType.Id)
})
slices.SortFunc(list, func(a, b ImportTypeWithCriteria) int {
return strings.Compare(a.ImportType.Id, b.ImportType.Id)
})
if !reflect.DeepEqual(list, expected) {
t.Errorf("\na=%#v\ne=%#v\n", list, expected)
}
})
})
return func() { repo.Close() }, hostPort, repo.Container.NetworkSettings.IPAddress, err
}
1 change: 0 additions & 1 deletion tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func createTestEnv(ctx context.Context, wg *sync.WaitGroup) (permv2Client permV2
log.Println("ERROR: unable to load config: ", err)
return permv2Client, conf, err
}
conf.MongoReplSet = false
conf, err = NewDockerEnv(conf, ctx, wg)
if err != nil {
log.Println("ERROR: unable to create docker env", err)
Expand Down

0 comments on commit 49e1ed6

Please sign in to comment.