Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH96: read and cache revisions #97

Merged
merged 9 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added unit tests and resolve related bugs
  • Loading branch information
Alexander van Delft committed Mar 6, 2020
commit a736b9833008309eb6c6c75a4bea49b56e968a06
109 changes: 107 additions & 2 deletions CDP4Dal.Tests/AssemblerTestFixture.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// --------------------------------------------------------------------------------------------------------------------
// <copyright file="AssemblerTestFixture.cs" company="RHEA System S.A.">
// Copyright (c) 2015-2019 RHEA System S.A.
// Copyright (c) 2015-2020 RHEA System S.A.
//
// Author: Sam Gerené, Merlin Bieze, Alex Vorobiev, Naron Phou
// Author: Sam Gerené, Merlin Bieze, Alex Vorobiev, Naron Phou, Alexander van Delft
//
// This file is part of CDP4-SDK Community Edition
//
Expand All @@ -28,12 +28,16 @@ namespace CDP4Dal.Tests
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using CDP4Common.CommonData;
using CDP4Common.EngineeringModelData;
using CDP4Common.SiteDirectoryData;
using CDP4Common.Types;

using CDP4Dal.Events;

using NUnit.Framework;

using Dto = CDP4Common.DTO;

[TestFixture]
Expand Down Expand Up @@ -183,6 +187,107 @@ public async Task AssertThatAssemblerSynchronizationWorks()
// checks that the removed category is no longer in the cache
Assert.AreEqual(6, assembler.Cache.Count);
Assert.IsFalse(assembler.Cache.TryGetValue(new CacheKey(categoryToRemove.Iid, null), out lazyCat));
}

[Test]
public async Task AssertThatRevisionsAreCachedCorrectly()
{
var assembler = new Assembler(this.uri);

var parameterIid = Guid.NewGuid();
var parameterRevision1 = new Dto.Parameter(parameterIid, 1); //The Parameter's 1st Revision
var parameterRevision2 = new Dto.Parameter(parameterIid, 2); //The Parameter's 2nd Revision
var parameterRevision3 = new Dto.Parameter(parameterIid, 3); //The Parameter's 3rd Revision

var valueSet1 = new Dto.ParameterValueSet(Guid.NewGuid(), 1); //ValueSet that belongs to the parameter's 1st Revision
var valueSet2 = new Dto.ParameterValueSet(Guid.NewGuid(), 1); //ValueSet that belongs to the parameter's 2nd Revision
var valueSet3 = new Dto.ParameterValueSet(Guid.NewGuid(), 1); //ValueSet that belongs to the parameter's 3rd Revision

parameterRevision1.ValueSet.Add(valueSet1.Iid);
parameterRevision2.ValueSet.Add(valueSet2.Iid);
parameterRevision3.ValueSet.Add(valueSet3.Iid);

//******************************************************************************************************************
// 1st call of Synchronize for Revision 2, which is the currently active revision
//******************************************************************************************************************
await assembler.Synchronize(new List<Dto.Thing> { parameterRevision2, valueSet2 });

//Cache should not be empty
Assert.IsNotEmpty(assembler.Cache);

//Cache should contain 2 items
Assert.AreEqual(2, assembler.Cache.Count);

//Get the cached version of the parameter
var cachedParameter = assembler.Cache.First(x => x.Value.Value.Iid == parameterRevision2.Iid).Value.Value as Parameter;

//Revision number should be 2 now
Assert.AreEqual(parameterRevision2.RevisionNumber, cachedParameter.RevisionNumber);

//Parameter should contain a ValueSet
Assert.AreEqual(1, cachedParameter.ValueSet.Count);

//Parameter should contain the correct ValueSet
Assert.AreEqual(cachedParameter.ValueSet.First().Iid, valueSet2.Iid);

//******************************************************************************************************************
// 2st call of Synchronize which introduces a newer revision: Revision nr. 3.
//******************************************************************************************************************
await assembler.Synchronize(new List<Dto.Thing> { parameterRevision3, valueSet3 });

//Cache should still contain 2 things, because parameterRevision2 is removed from cache together with valueSet2
//parameterRevision2 now is contained in the Revisions property of the cached version of the parameter
Assert.AreEqual(2, assembler.Cache.Count);

cachedParameter = assembler.Cache.First(x => x.Value.Value.Iid == parameterRevision3.Iid).Value.Value as Parameter;

//Current cached parameter version is Revision 3
Assert.AreEqual(parameterRevision3.RevisionNumber, cachedParameter.RevisionNumber);

//cached parameter should contain a ValueSet
Assert.AreEqual(1, cachedParameter.ValueSet.Count);

//cached parameter should contain exactly 1 revision
Assert.AreEqual(1, cachedParameter.Revisions.Count);

//cached parameter should contain the correct ValueSet
Assert.AreEqual(cachedParameter.ValueSet.First().Iid, valueSet3.Iid);

//Revisions property of current cached item should contain the right revision number
Assert.AreEqual(cachedParameter.Revisions.First().Value.RevisionNumber, parameterRevision2.RevisionNumber);

//******************************************************************************************************************
// 3rd call of Synchronize with older revision, that should be added as a revision to an existing cached poco
//******************************************************************************************************************
await assembler.Synchronize(new List<Dto.Thing> { parameterRevision1, valueSet1 });

//Cache should still contain 2 things, because parameterRevision1 is added to the Revisions property of the current cached parameter
Assert.AreEqual(2, assembler.Cache.Count);

cachedParameter = assembler.Cache.First(x => x.Value.Value.Iid == parameterRevision1.Iid).Value.Value as Parameter;

//parameterRevision3 is still the current cached version
Assert.AreEqual(parameterRevision3.RevisionNumber, cachedParameter.RevisionNumber);

//cached parameter should contain a ValueSet
Assert.AreEqual(1, cachedParameter.ValueSet.Count);

//cached parameter should contain the correct ValueSet
Assert.AreEqual(cachedParameter.ValueSet.First().Iid, valueSet3.Iid);

//cached parameter should contain exactly 2 revisions
Assert.AreEqual(2, cachedParameter.Revisions.Count);

var revisionParameter1 = cachedParameter.Revisions.Single(x => x.Value.Iid == parameterRevision1.Iid && x.Value.RevisionNumber == parameterRevision1.RevisionNumber).Value as Parameter;
var revisionParameter2 = cachedParameter.Revisions.Single(x => x.Value.Iid == parameterRevision2.Iid && x.Value.RevisionNumber == parameterRevision2.RevisionNumber).Value as Parameter;

//Should be empty, because an older revision than the one currently in the cache was asked for
//In that case the ValueSet belonging to the Parameter doens't get cloned (because it is unknown at that moment)
Assert.AreEqual(0, revisionParameter1.ValueSet.Count);

//Should be 1, because the ValueSet2 was cloned and added to the Parameter added to the Revisions property of the cached parameter
//when revision 3 was added to the cache
Assert.AreEqual(1, revisionParameter2.ValueSet.Count);
}

