Skip to content

Commit

Permalink
feat!(Stores): Category ctors take name + caching argument goes last (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jul 26, 2023
1 parent 602a64c commit ef1ca1e
Show file tree
Hide file tree
Showing 37 changed files with 228 additions and 261 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in [`FsCodec` #82](https://github.com/jet/FsCodec/pull/82), and use `task` in hot paths [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.*.*Category`: Added mandatory `name` argument, and `Name` property [#410](https://github.com/jet/equinox/pull/410)
- `Equinox.*.*Category`: Changed caching to be last argument, to reflect that it is applied over the top [#410](https://github.com/jet/equinox/pull/410)
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: Replaced `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
Expand Down
27 changes: 14 additions & 13 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,51 @@ open System

type Store(store) =
member _.Category
( codec: FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, unit>,
( name,
codec: FsCodec.IEventCodec<'event, ReadOnlyMemory<byte>, unit>,
fold: 'state -> 'event seq -> 'state,
initial: 'state,
snapshot: ('event -> bool) * ('state -> 'event)): Category<'event, 'state, unit> =
match store with
| Store.Context.Memory store ->
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
Equinox.MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial)
| Store.Context.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy)
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching)
| Store.Context.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, caching, accessStrategy)
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching)
| Store.Context.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching)
| Store.Context.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy)
Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, name, codec, fold, initial, ?access = accessStrategy, ?caching = caching)
| Store.Context.Mdb (context, caching) ->
Equinox.MessageDb.MessageDbCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching)
Equinox.MessageDb.MessageDbCategory<'event,'state,_>(context, name, codec, fold, initial, ?caching = caching)

type ServiceBuilder(storageConfig, handlerLog) =
let store = Store storageConfig

member _.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot
store.Category(Favorites.Events.codec, fold, initial, snapshot)
|> Decider.resolve handlerLog
store.Category(Favorites.Category, Favorites.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> Favorites.create

member _.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact
store.Category(SavedForLater.Events.codec, fold, initial, snapshot)
|> Decider.resolve handlerLog
store.Category(SavedForLater.Category, SavedForLater.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> SavedForLater.create 50

member _.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
store.Category(TodoBackend.Events.codec, fold, initial, snapshot)
|> Decider.resolve handlerLog
store.Category(TodoBackend.Category, TodoBackend.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> TodoBackend.create

let register (services : IServiceCollection, storageConfig, handlerLog) =
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,4 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
decider.Query(id, Equinox.LoadOption.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
Service(streamId >> resolve)
2 changes: 1 addition & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
decider.Query(id, Equinox.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
Service(streamId >> resolve)
2 changes: 1 addition & 1 deletion samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
decider.TransactEx((fun c -> (), decideUnfavorite sku c.State), fun () c -> c.Version)

let create resolve =
Service(streamId >> resolve Category)
Service(streamId >> resolve)
2 changes: 1 addition & 1 deletion samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
return! execute targetId (Merge state) }

let create maxSavedItems resolve =
Service(streamId >> resolve Category, maxSavedItems)
Service(streamId >> resolve, maxSavedItems)
16 changes: 8 additions & 8 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<ReadOnlyMemory<byte>>()
let createServiceMemory log store =
MemoryStore.MemoryStoreCategory(store, Cart.Events.codec, fold, initial)
|> Decider.resolve log
MemoryStore.MemoryStoreCategory(store, Cart.Category, Cart.Events.codec, fold, initial)
|> Decider.forStream log
|> Cart.create

let codec = Cart.Events.codec
let codecJe = Cart.Events.codecJe

let categoryGesStreamWithRollingSnapshots context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
let categoryGesStreamWithoutCustomAccessStrategy context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial)
EventStoreDb.EventStoreCategory(context, Cart.Category, codec, fold, initial)

let categoryCosmosStreamWithSnapshotStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot)
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching)
let categoryCosmosStreamWithoutCustomAccessStrategy context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized)
CosmosStore.CosmosStoreCategory(context, Cart.Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand Down Expand Up @@ -56,7 +56,7 @@ type Tests(testOutputHelper) =
let arrangeEs connect choose createCategory = async {
let client = connect log
let context = choose client defaultBatchSize
return Cart.create (createCategory context |> Decider.resolve log) }
return Cart.create (createCategory context |> Decider.forStream log) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = async {
Expand All @@ -72,7 +72,7 @@ type Tests(testOutputHelper) =

let arrangeCosmos connect createCategory =
let context : CosmosStore.CosmosStoreContext = connect log defaultQueryMaxItems
Cart.create (createCategory context |> Decider.resolve log)
Cart.create (createCategory context |> Decider.forStream log)

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = async {
Expand Down
19 changes: 10 additions & 9 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@ let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initia

let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), fold, initial)
|> Decider.resolve log
MemoryStore.MemoryStoreCategory(store, ContactPreferences.Category, FsCodec.Box.Codec.Create(), fold, initial)
|> Decider.forStream log
|> ContactPreferences.create

let Category = ContactPreferences.Category
let codec = ContactPreferences.Events.codec
let codecJe = ContactPreferences.Events.codecJe
let categoryGesWithOptimizedStorageSemantics context =
EventStoreDb.EventStoreCategory(context 1, codec, fold, initial, access = EventStoreDb.AccessStrategy.LatestKnownEvent)
EventStoreDb.EventStoreCategory(context 1, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.LatestKnownEvent)
let categoryGesWithoutAccessStrategy context =
EventStoreDb.EventStoreCategory(context defaultBatchSize, codec, fold, initial)
EventStoreDb.EventStoreCategory(context defaultBatchSize, Category, codec, fold, initial)

let categoryCosmosWithLatestKnownEventSemantics context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.LatestKnownEvent)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.LatestKnownEvent, CosmosStore.CachingStrategy.NoCaching)
let categoryCosmosUnoptimized context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Unoptimized, CosmosStore.CachingStrategy.NoCaching)
let categoryCosmosRollingUnfolds context =
let access = CosmosStore.AccessStrategy.Custom(ContactPreferences.Fold.isOrigin, ContactPreferences.Fold.transmute)
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, access)
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching)

type Tests(testOutputHelper) =
let testOutput = TestOutput testOutputHelper
Expand All @@ -48,7 +49,7 @@ type Tests(testOutputHelper) =
let arrangeEs connect choose createCategory = async {
let client = connect log
let context = choose client
return ContactPreferences.create (createCategory context |> Decider.resolve log) }
return ContactPreferences.create (createCategory context |> Decider.forStream log) }

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = async {
Expand All @@ -64,7 +65,7 @@ type Tests(testOutputHelper) =

let arrangeCosmos connect createCategory queryMaxItems =
let context: CosmosStore.CosmosStoreContext = connect log queryMaxItems
ContactPreferences.create (createCategory context |> Decider.resolve log)
ContactPreferences.create (createCategory context |> Decider.forStream log)

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with Unoptimized semantics`` args = async {
Expand Down
21 changes: 11 additions & 10 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,41 @@ open Equinox
open Equinox.CosmosStore.Integration.CosmosFixtures
open Swensen.Unquote

let [<Literal>] Category = Favorites.Category
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin, Favorites.Fold.snapshot

let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
MemoryStore.MemoryStoreCategory(store, FsCodec.Box.Codec.Create(), fold, initial)
|> Decider.resolve log
MemoryStore.MemoryStoreCategory(store, Category, FsCodec.Box.Codec.Create(), fold, initial)
|> Decider.forStream log
|> Favorites.create

let codec = Favorites.Events.codec
let codecJe = Favorites.Events.codecJe
let createServiceGes log context =
EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
|> Decider.resolve log
EventStoreDb.EventStoreCategory(context, Category, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosSnapshotsUncached log context =
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot)
|> Decider.resolve log
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, CosmosStore.AccessStrategy.Snapshot snapshot, CosmosStore.CachingStrategy.NoCaching)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosRollingStateUncached log context =
let access = CosmosStore.AccessStrategy.RollingState Favorites.Fold.snapshot
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, access)
|> Decider.resolve log
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, CosmosStore.CachingStrategy.NoCaching)
|> Decider.forStream log
|> Favorites.create

let createServiceCosmosUnoptimizedButCached log context =
let access = CosmosStore.AccessStrategy.Unoptimized
let caching =
let cache = Cache ("name", 10)
CosmosStore.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, caching, access)
|> Decider.resolve log
CosmosStore.CosmosStoreCategory(context, Category, codecJe, fold, initial, access, caching)
|> Decider.forStream log
|> Favorites.create

type Command =
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Tests(testOutputHelper) =
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let client = connectToLocalEventStoreNode log
let context = createContext client batchSize
let service = Cart.create (CartIntegration.categoryGesStreamWithRollingSnapshots context |> Equinox.Decider.resolve log)
let service = Cart.create (CartIntegration.categoryGesStreamWithRollingSnapshots context |> Equinox.Decider.forStream log)
let itemCount = batchSize / 2 + 1
let cartId = % Guid.NewGuid()
do! act buffer service itemCount ctx cartId skuId "ReadStreamAsyncB-Duration"
Expand All @@ -121,7 +121,7 @@ type Tests(testOutputHelper) =
let buffer = ConcurrentQueue<string>()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
let context = createPrimaryContext log queryMaxItems
let service = Cart.create (CartIntegration.categoryCosmosStreamWithSnapshotStrategy context |> Equinox.Decider.resolve log)
let service = Cart.create (CartIntegration.categoryCosmosStreamWithSnapshotStrategy context |> Equinox.Decider.forStream log)
let itemCount = queryMaxItems / 2 + 1
let cartId = % Guid.NewGuid()
do! act buffer service itemCount ctx cartId skuId "EqxCosmos Tip " // one is a 404, one is a 200
Expand Down
2 changes: 1 addition & 1 deletion samples/TodoBackend/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
let! state' = handle clientId (Command.Update item)
return List.find (fun x -> x.id = item.id) state' }

let create resolve = Service(streamId >> resolve Category)
let create resolve = Service(streamId >> resolve)
10 changes: 5 additions & 5 deletions samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ module EventStore =
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
// rig snapshots to be injected as events into the stream every `snapshotWindow` events
let accessStrategy = AccessStrategy.RollingSnapshots (Fold.isValid,Fold.snapshot)
let cat = EventStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve Log.log cat
let cat = EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.forStream Log.log cat

module Cosmos =

Expand All @@ -174,10 +174,10 @@ module Cosmos =
let context = CosmosStoreContext(storeClient, tipMaxEvents = 10)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot)
let cat = CosmosStoreCategory(context, Events.codecJe, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve Log.log cat
let cat = CosmosStoreCategory(context, Category, Events.codecJe, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.forStream Log.log cat

let service = Service(streamId >> EventStore.resolve Category)
let service = Service(streamId >> EventStore.resolve)
//let service= Service(streamId >> Cosmos.resolve)

let client = "ClientA"
Expand Down
Loading

0 comments on commit ef1ca1e

Please sign in to comment.