Skip to content

Commit

Permalink
Tidy; Rename TrySync->Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 25, 2023
1 parent 3b4379f commit 0291636
Show file tree
Hide file tree
Showing 34 changed files with 134 additions and 122 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: Replaced `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
Expand Down
13 changes: 12 additions & 1 deletion Equinox.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=Autoscale/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=backoffs/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=emptor/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=esdb/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=etags/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=kvps/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=nfolds/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Nullary/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=resync/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Snapshotted/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Resyncs/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=roundtripping/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Serdes/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Snapshotted/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Uncompacted/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -873,15 +873,15 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's
<a name="how-is-expectedVersion-managed"/></a>
### Help me understand how the `expectedVersion` is used with EventStoreDB - it seems very confusing :pray: [@dharmaturtle](https://github.com/dharmaturtle)
> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?
> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` that takes a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?
The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs), see the `let run` function.
There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#code-diagrams-for-equinoxeventstore--equinoxsqlstreamstore) but I'll summarize here:
- As you suggest, `Transact` is definitely the API you want to be be using
- The assumption in Equinox is that you _always_ want to do a version check - if you don't, you can't process idempotently, why incur the cost of an ordered append only store? (there is a lower `Sync` operation which does a blind write to the store in `Equinox.CosmosStore` which allows you to do a non-version-checked write in that context (its implemented and exposed as the stored procedure needs to handle the concept). For EventStoreDB, if you have such a special case, you can use its APIs directly)
- The inner API with the `Sync` and `TrySync` is the 'store interface' which represents the actual processing needed to do a version-checked write (The `Sync` one does not handle retries and is only used for the last attempt, when there are no subsequent retries)
- The inner API with the `Sync` is the 'store interface' which represents the actual processing needed to do a version-checked write (The `Sync` one does not handle retries and is only used for the last attempt, when there are no subsequent retries)
- The main reason for the separation is that no ephemeral state is held by Equinox in anything like e.g. Unit Of Work during the course of a `decide` function being invoked - the `(token,state)` tuple represents all the things known at the point of loading, and the `Sync` can use anything it stashed in there when it has proposed events passed to it, as the contract involves the caller resupplying that context.
- Another consideration is that its easy to introduce off by one errors when there's an expectedVersion in play, so encapsulating this is no bad thing (in addition to it being something that you don't want to be passing around in your domain logic)
Expand Down
2 changes: 1 addition & 1 deletion diagrams/CosmosCode.puml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Aggregate --> IStream: state
loop Optimistic Concurrency Control loop driven by consistency check on etag, attempts times; INPUT: state
Decider -> Aggregate: interpret state
Aggregate --> Decider: { result = proposedResult\n events proposedEvents }
Decider -> IStream: TrySync token state proposedEvents
Decider -> IStream: Sync token state proposedEvents
IStream -> Aggregate: fold state proposedEvents
Aggregate --> IStream: state'
IStream -> Aggregate: snapshot state'
Expand Down
2 changes: 1 addition & 1 deletion diagrams/CosmosComponent.puml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ database caches <<External>> [
apps <. caches : reference\neventually\nconsistent
apps <-> aggregate : transact\nagainst
aggregate <-> stream : Transact/\nQuery
stream -> IStream : Load/\nTrySync
stream -> IStream : Load/\nSync
eqxcosmos <-> memorycache : TryGet/\nUpdate
eqxcosmos <-> sync : ExecuteStoredProc\n[Azure.Cosmos TCP]
eqxcosmos <-> cosmos : ReadItem/\nQueryDocuments\n[Azure.Cosmos TCP]
Expand Down
2 changes: 1 addition & 1 deletion diagrams/CosmosContainer.puml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ apps <-> aggregate : transact\nagainst
apps <. caches : reference\neventually\nconsistent

aggregate <-L-> stream : Transact/\nQuery
stream -L-> eqxcosmos : Load/\nTrySync
stream -L-> eqxcosmos : Load/\nSync

eqxcosmos <-L-> memorycache : TryGet/\nUpdate
eqxcosmos <-> sync0 : ExecuteStoredProc\n[Azure.Cosmos TCP]
Expand Down
2 changes: 1 addition & 1 deletion diagrams/EventStore.puml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ apps <-R-> aggregate : transact\nagainst
apps <.L. caches : reference\neventually\nconsistent

aggregate <-R-> stream : Transact/\nQuery
stream -R-> IStream : Load/\nTrySync
stream -R-> IStream : Load/\nSync

eqxes .U.> IStream : implements
eqxes <-R-> memorycache : TryGet/\nUpdate
Expand Down
2 changes: 1 addition & 1 deletion diagrams/MemoryStore.puml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ tests -> eqxms : may validate\nby probing

aggregate <-R-> stream : Transact
aggregate2 <-R-> stream : Transact/\nQuery
stream -R-> IStream : Load/\nTrySync
stream -R-> IStream : Load/\nSync

eqxms -> IStream : implements
eqxms .> eqxmsdict : has a\nvolatile
Expand Down
10 changes: 5 additions & 5 deletions samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,16 @@ module EventStore =
member _.Retries = a.GetResult(Retries, 1)
member _.BatchSize = a.GetResult(BatchSize, 500)

let private connect connectionString credentialsString (operationTimeout, operationRetries) =
let private connect connectionString credentialsString operationTimeout =
let cs = match credentialsString with null -> connectionString | x -> String.Join(";", connectionString, x)
let tags = ["M", Environment.MachineName; "I", Guid.NewGuid() |> string]
EventStoreConnector(reqTimeout = operationTimeout, reqRetries = operationRetries, tags = tags)
EventStoreConnector(reqTimeout = operationTimeout, tags = tags)
.Establish(appName, Discovery.ConnectionString cs, ConnectionStrategy.ClusterTwinPreferSlaveReads)
let config (log : ILogger) (cache, unfolds) (p : ParseResults<Parameters>) =
let a = Arguments(p)
let timeout, retries as operationThrottling = a.Timeout, a.Retries
log.Information("EventStoreDB {connectionString} {timeout}s retries {retries}", a.ConnectionString, timeout.TotalSeconds, retries)
let connection = connect a.ConnectionString a.Credentials operationThrottling
let timeout = a.Timeout
log.Information("EventStoreDB {connectionString} {timeout}s", a.ConnectionString, timeout.TotalSeconds)
let connection = connect a.ConnectionString a.Credentials timeout
let cacheStrategy = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.))
Context.Es (EventStoreContext(connection, batchSize = a.BatchSize), cacheStrategy, unfolds)

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/EventStoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open System