[Test]
Expand Down
62 changes: 45 additions & 17 deletions CDP4Dal/Assembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
namespace CDP4Dal
{
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -40,7 +39,9 @@ namespace CDP4Dal
using CDP4Common.Helpers;
using CDP4Common.SiteDirectoryData;
using CDP4Common.Types;

using Events;

using NLog;

using Dto = CDP4Common.DTO.Thing;
Expand Down Expand Up @@ -79,7 +80,12 @@ public class Assembler
/// The <see cref="List{Dto}"/> not completely resolved that are in the cache
/// </summary>
private List<Dto> unresolvedDtos;


/// <summary>
/// The <see cref="List{Dto}"/> that temporarily contains older revisions of already cached POCO's
/// </summary>
private List<Dto> olderRevisionDtos;

/// <summary>
/// Initializes a new instance of the <see cref="Assembler"/> class.
/// </summary>
Expand Down Expand Up @@ -124,13 +130,17 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool
}

await this.threadLock.WaitAsync().ConfigureAwait(false);

try
{
var synchronizeStopWatch = Stopwatch.StartNew();

logger.Info("Start Synchronization of {0}", this.IDalUri);

this.olderRevisionDtos = new List<Dto>();

this.UpdateThingRevisions(dtoThings);
var updatedAndNewThingRevisions = dtoThings.Except(this.olderRevisionDtos).ToList();

this.thingsMarkedForDeletion = new List<Thing>();

Expand Down Expand Up @@ -160,13 +170,13 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool

logger.Trace("Start Updating cache");
startwatch = Stopwatch.StartNew();
this.AddOrUpdateTheCache(this.DtoThingToUpdate);
this.AddOrUpdateTheCache(updatedAndNewThingRevisions);
startwatch.Stop();
logger.Trace("Updating cache took {0} [ms]", startwatch.ElapsedMilliseconds);

logger.Trace("Start Resolving properties");
startwatch = Stopwatch.StartNew();
PocoThingFactory.ResolveDependencies(this.DtoThingToUpdate, this.Cache);
PocoThingFactory.ResolveDependencies(updatedAndNewThingRevisions, this.Cache);
startwatch.Stop();
logger.Trace("Resolving properties took {0} [ms]", startwatch.ElapsedMilliseconds);

Expand All @@ -175,9 +185,8 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool
startwatch = Stopwatch.StartNew();
foreach (var dtoThing in this.DtoThingToUpdate)
{
Lazy<Thing> updatedLazyThing;
var cacheKey = new CacheKey(dtoThing.Iid, dtoThing.IterationContainerId);
var succeed = this.Cache.TryGetValue(cacheKey, out updatedLazyThing);
var succeed = this.Cache.TryGetValue(cacheKey, out var updatedLazyThing);

if (succeed)
{
Expand All @@ -191,6 +200,7 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool
}
}
}

