From c19f646dd6f96f55e4c7c44e6e97d571b78af35b Mon Sep 17 00:00:00 2001 From: Dzmitry Safarau Date: Sun, 13 Oct 2019 13:25:00 +0300 Subject: [PATCH] Fixed styling issues according to code review. --- src/Equinox.Core/Cache.fs | 20 ++++++++--------- src/Equinox.Core/Equinox.Core.fsproj | 2 +- src/Equinox.Cosmos/Cosmos.fs | 32 ++++++++++++---------------- src/Equinox.EventStore/EventStore.fs | 28 +++++++++++------------- 4 files changed, 36 insertions(+), 46 deletions(-) diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index dc83b027a..74f6a860d 100644 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -25,34 +25,32 @@ type ICache = abstract member TryGet: key: string -> Async<(StreamToken * 'state) option> namespace Equinox + open System.Runtime.Caching open Equinox.Core + type Cache(name, sizeMb : int) = let cache = let config = System.Collections.Specialized.NameValueCollection(1) config.Add("cacheMemoryLimitMegabytes", string sizeMb); new MemoryCache(name, config) - let getPolicy (cacheItemOption: CacheItemOptions)= + let getPolicy (cacheItemOption: CacheItemOptions) = match cacheItemOption with | AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute) | RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative) interface ICache with - - member this.UpdateIfNewer cacheItemOptions key entry = + member this.UpdateIfNewer cacheItemOptions key entry = async { let policy = getPolicy cacheItemOptions match cache.AddOrGetExisting(key, box entry, policy) with - | null -> - async.Return () + | null -> () | :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry - async.Return () - | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x + | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x } - member this.TryGet key = - async.Return ( + member this.TryGet key = async { + return match cache.Get key with | null -> None | :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value - | x -> failwithf "TryGet Incompatible cache entry %A" x - ) + | x -> failwithf "TryGet Incompatible cache entry %A" x } diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 4d1e371cc..6e68d95d7 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -29,7 +29,7 @@ - + \ No newline at end of file diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index b0da359d6..6eb781e34 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -775,7 +775,6 @@ open Microsoft.Azure.Documents open Serilog open System open System.Collections.Concurrent -open System.Runtime.Caching /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) type Connection(client: Client.DocumentClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) = @@ -893,11 +892,9 @@ module Caching = let! syncRes = inner.TrySync log (streamToken, state) events match syncRes with | SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream) - | SyncResult.Written(token', state') - -> - let! intercepted = intercept stream (token', state') - return SyncResult.Written(intercepted) } - + | SyncResult.Written(token', state') -> + let! intercepted = intercept stream (token', state') + return SyncResult.Written(intercepted) } let applyCacheUpdatesWithSlidingExpiration (cache: ICache) @@ -905,9 +902,9 @@ module Caching = (slidingExpiration : TimeSpan) (category: ICategory<'event, 'state, Container*string>) : ICategory<'event, 'state, Container*string> = - let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) + let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) let policy = CacheItemOptions.RelativeExpiration(slidingExpiration) - let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName) + let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ type private Folder<'event, 'state> @@ -918,17 +915,16 @@ type private Folder<'event, 'state> let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true interface ICategory<'event, 'state, Container*string> with member __.Load containerStream (log : ILogger): Async = async { - let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log - let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log - match readCache with + let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log + let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log + match readCache with + | None -> return batched + | Some (cache : ICache, prefix : string) -> + let! cacheItem = cache.TryGet(prefix + snd containerStream) + match cacheItem with | None -> return batched - | Some (cache : ICache, prefix : string) -> - let! cacheItem = cache.TryGet(prefix + snd containerStream) - match cacheItem with - | None -> return batched - | Some tokenAndState -> return! cached tokenAndState - } - member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list) + | Some tokenAndState -> return! cached tokenAndState } + member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list) : Async> = async { let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log) match res with diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index d1f2f0ec4..9147bd233 100644 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -444,8 +444,7 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE module Caching = /// Forwards all state changes in all streams of an ICategory to a `tee` function type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async) = - let intercept streamName tokenAndState = - async{ + let intercept streamName tokenAndState = async { let! _ = tee streamName tokenAndState return tokenAndState } @@ -461,31 +460,28 @@ module Caching = | SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream.name) | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } - let applyCacheUpdatesWithSlidingExpiration (cache: ICache) (prefix: string) (slidingExpiration : TimeSpan) (category: ICategory<'event, 'state, string>) : ICategory<'event, 'state, string> = - let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) + let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes) let policy = CacheItemOptions.RelativeExpiration(slidingExpiration) - let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName) + let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName) CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) = - let loadAlgorithm streamName initial log = - async { - let! batched = category.Load fold initial streamName log - let cached token state = category.LoadFromToken fold state streamName token log - match readCache with + let loadAlgorithm streamName initial log = async { + let! batched = category.Load fold initial streamName log + let cached token state = category.LoadFromToken fold state streamName token log + match readCache with + | None -> return batched + | Some (cache : ICache, prefix : string) -> + let! cacheItem = cache.TryGet(prefix + streamName) + match cacheItem with | None -> return batched - | Some (cache : ICache, prefix : string) -> - let! cacheItem = cache.TryGet(prefix + streamName) - match cacheItem with - | None -> return batched - | Some (token, state) -> return! cached token state - } + | Some (token, state) -> return! cached token state } interface ICategory<'event, 'state, string> with member __.Load (streamName : string) (log : ILogger) : Async = loadAlgorithm streamName initial log