diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs
index dc83b027a..74f6a860d 100644
--- a/src/Equinox.Core/Cache.fs
+++ b/src/Equinox.Core/Cache.fs
@@ -25,34 +25,32 @@ type ICache =
abstract member TryGet: key: string -> Async<(StreamToken * 'state) option>
namespace Equinox
+
open System.Runtime.Caching
open Equinox.Core
+
type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
- let getPolicy (cacheItemOption: CacheItemOptions)=
+ let getPolicy (cacheItemOption: CacheItemOptions) =
match cacheItemOption with
| AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute)
| RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative)
interface ICache with
-
- member this.UpdateIfNewer cacheItemOptions key entry =
+ member this.UpdateIfNewer cacheItemOptions key entry = async {
let policy = getPolicy cacheItemOptions
match cache.AddOrGetExisting(key, box entry, policy) with
- | null ->
- async.Return ()
+ | null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
- async.Return ()
- | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
+ | x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x }
- member this.TryGet key =
- async.Return (
+ member this.TryGet key = async {
+ return
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
- | x -> failwithf "TryGet Incompatible cache entry %A" x
- )
+ | x -> failwithf "TryGet Incompatible cache entry %A" x }
diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj
index 4d1e371cc..6e68d95d7 100644
--- a/src/Equinox.Core/Equinox.Core.fsproj
+++ b/src/Equinox.Core/Equinox.Core.fsproj
@@ -29,7 +29,7 @@
-
+
\ No newline at end of file
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index b0da359d6..6eb781e34 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -775,7 +775,6 @@ open Microsoft.Azure.Documents
open Serilog
open System
open System.Collections.Concurrent
-open System.Runtime.Caching
/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
type Connection(client: Client.DocumentClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) =
@@ -893,11 +892,9 @@ module Caching =
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream)
- | SyncResult.Written(token', state')
- ->
- let! intercepted = intercept stream (token', state')
- return SyncResult.Written(intercepted) }
-
+ | SyncResult.Written(token', state') ->
+ let! intercepted = intercept stream (token', state')
+ return SyncResult.Written(intercepted) }
let applyCacheUpdatesWithSlidingExpiration
(cache: ICache)
@@ -905,9 +902,9 @@ module Caching =
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, Container*string>)
: ICategory<'event, 'state, Container*string> =
- let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
+ let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
- let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
+ let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _
type private Folder<'event, 'state>
@@ -918,17 +915,16 @@ type private Folder<'event, 'state>
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async = async {
- let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
- let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
- match readCache with
+ let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
+ let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
+ match readCache with
+ | None -> return batched
+ | Some (cache : ICache, prefix : string) ->
+ let! cacheItem = cache.TryGet(prefix + snd containerStream)
+ match cacheItem with
| None -> return batched
- | Some (cache : ICache, prefix : string) ->
- let! cacheItem = cache.TryGet(prefix + snd containerStream)
- match cacheItem with
- | None -> return batched
- | Some tokenAndState -> return! cached tokenAndState
- }
- member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
+ | Some tokenAndState -> return! cached tokenAndState }
+ member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
match res with
diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs
index d1f2f0ec4..9147bd233 100644
--- a/src/Equinox.EventStore/EventStore.fs
+++ b/src/Equinox.EventStore/EventStore.fs
@@ -444,8 +444,7 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE
module Caching =
/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async) =
- let intercept streamName tokenAndState =
- async{
+ let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState
}
@@ -461,31 +460,28 @@ module Caching =
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream.name)
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }
-
let applyCacheUpdatesWithSlidingExpiration
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, string>)
: ICategory<'event, 'state, string> =
- let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
+ let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
- let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
+ let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _
type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
- let loadAlgorithm streamName initial log =
- async {
- let! batched = category.Load fold initial streamName log
- let cached token state = category.LoadFromToken fold state streamName token log
- match readCache with
+ let loadAlgorithm streamName initial log = async {
+ let! batched = category.Load fold initial streamName log
+ let cached token state = category.LoadFromToken fold state streamName token log
+ match readCache with
+ | None -> return batched
+ | Some (cache : ICache, prefix : string) ->
+ let! cacheItem = cache.TryGet(prefix + streamName)
+ match cacheItem with
| None -> return batched
- | Some (cache : ICache, prefix : string) ->
- let! cacheItem = cache.TryGet(prefix + streamName)
- match cacheItem with
- | None -> return batched
- | Some (token, state) -> return! cached token state
- }
+ | Some (token, state) -> return! cached token state }
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async =
loadAlgorithm streamName initial log