startwatch.Stop();
logger.Trace("Validating {0} Things took {1} [ms]", this.DtoThingToUpdate.Count, startwatch.ElapsedMilliseconds);

Expand All @@ -204,25 +214,39 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool

foreach (var dtoThing in this.DtoThingToUpdate)
{
Lazy<Thing> updatedLazyThing;
var cacheKey = new CacheKey(dtoThing.Iid, dtoThing.IterationContainerId);
var succeed = this.Cache.TryGetValue(cacheKey, out updatedLazyThing);
var succeed = this.Cache.TryGetValue(cacheKey, out var updatedLazyThing);

if (succeed)
{
var thingObject = updatedLazyThing.Value;
var cacheId = new CacheKey(dtoThing.Iid, dtoThing.IterationContainerId);

if (!existentGuid.Select(x => x.Item1).Contains(cacheId))
{
CDPMessageBus.Current.SendObjectChangeEvent(thingObject, EventKind.Added);
messageCounter++;
}
else if (dtoThing.RevisionNumber > existentGuid.Single(x => x.Item1.Equals(cacheId)).Item2)
else
{
// send event only if revision number has increased from the old cached version
CDPMessageBus.Current.SendObjectChangeEvent(thingObject, EventKind.Updated);
messageCounter++;
}
var cacheThingRevisionNumber = existentGuid.Single(x => x.Item1.Equals(cacheId)).Item2;

if (dtoThing.RevisionNumber > cacheThingRevisionNumber)
{
// send event only if revision number has increased from the old cached version
CDPMessageBus.Current.SendObjectChangeEvent(thingObject, EventKind.Updated);
messageCounter++;
}
else if (dtoThing.RevisionNumber < cacheThingRevisionNumber)
{
if (this.Cache.TryGetValue(cacheId, out var cacheThing))
{
// send event if revision number is lower. That means that the original cached item was changed (revision was added!) CDPMessageBus.Current.SendObjectChangeEvent(cacheThing.Value, EventKind.Updated);
CDPMessageBus.Current.SendObjectChangeEvent(cacheThing.Value, EventKind.Updated);
messageCounter++;
}
}
}
}
}

Expand All @@ -232,6 +256,7 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool

logger.Trace("Start Deleting things");
startwatch = Stopwatch.StartNew();

foreach (var markedThing in this.thingsMarkedForDeletion.Where(x => x.ChangeKind == ChangeKind.Delete))
{
this.RemoveThingFromCache(markedThing);
Expand Down Expand Up @@ -283,14 +308,15 @@ public async Task Synchronize(IEnumerable<CDP4Common.DTO.Thing> dtoThings, bool

/// <summary>
/// For each DTO that is coming from the data-source, create a clone of the associated cached POCO
/// and store this clone in the <see cref="Thing.Revisions"/> dictionary
/// and store this clone in the <see cref="Thing.Revisions"/> dictionary if it is a newer revision.
/// For older revisions, store the revision in the cached POCO's <see cref="Thing.Revisions"/> property'
/// </summary>
/// <param name="dtoThings">
/// the DTO's coming from the data-source
/// </param>
/// <remarks>
/// If the revision of the DTO is smaller that the revision of the the cached POCO, it is a DTO that represents
/// the state of a DTO from the past.
/// the state of a DTO from the past. It will be added to the cached POCO's <see cref="Thing.Revisions"/> property
/// </remarks>
private void UpdateThingRevisions(IEnumerable<CDP4Common.DTO.Thing> dtoThings)
{
Expand Down Expand Up @@ -332,6 +358,8 @@ private void UpdateThingRevisions(IEnumerable<CDP4Common.DTO.Thing> dtoThings)
{
if (!currentThing.Revisions.ContainsKey(dto.RevisionNumber))
{
this.olderRevisionDtos.Add(dto);

//Add the found DTO to the currentThing's Revisions property
var poco = dto.InstantiatePoco(this.Cache, this.IDalUri);
PocoThingFactory.ResolveDependencies(dto, poco);
Expand Down Expand Up @@ -721,8 +749,7 @@ private void RecursivelyMarksForRemoval(Thing thingToRemove)
/// <returns>True if the operation succeeded</returns>
private bool RemoveThingFromCache(Thing thingToRemove)
{
Lazy<Thing> outLazy;
var succeed = this.Cache.TryRemove(thingToRemove.CacheKey, out outLazy);
var succeed = this.Cache.TryRemove(thingToRemove.CacheKey, out var outLazy);
if (succeed)
{
if (outLazy.Value is Relationship relationship)
Expand Down Expand Up @@ -753,6 +780,7 @@ private void AddOrUpdateTheCache(IEnumerable<CDP4Common.DTO.Thing> dtoThings)
}

var cacheKey = new CacheKey(dto.Iid, dto.IterationContainerId);

this.Cache.AddOrUpdate(cacheKey, new Lazy<Thing>(() => dto.InstantiatePoco(this.Cache, this.IDalUri)), (key, oldValue) => oldValue);
}
}
Expand Down