// NOTE: use `docker compose up` to establish the standard 3 node config at ports 1113/2113
let connectToLocalEventStoreNode (_log : Serilog.ILogger) =
let c = EventStoreConnector(reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, tags=["I",Guid.NewGuid() |> string])
let c = EventStoreConnector(reqTimeout = TimeSpan.FromSeconds 3., tags = ["I",Guid.NewGuid() |> string])
// Connect to the locally running EventStore Node using Gossip-driven discovery
c.Establish("Equinox-sample", Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false", ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader)
let defaultBatchSize = 500
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// - the same general point applies to over-using querying of streams for read purposes as we do here;
// applying CQRS principles can often lead to a better model regardless of raw necessity

#if LOCAL
#if !LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#if VISUALSTUDIO
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if LOCAL
#if !LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#if VISUALSTUDIO
Expand Down
14 changes: 7 additions & 7 deletions samples/Tutorial/Favorites.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if LOCAL
#if !LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#I "bin/Debug/net6.0/"
Expand Down Expand Up @@ -80,10 +80,10 @@ let _removeBAgainEffect = interpret (Remove "b") favesCa
b) a maximum number of attempts to make if we clash with a conflicting write *)

// Example of wrapping Decider to encapsulate stream access patterns (see DOCUMENTATION.md for reasons why this is not advised in real apps)
type Handler(decider : Equinox.Decider<Event, State>) =
member _.Execute command : Async<unit> =
type Handler(decider: Equinox.Decider<Event, State>) =
member _.Execute command: Async<unit> =
decider.Transact(interpret command)
member _.Read : Async<string list> =
member _.Read: Async<string list> =
decider.Query id

(* When we Execute a command, Equinox.Decider will use `fold` and `interpret` to Decide whether Events need to be written
Expand Down Expand Up @@ -160,12 +160,12 @@ type Service(deciderFor : string -> Handler) =
decider.Read

(* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *)
let streamFor (clientId: string) =
let handlerFor (clientId: string) =
let streamId = Equinox.StreamId.gen id clientId
let decider = Equinox.Decider.resolve log cat Category streamId
let decider = Equinox.Decider.forStream log cat streamId
Handler(decider)

let service = Service(streamFor)
let service = Service(handlerFor)

let client = "ClientB"
service.Favorite(client, "a") |> Async.RunSynchronously
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if LOCAL
#if !LOCAL
#I "bin/Debug/net6.0/"
#r "System.Net.Http"
#r "System.Runtime.Caching.dll"
Expand Down
14 changes: 7 additions & 7 deletions samples/Tutorial/Gapless.fs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ let decideConfirm item (state : Fold.State) : Events.Event list =
let decideRelease item (state : Fold.State) : Events.Event list =
failwith "TODO"

type Service internal (resolve : SequenceId -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve: SequenceId -> Equinox.Decider<Events.Event, Fold.State>) =

member _.ReserveMany(series,count) : Async<int64 list> =
let decider = resolve series
Expand All @@ -74,22 +74,22 @@ type Service internal (resolve : SequenceId -> Equinox.Decider<Events.Event, Fol

let [<Literal>] appName = "equinox-tutorial-gapless"

let create cat = Service(streamId >> Equinox.Decider.resolve Serilog.Log.Logger cat Category)

module Cosmos =

open Equinox.CosmosStore
let private create (context, cache, accessStrategy) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
Service(streamId >> Equinox.Decider.resolve Serilog.Log.Logger cat Category)
CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
|> create

module Snapshot =

let create (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin,Fold.snapshot)
create (context, cache, accessStrategy)
create (context, cache, AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot))

module RollingUnfolds =

let create (context, cache) =
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
create (context, cache, accessStrategy)
create (context, cache, AccessStrategy.RollingState Fold.snapshot)
2 changes: 1 addition & 1 deletion samples/Tutorial/Set.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let interpret add remove (state : Fold.State) =
[ if adds.Length <> 0 then yield Events.Added { items = adds }
if removes.Length <> 0 then yield Events.Deleted { items = removes } ]

type Service internal (decider : Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (decider: Equinox.Decider<Events.Event, Fold.State>) =

member _.Add(add : string seq, remove : string seq) : Async<int*int> =
decider.Transact(interpret add remove)
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if LOCAL
#if !LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#if VISUALSTUDIO
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
| ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
| ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) }
member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
member _.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
match! category.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with
match! category.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with
| SyncResult.Written tokenAndState' ->
save tokenAndState'
return SyncResult.Written tokenAndState'
Expand Down
Loading

0 comments on commit 0291636

Please sign in to comment.