Skip to content

Commit

Permalink
feat(Stores): Make caching and access mandatory (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Aug 3, 2023
1 parent 537b7e0 commit ff7688f
Show file tree
Hide file tree
Showing 27 changed files with 168 additions and 194 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Stores: `*Category.Resolve`: Replace `Resolve(sn, ?ResolveOption)` with `?load = LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- Stores: `*Category` ctor: Add mandatory `name` argument, and `Name` property [#410](https://github.com/jet/equinox/pull/410)
- Stores: `*Category` ctor: Change `caching` to be last argument, to reflect that it is applied over the top [#410](https://github.com/jet/equinox/pull/410)
- Stores: `*Category` ctor: Change `caching` and `access` to be mandatory, adding `NoCaching` and `Unoptimized` modes to represent the former defaults [#417](https://github.com/jet/equinox/pull/417)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.27.0` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
20 changes: 10 additions & 10 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ type Store(store) =
snapshot: ('event -> bool) * ('state -> 'event)): Category<'event, 'state, unit> =
match store with
| Store.Context.Memory store ->
Equinox.MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
| Store.Context.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching)
let accessStrategy = if unfolds then CosmosStore.AccessStrategy.Snapshot snapshot else CosmosStore.AccessStrategy.Unoptimized
CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching)
| Store.Context.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching)
let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized
DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching)
| Store.Context.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching)
let accessStrategy = if unfolds then EventStoreDb.AccessStrategy.RollingSnapshots snapshot else EventStoreDb.AccessStrategy.Unoptimized
EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching)
| Store.Context.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching)
let accessStrategy = if unfolds then SqlStreamStore.AccessStrategy.RollingSnapshots snapshot else SqlStreamStore.AccessStrategy.Unoptimized
SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching)
| Store.Context.Mdb (context, caching) ->
Equinox.MessageDb.MessageDbCategory<'event,'state,_>(context, name, codec, fold, initial, ?caching = caching)
MessageDb.MessageDbCategory<'event,'state,_>(context, name, codec, fold, initial, MessageDb.AccessStrategy.Unoptimized, caching)

type ServiceBuilder(storageConfig, handlerLog) =
let store = Store storageConfig
Expand Down
19 changes: 10 additions & 9 deletions samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
module Samples.Infrastructure.Store

open Argu
open Equinox
open Serilog
open System

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Context =
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<ReadOnlyMemory<byte>>
| Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool
| Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.DynamoStore.CachingStrategy * unfolds: bool
| Es of Equinox.EventStoreDb.EventStoreContext * Equinox.CachingStrategy option * unfolds: bool
| Mdb of Equinox.MessageDb.MessageDbContext * Equinox.CachingStrategy option
| Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.CachingStrategy option * unfolds: bool
| Memory of MemoryStore.VolatileStore<ReadOnlyMemory<byte>>
| Cosmos of CosmosStore.CosmosStoreContext * CachingStrategy * unfolds: bool
| Dynamo of DynamoStore.DynamoStoreContext * CachingStrategy * unfolds: bool
| Es of EventStoreDb.EventStoreContext * CachingStrategy * unfolds: bool
| Mdb of MessageDb.MessageDbContext * CachingStrategy
| Sql of SqlStreamStore.SqlStreamStoreContext * CachingStrategy * unfolds: bool

module MemoryStore =
type [<NoEquality; NoComparison>] Parameters =
Expand Down Expand Up @@ -258,15 +259,15 @@ module EventStore =
let timeout = a.Timeout
log.Information("EventStoreDB {connectionString} {timeout}s", a.ConnectionString, timeout.TotalSeconds)
let connection = connect a.ConnectionString a.Credentials timeout
let cacheStrategy = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
Context.Es (EventStoreContext(connection, batchSize = a.BatchSize), cacheStrategy, unfolds)

// see https://github.com/jet/equinox#provisioning-mssql
module Sql =

open Equinox.SqlStreamStore

let cacheStrategy cache = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
let cacheStrategy = function Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
module Ms =
type [<NoEquality; NoComparison>] Parameters =
| [<AltCommandLine "-c"; Mandatory>] ConnectionString of string
Expand Down Expand Up @@ -366,5 +367,5 @@ module MessageDb =
let config (log : ILogger) cache (p : ParseResults<Parameters>) =
let a = Arguments(p)
let connection = connect log a.ConnectionString
let cache = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow(c, TimeSpan.FromMinutes 20.))
let cache = match cache with Some c -> CachingStrategy.SlidingWindow(c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
Context.Mdb (MessageDbContext(connection, batchSize = a.BatchSize), cache)
8 changes: 4 additions & 4 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ let codec = Cart.Events.codec
let codecJe = Cart.Events.codecJe

let categoryGesStreamWithRollingSnapshots context =
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, EventStoreDb.AccessStrategy.RollingSnapshots snapshot, CachingStrategy.NoCaching)
let categoryGesStreamWithoutCustomAccessStrategy context =
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial)
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, EventStoreDb.AccessStrategy.Unoptimized, CachingStrategy.NoCaching)

