Skip to content

Commit

Permalink
#145 Define ICache, centralize implementation in Equinox.Core (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
DSilence authored and bartelink committed Oct 13, 2019
1 parent 4c1686c commit 9136c0a
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 102 deletions.
4 changes: 2 additions & 2 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module Cosmos =
let conn = connector.Connect("equinox-tool", discovery) |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dName, cName)
Expand Down Expand Up @@ -135,7 +135,7 @@ module EventStore =
let conn = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Es ((createGateway conn batchSize), cacheStrategy, unfolds)
56 changes: 56 additions & 0 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Equinox.Core

open System

type CacheItemOptions =
| AbsoluteExpiration of DateTimeOffset
| RelativeExpiration of TimeSpan

[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, supersedes: StreamToken -> StreamToken -> bool) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer(other: CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value: StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState


type ICache =
abstract member UpdateIfNewer: cacheItemOptions:CacheItemOptions -> key: string -> CacheEntry<'state> -> Async<unit>
abstract member TryGet: key: string -> Async<(StreamToken * 'state) option>

namespace Equinox

open System.Runtime.Caching
open Equinox.Core

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)

let getPolicy (cacheItemOption: CacheItemOptions) =
match cacheItemOption with
| AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute)
| RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative)

interface ICache with
member this.UpdateIfNewer cacheItemOptions key entry = async {
let policy = getPolicy cacheItemOptions
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x }

member this.TryGet key = async {
return
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x }
3 changes: 3 additions & 0 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="Stream.fs" />
<Compile Include="Retry.fs" />
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="Cache.fs" />
</ItemGroup>

<ItemGroup>
Expand All @@ -28,6 +29,8 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="System.Runtime.Caching" Version="4.5.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' "/>
<Reference Include="System.Runtime.Caching" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
</ItemGroup>

</Project>
67 changes: 20 additions & 47 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -874,62 +874,35 @@ type private Category<'event, 'state>(gateway : Gateway, codec : IUnionEncoder<'
| InternalSyncResult.Written token' -> return SyncResult.Written (token', state') }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry =
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
member __.TryGet (key : string) =
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> unit) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState }
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger) : Async<StreamToken * 'state> =
interceptAsync (inner.Load containerStream log) (snd containerStream)
member __.TrySync (log : ILogger) (Token.Unpack (_container,stream,_) as streamToken,state) (events : 'event list)
: Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream)
| SyncResult.Written (token', state') ->return SyncResult.Written (intercept stream (token', state')) }
| SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream)
| SyncResult.Written(token', state') ->
let! intercepted = intercept stream (token', state')
return SyncResult.Written(intercepted) }

let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, Container*string>)
: ICategory<'event, 'state, Container*string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>
Expand All @@ -939,15 +912,15 @@ type private Folder<'event, 'state>
?readCache) =
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> =
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> = async {
let batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> batched
| Some (cache : Caching.Cache, prefix : string) ->
match cache.TryGet(prefix + snd containerStream) with
| None -> batched
| Some tokenAndState -> cached tokenAndState
| None -> return! batched
| Some (cache : ICache, prefix : string) ->
match! cache.TryGet(prefix + snd containerStream) with
| None -> return! batched
| Some tokenAndState -> return! cached tokenAndState }
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async<SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
Expand Down Expand Up @@ -1003,7 +976,7 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag
/// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
/// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip
| SlidingWindow of Caching.Cache * window: TimeSpan
| SlidingWindow of ICache * window: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
Expand Down
2 changes: 0 additions & 2 deletions src/Equinox.Cosmos/Equinox.Cosmos.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.21" />
<PackageReference Include="Microsoft.Azure.DocumentDB" Version="2.0.0" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.0.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="System.Runtime.Caching" Version="4.5.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<Reference Include="System.Runtime.Caching" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
</ItemGroup>

</Project>
2 changes: 0 additions & 2 deletions src/Equinox.EventStore/Equinox.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
<PackageReference Include="EventStore.Client" Version="5.0.1" />
<PackageReference Include="FsCodec" Version="1.0.0-pr.20.rc2.5" />
<PackageReference Include="FSharp.Control.AsyncSeq" Version="2.0.21" />
<PackageReference Include="System.Runtime.Caching" Version="4.5.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<Reference Include="System.Runtime.Caching" Condition=" '$(TargetFramework)' != 'netstandard2.0'" />
</ItemGroup>

</Project>
63 changes: 17 additions & 46 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -442,44 +442,14 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE
return SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry =
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
member __.TryGet (key : string) =
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> unit) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState = async {
let! _ = tee streamName tokenAndState
return tokenAndState }
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async<StreamToken * 'state> =
interceptAsync (inner.Load streamName log) streamName
Expand All @@ -490,25 +460,26 @@ module Caching =
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }

let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, string>)
: ICategory<'event, 'state, string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
let mkCacheEntry (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = mkCacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
let loadAlgorithm streamName initial log =
let loadAlgorithm streamName initial log = async {
let batched = category.Load fold initial streamName log
let cached token state = category.LoadFromToken fold state streamName token log
match readCache with
| None -> batched
| Some (cache : Caching.Cache, prefix : string) ->
match cache.TryGet(prefix + streamName) with
| None -> batched
| Some (token, state) -> cached token state
| None -> return! batched
| Some (cache : ICache, prefix : string) ->
match! cache.TryGet(prefix + streamName) with
| None -> return! batched
| Some (token, state) -> return! cached token state }
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async<StreamToken * 'state> =
loadAlgorithm streamName initial log
Expand All @@ -520,9 +491,9 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: '

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
| SlidingWindow of Caching.Cache * window: TimeSpan
| SlidingWindow of ICache * window: TimeSpan
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache
| SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string
| SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string

type Resolver<'event,'state>
( context : Context, codec, fold, initial,
Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` context skuId = Async.RunSynchronously <| async {
let! conn = connectToSpecifiedCosmosOrSimulator log
let batchSize = 10
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let createServiceCached () = Cart.createServiceWithSnapshotStrategyAndCaching conn batchSize log cache
let service1, service2 = createServiceCached (), createServiceCached ()
capture.Clear()
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ type Tests(testOutputHelper) =
let log, capture = createLoggerWithCapture ()
let! conn = connectToLocalEventStoreNode log
let batchSize = 10
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let gateway = createGesGateway conn batchSize
let createServiceCached () = Cart.createServiceWithCaching log gateway cache
let service1, service2 = createServiceCached (), createServiceCached ()
Expand Down Expand Up @@ -277,7 +277,7 @@ type Tests(testOutputHelper) =
let batchSize = 10
let gateway = createGesGateway conn batchSize
let service1 = Cart.createServiceWithCompaction log gateway
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let gateway = createGesGateway conn batchSize
let service2 = Cart.createServiceWithCompactionAndCaching log gateway cache

Expand Down

0 comments on commit 9136c0a

Please sign in to comment.