From 9136c0a6b3fbc1c927c0b6641eb435826ba8bed1 Mon Sep 17 00:00:00 2001 From: Dzmitry Safarau Date: Sun, 13 Oct 2019 15:22:40 +0300 Subject: [PATCH] #145 Define ICache, centralize implementation in Equinox.Core (#161) --- samples/Infrastructure/Storage.fs | 4 +- src/Equinox.Core/Cache.fs | 56 ++++++++++++++++ src/Equinox.Core/Equinox.Core.fsproj | 3 + src/Equinox.Cosmos/Cosmos.fs | 67 ++++++------------- src/Equinox.Cosmos/Equinox.Cosmos.fsproj | 2 - .../Equinox.EventStore.fsproj | 2 - src/Equinox.EventStore/EventStore.fs | 63 +++++------------ .../CosmosIntegration.fs | 2 +- .../EventStoreIntegration.fs | 4 +- 9 files changed, 101 insertions(+), 102 deletions(-) create mode 100644 src/Equinox.Core/Cache.fs diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs index 5f92df88d..909568dd1 100644 --- a/samples/Infrastructure/Storage.fs +++ b/samples/Infrastructure/Storage.fs @@ -76,7 +76,7 @@ module Cosmos = let conn = connector.Connect("equinox-tool", discovery) |> Async.RunSynchronously let cacheStrategy = if cache then - let c = Caching.Cache("equinox-tool", sizeMb = 50) + let c = Equinox.Cache("equinox-tool", sizeMb = 50) CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) else CachingStrategy.NoCaching StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dName, cName) @@ -135,7 +135,7 @@ module EventStore = let conn = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously let cacheStrategy = if cache then - let c = Caching.Cache("equinox-tool", sizeMb = 50) + let c = Equinox.Cache("equinox-tool", sizeMb = 50) CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some else None StorageConfig.Es ((createGateway conn batchSize), cacheStrategy, unfolds) \ No newline at end of file diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs new file mode 100644 index 000000000..74f6a860d --- /dev/null +++ b/src/Equinox.Core/Cache.fs @@ -0,0 +1,56 @@ +namespace Equinox.Core + +open System + +type CacheItemOptions = + | AbsoluteExpiration of DateTimeOffset + | RelativeExpiration of TimeSpan + +[] +type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, supersedes: StreamToken -> StreamToken -> bool) = + let mutable currentToken, currentState = initialToken, initialState + member __.UpdateIfNewer(other: CacheEntry<'state>) = + lock __ <| fun () -> + let otherToken, otherState = other.Value + if otherToken |> supersedes currentToken then + currentToken <- otherToken + currentState <- otherState + member __.Value: StreamToken * 'state = + lock __ <| fun () -> + currentToken, currentState + + +type ICache = + abstract member UpdateIfNewer: cacheItemOptions:CacheItemOptions -> key: string -> CacheEntry<'state> -> Async + abstract member TryGet: key: string -> Async<(StreamToken * 'state) option> + +namespace Equinox + +open System.Runtime.Caching +open Equinox.Core + +type Cache(name, sizeMb : int) = + let cache = + let config = System.Collections.Specialized.NameValueCollection(1) + config.Add("cacheMemoryLimitMegabytes", string sizeMb); + new MemoryCache(name, config) + + let getPolicy (cacheItemOption: CacheItemOptions) = + match cacheItemOption with + | AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute) + | RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative) + + interface ICache with + member this.UpdateIfNewer cacheItemOptions key entry = async { + let policy = getPolicy cacheItemOptions + match cache.AddOrGetExisting(key, box entry, policy) with + | null -> () + | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry + | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x } + + member this.TryGet key = async { + return + match cache.Get key with + | null -> None + | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value + | x -> failwithf "TryGet Incompatible cache entry %A" x } diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 5e0053ea3..e8c9ab819 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -16,6 +16,7 @@ + @@ -28,6 +29,8 @@ + + \ No newline at end of file diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index af1cf3b2e..7429e13b7 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -874,44 +874,14 @@ type private Category<'event, 'state>(gateway : Gateway, codec : IUnionEncoder<' | InternalSyncResult.Written token' -> return SyncResult.Written (token', state') } module Caching = - open System.Runtime.Caching - [] - type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) = - let mutable currentToken, currentState = initialToken, initialState - member __.UpdateIfNewer (other : CacheEntry<'state>) = - lock __ <| fun () -> - let otherToken, otherState = other.Value - if otherToken |> Token.supersedes currentToken then - currentToken <- otherToken - currentState <- otherState - member __.Value : StreamToken * 'state = - lock __ <| fun () -> - currentToken, currentState - - type Cache(name, sizeMb : int) = - let cache = - let config = System.Collections.Specialized.NameValueCollection(1) - config.Add("cacheMemoryLimitMegabytes", string sizeMb); - new MemoryCache(name, config) - member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry = - match cache.AddOrGetExisting(key, box entry, policy) with - | null -> () - | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry - | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x - member __.TryGet (key : string) = - match cache.Get key with - | null -> None - | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> unit) = - let intercept streamName tokenAndState = - tee streamName tokenAndState - tokenAndState + type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> Async) = + let intercept streamName tokenAndState = async { + let! _ = tee streamName tokenAndState + return tokenAndState } let interceptAsync load streamName = async { let! tokenAndState = load - return intercept streamName tokenAndState } + return! intercept streamName tokenAndState } interface ICategory<'event, 'state, Container*string> with member __.Load containerStream (log : ILogger) : Async = interceptAsync (inner.Load containerStream log) (snd containerStream) @@ -919,17 +889,20 @@ module Caching = : Async> = async { let! syncRes = inner.TrySync log (streamToken, state) events match syncRes with - | SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream) - | SyncResult.Written (token', state') ->return SyncResult.Written (intercept stream (token', state')) } + | SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream) + | SyncResult.Written(token', state') -> + let! intercepted = intercept stream (token', state') + return SyncResult.Written(intercepted) } let applyCacheUpdatesWithSlidingExpiration - (cache: Cache) + (cache: ICache) (prefix: string) (slidingExpiration : TimeSpan) (category: ICategory<'event, 'state, Container*string>) : ICategory<'event, 'state, Container*string> = - let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration) - let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) + let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) + let policy = CacheItemOptions.RelativeExpiration(slidingExpiration) + let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ type private Folder<'event, 'state> @@ -939,15 +912,15 @@ type private Folder<'event, 'state> ?readCache) = let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true interface ICategory<'event, 'state, Container*string> with - member __.Load containerStream (log : ILogger): Async = + member __.Load containerStream (log : ILogger): Async = async { let batched = category.Load inspectUnfolds containerStream fold initial isOrigin log let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log match readCache with - | None -> batched - | Some (cache : Caching.Cache, prefix : string) -> - match cache.TryGet(prefix + snd containerStream) with - | None -> batched - | Some tokenAndState -> cached tokenAndState + | None -> return! batched + | Some (cache : ICache, prefix : string) -> + match! cache.TryGet(prefix + snd containerStream) with + | None -> return! batched + | Some tokenAndState -> return! cached tokenAndState } member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list) : Async> = async { let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log) @@ -1003,7 +976,7 @@ type CachingStrategy = /// Retain a single 'state per streamName, together with the associated etag /// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to /// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip - | SlidingWindow of Caching.Cache * window: TimeSpan + | SlidingWindow of ICache * window: TimeSpan [] type AccessStrategy<'event,'state> = diff --git a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj index c61897a8e..4efb4c175 100644 --- a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj +++ b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj @@ -29,8 +29,6 @@ - - \ No newline at end of file diff --git a/src/Equinox.EventStore/Equinox.EventStore.fsproj b/src/Equinox.EventStore/Equinox.EventStore.fsproj index 6d619a5f0..7814e6a83 100644 --- a/src/Equinox.EventStore/Equinox.EventStore.fsproj +++ b/src/Equinox.EventStore/Equinox.EventStore.fsproj @@ -28,8 +28,6 @@ - - \ No newline at end of file diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 0093cbe0c..df314b26c 100644 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -442,44 +442,14 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE return SyncResult.Written (token', fold state (Seq.ofList events)) } module Caching = - open System.Runtime.Caching - [] - type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) = - let mutable currentToken, currentState = initialToken, initialState - member __.UpdateIfNewer (other : CacheEntry<'state>) = - lock __ <| fun () -> - let otherToken, otherState = other.Value - if otherToken |> Token.supersedes currentToken then - currentToken <- otherToken - currentState <- otherState - member __.Value : StreamToken * 'state = - lock __ <| fun () -> - currentToken, currentState - - type Cache(name, sizeMb : int) = - let cache = - let config = System.Collections.Specialized.NameValueCollection(1) - config.Add("cacheMemoryLimitMegabytes", string sizeMb); - new MemoryCache(name, config) - member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry = - match cache.AddOrGetExisting(key, box entry, policy) with - | null -> () - | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry - | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x - member __.TryGet (key : string) = - match cache.Get key with - | null -> None - | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x - /// Forwards all state changes in all streams of an ICategory to a `tee` function - type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> unit) = - let intercept streamName tokenAndState = - tee streamName tokenAndState - tokenAndState + type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async) = + let intercept streamName tokenAndState = async { + let! _ = tee streamName tokenAndState + return tokenAndState } let interceptAsync load streamName = async { let! tokenAndState = load - return intercept streamName tokenAndState } + return! intercept streamName tokenAndState } interface ICategory<'event, 'state, string> with member __.Load (streamName : string) (log : ILogger) : Async = interceptAsync (inner.Load streamName log) streamName @@ -490,25 +460,26 @@ module Caching = | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } let applyCacheUpdatesWithSlidingExpiration - (cache: Cache) + (cache: ICache) (prefix: string) (slidingExpiration : TimeSpan) (category: ICategory<'event, 'state, string>) : ICategory<'event, 'state, string> = - let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration) - let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) + let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) + let policy = CacheItemOptions.RelativeExpiration(slidingExpiration) + let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) = - let loadAlgorithm streamName initial log = + let loadAlgorithm streamName initial log = async { let batched = category.Load fold initial streamName log let cached token state = category.LoadFromToken fold state streamName token log match readCache with - | None -> batched - | Some (cache : Caching.Cache, prefix : string) -> - match cache.TryGet(prefix + streamName) with - | None -> batched - | Some (token, state) -> cached token state + | None -> return! batched + | Some (cache : ICache, prefix : string) -> + match! cache.TryGet(prefix + streamName) with + | None -> return! batched + | Some (token, state) -> return! cached token state } interface ICategory<'event, 'state, string> with member __.Load (streamName : string) (log : ILogger) : Async = loadAlgorithm streamName initial log @@ -520,9 +491,9 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: ' [] type CachingStrategy = - | SlidingWindow of Caching.Cache * window: TimeSpan + | SlidingWindow of ICache * window: TimeSpan /// Prefix is used to segregate multiple folds per stream when they are stored in the cache - | SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string + | SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string type Resolver<'event,'state> ( context : Context, codec, fold, initial, diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index d27a9cf19..f12b35c22 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -344,7 +344,7 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` context skuId = Async.RunSynchronously <| async { let! conn = connectToSpecifiedCosmosOrSimulator log let batchSize = 10 - let cache = Caching.Cache("cart", sizeMb = 50) + let cache = Equinox.Cache("cart", sizeMb = 50) let createServiceCached () = Cart.createServiceWithSnapshotStrategyAndCaching conn batchSize log cache let service1, service2 = createServiceCached (), createServiceCached () capture.Clear() diff --git a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs index 1125a327c..3199ed8eb 100644 --- a/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/EventStoreIntegration.fs @@ -247,7 +247,7 @@ type Tests(testOutputHelper) = let log, capture = createLoggerWithCapture () let! conn = connectToLocalEventStoreNode log let batchSize = 10 - let cache = Caching.Cache("cart", sizeMb = 50) + let cache = Equinox.Cache("cart", sizeMb = 50) let gateway = createGesGateway conn batchSize let createServiceCached () = Cart.createServiceWithCaching log gateway cache let service1, service2 = createServiceCached (), createServiceCached () @@ -277,7 +277,7 @@ type Tests(testOutputHelper) = let batchSize = 10 let gateway = createGesGateway conn batchSize let service1 = Cart.createServiceWithCompaction log gateway - let cache = Caching.Cache("cart", sizeMb = 50) + let cache = Equinox.Cache("cart", sizeMb = 50) let gateway = createGesGateway conn batchSize let service2 = Cart.createServiceWithCompactionAndCaching log gateway cache