let categoryCosmosStreamWithSnapshotStrategy context =
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CachingStrategy.NoCaching)
let categoryCosmosStreamWithoutCustomAccessStrategy context =
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CachingStrategy.NoCaching)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand Down
10 changes: 5 additions & 5 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ let Category = ContactPreferences.Category
let codec = ContactPreferences.Events.codec
let codecJe = ContactPreferences.Events.codecJe
let categoryGesWithOptimizedStorageSemantics context =
EventStoreDb.EventStoreCategory(context 1, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.LatestKnownEvent)
EventStoreDb.EventStoreCategory(context 1, Category, codec, fold, initial, EventStoreDb.AccessStrategy.LatestKnownEvent, CachingStrategy.NoCaching)
let categoryGesWithoutAccessStrategy context =
EventStoreDb.EventStoreCategory(context defaultBatchSize, Category, codec, fold, initial)
EventStoreDb.EventStoreCategory(context defaultBatchSize, Category, codec, fold, initial, EventStoreDb.AccessStrategy.Unoptimized, CachingStrategy.NoCaching)

let categoryCosmosWithLatestKnownEventSemantics context =
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.LatestKnownEvent, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.LatestKnownEvent, CachingStrategy.NoCaching)
let categoryCosmosUnoptimized context =
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CachingStrategy.NoCaching)
let categoryCosmosRollingUnfolds context =
let access = CosmosStore.AccessStrategy.Custom(ContactPreferences.Fold.isOrigin, ContactPreferences.Fold.transmute)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CachingStrategy.NoCaching)

type Tests(testOutputHelper) =
let testOutput = TestOutput testOutputHelper
Expand Down
8 changes: 4 additions & 4 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@ let createServiceMemory log store =
let codec = Favorites.Events.codec
let codecJe = Favorites.Events.codecJe
let createServiceGes log context =
EventStoreDb.EventStoreCategory(context, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
EventStoreDb.EventStoreCategory(context, Category, codec, fold, initial, EventStoreDb.AccessStrategy.RollingSnapshots snapshot, CachingStrategy.NoCaching)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosSnapshotsUncached log context =
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CachingStrategy.NoCaching)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosRollingStateUncached log context =
let access = CosmosStore.AccessStrategy.RollingState Favorites.Fold.snapshot
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CachingStrategy.NoCaching)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosUnoptimizedButCached log context =
let access = CosmosStore.AccessStrategy.Unoptimized
let caching =
let cache = Cache ("name", 10)
CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, caching)
|> Decider.forStream log
|> Favorites.create
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ module Cosmos =
let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., Microsoft.Azure.Cosmos.ConnectionMode.Gateway)
let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously
let context = CosmosStoreContext(storeClient, tipMaxEvents = 10)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot)
let cat = CosmosStoreCategory(context, Category, Events.codecJe, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream Log.log cat
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ module Favorites =
open Equinox.CosmosStore // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store
let accessStrategy = AccessStrategy.Unoptimized // Or Snapshot etc https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#access-strategies
let category (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

let [<Literal>] appName = "equinox-tutorial"
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ module Store =
let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously
let context = CosmosStoreContext(storeClient, tipMaxEvents = 256)
let cache = Equinox.Cache(appName, 20)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching

open FulfilmentCenter

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Gapless.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module Cosmos =

open Equinox.CosmosStore
let private category (context, cache, accessStrategy) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

module Snapshot =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Index.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module Cosmos =

open Equinox.CosmosStore
let category (context,cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Sequence.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module Cosmos =

open Equinox.CosmosStore
let private create (context, cache, accessStrategy) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

module LatestKnownEvent =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Set.fs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module Cosmos =

open Equinox.CosmosStore
let category (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)

Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ module Store =
let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5.)
let storeClient = CosmosStoreClient.Connect(connector.CreateAndInitialize, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") |> Async.RunSynchronously
let context = CosmosStoreContext(storeClient, tipMaxEvents = 100) // Keep up to 100 events in tip before moving events to a new document
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)

let access = AccessStrategy.Snapshot (isOrigin,snapshot)
let category = CosmosStoreCategory(context, Category, codec, fold, initial, access, cacheStrategy)
Expand Down
4 changes: 2 additions & 2 deletions samples/Tutorial/Upload.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ module Cosmos =

open Equinox.CosmosStore
let category (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
CosmosStoreCategory(context, Category, Events.codecJe, Fold.fold, Fold.initial, AccessStrategy.LatestKnownEvent, cacheStrategy)

module EventStore =
open Equinox.EventStoreDb
let category context =
EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, access = AccessStrategy.LatestKnownEvent)
EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, AccessStrategy.LatestKnownEvent, Equinox.CachingStrategy.NoCaching)
Loading

0 comments on commit ff7688f

Please sign in to comment.