diff --git a/CHANGELOG.md b/CHANGELOG.md index fb17fd826..12dfed798 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308) - `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386) - `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337) -- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337) +- `Equinox.Decider`: Replaced `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337) - `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337) - `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337) - `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390) diff --git a/Equinox.sln.DotSettings b/Equinox.sln.DotSettings index 89d8f3eeb..5840cf6dc 100644 --- a/Equinox.sln.DotSettings +++ b/Equinox.sln.DotSettings @@ -1,4 +1,15 @@  + True + True + True + True True + True + True + True True - True \ No newline at end of file + True + True + True + True + True \ No newline at end of file diff --git a/README.md b/README.md index 67c5c35f6..7ebc14a02 100644 --- a/README.md +++ b/README.md @@ -873,7 +873,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's ### Help me understand how the `expectedVersion` is used with EventStoreDB - it seems very confusing :pray: [@dharmaturtle](https://github.com/dharmaturtle) -> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`? +> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` that takes a `Token` which holds a `streamVersion`. Should I be using that instead of `Transact`? The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs), see the `let run` function. @@ -881,7 +881,7 @@ There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox - As you suggest, `Transact` is definitely the API you want to be using - The assumption in Equinox is that you _always_ want to do a version check - if you don't, you can't process idempotently, so why incur the cost of an ordered append-only store? (there is a lower-level `Sync` operation in `Equinox.CosmosStore` which does a blind write to the store, allowing you to do a non-version-checked write in that context (it's implemented and exposed as the stored procedure needs to handle the concept).
For EventStoreDB, if you have such a special case, you can use its APIs directly) -- The inner API with the `Sync` and `TrySync` is the 'store interface' which represents the actual processing needed to do a version-checked write (The `Sync` one does not handle retries and is only used for the last attempt, when there are no subsequent retries) +- The inner API with the `Sync` is the 'store interface' which represents the actual processing needed to do a version-checked write (`Sync` does not handle retries and is only used for the last attempt, when there are no subsequent retries) - The main reason for the separation is that no ephemeral state is held by Equinox in anything like e.g. Unit Of Work during the course of a `decide` function being invoked - the `(token,state)` tuple represents all the things known at the point of loading, and the `Sync` can use anything it stashed in there when it has proposed events passed to it, as the contract involves the caller resupplying that context. - Another consideration is that it's easy to introduce off-by-one errors when there's an expectedVersion in play, so encapsulating this is no bad thing (in addition to it being something that you don't want to be passing around in your domain logic) diff --git a/diagrams/CosmosCode.puml b/diagrams/CosmosCode.puml index 521d77f74..a7c7c7654 100644 --- a/diagrams/CosmosCode.puml +++ b/diagrams/CosmosCode.puml @@ -87,7 +87,7 @@ Aggregate --> IStream: state loop Optimistic Concurrency Control loop driven by consistency check on etag, attempts times; INPUT: state Decider -> Aggregate: interpret state Aggregate --> Decider: { result = proposedResult\n events proposedEvents } -Decider -> IStream: TrySync token state proposedEvents +Decider -> IStream: Sync token state proposedEvents IStream -> Aggregate: fold state proposedEvents Aggregate --> IStream: state' IStream -> Aggregate: snapshot state' diff --git a/diagrams/CosmosComponent.puml b/diagrams/CosmosComponent.puml index 646a806f8..7226b66ee 100644 --- a/diagrams/CosmosComponent.puml +++ b/diagrams/CosmosComponent.puml @@ -82,7 +82,7 @@ database caches <> [ apps <. caches : reference\neventually\nconsistent apps <-> aggregate : transact\nagainst aggregate <-> stream : Transact/\nQuery -stream -> IStream : Load/\nTrySync +stream -> IStream : Load/\nSync eqxcosmos <-> memorycache : TryGet/\nUpdate eqxcosmos <-> sync : ExecuteStoredProc\n[Azure.Cosmos TCP] eqxcosmos <-> cosmos : ReadItem/\nQueryDocuments\n[Azure.Cosmos TCP] diff --git a/diagrams/CosmosContainer.puml b/diagrams/CosmosContainer.puml index e02ca4fbe..7dac2df49 100644 --- a/diagrams/CosmosContainer.puml +++ b/diagrams/CosmosContainer.puml @@ -93,7 +93,7 @@ apps <-> aggregate : transact\nagainst apps <. caches : reference\neventually\nconsistent aggregate <-L-> stream : Transact/\nQuery -stream -L-> eqxcosmos : Load/\nTrySync +stream -L-> eqxcosmos : Load/\nSync eqxcosmos <-L-> memorycache : TryGet/\nUpdate eqxcosmos <-> sync0 : ExecuteStoredProc\n[Azure.Cosmos TCP] diff --git a/diagrams/EventStore.puml b/diagrams/EventStore.puml index ae743c61e..24bbc9410 100644 --- a/diagrams/EventStore.puml +++ b/diagrams/EventStore.puml @@ -58,7 +58,7 @@ apps <-R-> aggregate : transact\nagainst apps <.L.
caches : reference\neventually\nconsistent aggregate <-R-> stream : Transact/\nQuery -stream -R-> IStream : Load/\nTrySync +stream -R-> IStream : Load/\nSync eqxes .U.> IStream : implements eqxes <-R-> memorycache : TryGet/\nUpdate diff --git a/diagrams/MemoryStore.puml b/diagrams/MemoryStore.puml index a75b8f7e4..cdd8dd8a2 100644 --- a/diagrams/MemoryStore.puml +++ b/diagrams/MemoryStore.puml @@ -63,7 +63,7 @@ tests -> eqxms : may validate\nby probing aggregate <-R-> stream : Transact aggregate2 <-R-> stream : Transact/\nQuery -stream -R-> IStream : Load/\nTrySync +stream -R-> IStream : Load/\nSync eqxms -> IStream : implements eqxms .> eqxmsdict : has a\nvolatile diff --git a/samples/Infrastructure/Store.fs b/samples/Infrastructure/Store.fs index ad68edd51..c97e56877 100644 --- a/samples/Infrastructure/Store.fs +++ b/samples/Infrastructure/Store.fs @@ -248,16 +248,16 @@ module EventStore = member _.Retries = a.GetResult(Retries, 1) member _.BatchSize = a.GetResult(BatchSize, 500) - let private connect connectionString credentialsString (operationTimeout, operationRetries) = + let private connect connectionString credentialsString operationTimeout = let cs = match credentialsString with null -> connectionString | x -> String.Join(";", connectionString, x) let tags = ["M", Environment.MachineName; "I", Guid.NewGuid() |> string] - EventStoreConnector(reqTimeout = operationTimeout, reqRetries = operationRetries, tags = tags) + EventStoreConnector(reqTimeout = operationTimeout, tags = tags) .Establish(appName, Discovery.ConnectionString cs, ConnectionStrategy.ClusterTwinPreferSlaveReads) let config (log : ILogger) (cache, unfolds) (p : ParseResults) = let a = Arguments(p) - let timeout, retries as operationThrottling = a.Timeout, a.Retries - log.Information("EventStoreDB {connectionString} {timeout}s retries {retries}", a.ConnectionString, timeout.TotalSeconds, retries) - let connection = connect a.ConnectionString a.Credentials operationThrottling + let timeout = a.Timeout + log.Information("EventStoreDB {connectionString} {timeout}s", a.ConnectionString, timeout.TotalSeconds) + let connection = connect a.ConnectionString a.Credentials timeout let cacheStrategy = cache |> Option.map (fun c -> Equinox.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)) Context.Es (EventStoreContext(connection, batchSize = a.BatchSize), cacheStrategy, unfolds) diff --git a/samples/Store/Integration/EventStoreIntegration.fs b/samples/Store/Integration/EventStoreIntegration.fs index e495bad98..2c1bfa638 100644 --- a/samples/Store/Integration/EventStoreIntegration.fs +++ b/samples/Store/Integration/EventStoreIntegration.fs @@ -6,7 +6,7 @@ open System // NOTE: use `docker compose up` to establish the standard 3 node config at ports 1113/2113 let connectToLocalEventStoreNode (_log : Serilog.ILogger) = - let c = EventStoreConnector(reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, tags=["I",Guid.NewGuid() |> string]) + let c = EventStoreConnector(reqTimeout = TimeSpan.FromSeconds 3., tags = ["I",Guid.NewGuid() |> string]) // Connect to the locally running EventStore Node using Gossip-driven discovery c.Establish("Equinox-sample", Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false", ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader) let defaultBatchSize = 500 diff --git a/samples/Tutorial/AsAt.fsx b/samples/Tutorial/AsAt.fsx index e64de7ddd..996d54a59 100644 --- a/samples/Tutorial/AsAt.fsx +++ b/samples/Tutorial/AsAt.fsx @@ 
-11,7 +11,7 @@ // - the same general point applies to over-using querying of streams for read purposes as we do here; // applying CQRS principles can often lead to a better model regardless of raw necessity -#if LOCAL +#if !LOCAL // Compile Tutorial.fsproj by either a) right-clicking or b) typing // dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter #if VISUALSTUDIO diff --git a/samples/Tutorial/Cosmos.fsx b/samples/Tutorial/Cosmos.fsx index 4401efec1..e4c58b359 100644 --- a/samples/Tutorial/Cosmos.fsx +++ b/samples/Tutorial/Cosmos.fsx @@ -1,4 +1,4 @@ -#if LOCAL +#if !LOCAL // Compile Tutorial.fsproj by either a) right-clicking or b) typing // dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter #if VISUALSTUDIO diff --git a/samples/Tutorial/Favorites.fsx b/samples/Tutorial/Favorites.fsx index 42085b380..b13e89a2c 100644 --- a/samples/Tutorial/Favorites.fsx +++ b/samples/Tutorial/Favorites.fsx @@ -1,4 +1,4 @@ -#if LOCAL +#if !LOCAL // Compile Tutorial.fsproj by either a) right-clicking or b) typing // dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter #I "bin/Debug/net6.0/" @@ -80,10 +80,10 @@ let _removeBAgainEffect = interpret (Remove "b") favesCa b) a maximum number of attempts to make if we clash with a conflicting write *) // Example of wrapping Decider to encapsulate stream access patterns (see DOCUMENTATION.md for reasons why this is not advised in real apps) -type Handler(decider : Equinox.Decider) = - member _.Execute command : Async = +type Handler(decider: Equinox.Decider) = + member _.Execute command: Async = decider.Transact(interpret command) - member _.Read : Async = + member _.Read: Async = decider.Query id (* When we Execute a command, Equinox.Decider will use `fold` and `interpret` to Decide whether Events need to be written @@ -160,12 +160,12 @@ type Service(deciderFor : string -> Handler) = decider.Read (* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *) -let streamFor (clientId: string) = +let handlerFor (clientId: string) = let streamId = Equinox.StreamId.gen id clientId - let decider = Equinox.Decider.resolve log cat Category streamId + let decider = Equinox.Decider.forStream log cat streamId Handler(decider) -let service = Service(streamFor) +let service = Service(handlerFor) let client = "ClientB" service.Favorite(client, "a") |> Async.RunSynchronously diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx index f9f1d1ab8..ba4896926 100644 --- a/samples/Tutorial/FulfilmentCenter.fsx +++ b/samples/Tutorial/FulfilmentCenter.fsx @@ -1,4 +1,4 @@ -#if LOCAL +#if !LOCAL #I "bin/Debug/net6.0/" #r "System.Net.Http" #r "System.Runtime.Caching.dll" diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs index e22b3c085..ddb5529de 100644 --- a/samples/Tutorial/Gapless.fs +++ b/samples/Tutorial/Gapless.fs @@ -54,7 +54,7 @@ let decideConfirm item (state : Fold.State) : Events.Event list = let decideRelease item (state : Fold.State) : Events.Event list = failwith "TODO" -type Service internal (resolve : SequenceId -> Equinox.Decider) = +type Service internal (resolve: SequenceId -> Equinox.Decider) = member _.ReserveMany(series,count) : Async = let decider = resolve series @@ -74,22 +74,22 @@ type Service internal (resolve : SequenceId -> Equinox.Decider] appName = "equinox-tutorial-gapless" +let create cat = Service(streamId >> Equinox.Decider.resolve Serilog.Log.Logger 
cat Category) + module Cosmos = open Equinox.CosmosStore let private create (context, cache, accessStrategy) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) - Service(streamId >> Equinox.Decider.resolve Serilog.Log.Logger cat Category) + CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + |> create module Snapshot = let create (context, cache) = - let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin,Fold.snapshot) - create (context, cache, accessStrategy) + create (context, cache, AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)) module RollingUnfolds = let create (context, cache) = - let accessStrategy = AccessStrategy.RollingState Fold.snapshot - create (context, cache, accessStrategy) + create (context, cache, AccessStrategy.RollingState Fold.snapshot) diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs index 2efc657ae..d69e83d6b 100644 --- a/samples/Tutorial/Set.fs +++ b/samples/Tutorial/Set.fs @@ -38,7 +38,7 @@ let interpret add remove (state : Fold.State) = [ if adds.Length <> 0 then yield Events.Added { items = adds } if removes.Length <> 0 then yield Events.Deleted { items = removes } ] -type Service internal (decider : Equinox.Decider) = +type Service internal (decider: Equinox.Decider) = member _.Add(add : string seq, remove : string seq) : Async = decider.Transact(interpret add remove) diff --git a/samples/Tutorial/Todo.fsx b/samples/Tutorial/Todo.fsx index 0986c06b9..4bff1964e 100644 --- a/samples/Tutorial/Todo.fsx +++ b/samples/Tutorial/Todo.fsx @@ -1,4 +1,4 @@ -#if LOCAL +#if !LOCAL // Compile Tutorial.fsproj by either a) right-clicking or b) typing // dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter #if VISUALSTUDIO diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs index b5d640919..6cfdf2878 100644 --- a/src/Equinox.Core/Caching.fs +++ b/src/Equinox.Core/Caching.fs @@ -22,10 +22,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev | ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) | ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct) return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) } - member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task { + member _.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task { let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state) - match! category.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with + match! 
category.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with | SyncResult.Written tokenAndState' -> save tokenAndState' return SyncResult.Written tokenAndState' diff --git a/src/Equinox.Core/Category.fs b/src/Equinox.Core/Category.fs index fa7c2dc4c..78cc89c71 100755 --- a/src/Equinox.Core/Category.fs +++ b/src/Equinox.Core/Category.fs @@ -16,10 +16,10 @@ type ICategory<'event, 'state, 'context> = /// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State /// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token` /// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store) - abstract TrySync: log: ILogger * categoryName: string * streamId: string * streamName: string * 'context - * maybeInit: (CancellationToken -> Task) voption - * originToken: StreamToken * originState: 'state * events: 'event[] - * CancellationToken -> Task> + abstract Sync: log: ILogger * categoryName: string * streamId: string * streamName: string * 'context + * maybeInit: (CancellationToken -> Task) voption + * originToken: StreamToken * originState: 'state * events: 'event[] + * CancellationToken -> Task> // Low level stream impl, used by Store-specific Category types that layer policies such as Caching in namespace Equinox @@ -35,21 +35,22 @@ type Category<'event, 'state, 'context> ( resolveInner: string -> string-> struct (Core.ICategory<'event, 'state, 'context> * string * (CancellationToken -> Task) voption), empty: struct (Core.StreamToken * 'state)) = /// Provides access to the low level store operations used for Loading and/or Syncing updates via the Decider - /// (Normal usage is via the adjacent `module Decider` / `DeciderExtensions.Resolve` helpers) + /// (Normal usage is via the adjacent `module Decider` / `Factory.Resolve` helpers) member _.Stream(log: Serilog.ILogger, context: 'context, categoryName, streamId) = let struct (inner, streamName, init) = resolveInner categoryName streamId { new Core.IStream<'event, 'state> with + member _.Name = streamName member _.LoadEmpty() = empty member _.Load(maxAge, requireLeader, ct) = task { use act = source.StartActivity("Load", ActivityKind.Client) if act <> null then act.AddStream(categoryName, streamId, streamName).AddLeader(requireLeader).AddStale(maxAge) |> ignore return! inner.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) } - member _.TrySync(attempt, (token, originState), events, ct) = task { - use act = source.StartActivity("TrySync", ActivityKind.Client) + member _.Sync(attempt, token, originState, events, ct) = task { + use act = source.StartActivity("Sync", ActivityKind.Client) if act <> null then act.AddStream(categoryName, streamId, streamName).AddSyncAttempt(attempt) |> ignore let log = if attempt = 1 then log else log.ForContext("attempts", attempt) - return! inner.TrySync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } } + return! 
inner.Sync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } } [] type private Stream private () = diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index a5f097546..4f7e062a8 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1078,7 +1078,7 @@ type internal Category<'event, 'state, 'context> member _.Load(log, _categoryName, _streamId, stream, _maxAge, _requireLeader, ct): Task = task { let! token, events = store.Load(log, (stream, None), (codec.TryDecode, isOrigin), checkUnfolds, ct) return struct (token, fold initial events) } - member _.TrySync(log, _categoryName, _streamId, streamName, ctx, maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { let state' = fold state events let exp, events, eventsEncoded, projectionsEncoded = let encode e = codec.Encode(ctx, e) @@ -1400,7 +1400,6 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (resolveInner, empty namespace Equinox.CosmosStore.Core open System.Collections.Generic -open System.Threading open System.Threading.Tasks open Equinox.Core open FsCodec diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index e73bd0cbb..4f40c58c9 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -1126,7 +1126,7 @@ type internal Category<'event, 'state, 'context> interface ICategory<'event, 'state, 'context> with member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (store.Load(log, (streamName, None), requireLeader, (codec.TryDecode, isOrigin), checkUnfolds, ct)) - member _.TrySync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { let state' = fold state events let exp, events, eventsEncoded, unfoldsEncoded = let encode e = codec.Encode(ctx, e) diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 525e42132..0cf1b464f 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -137,7 +137,7 @@ module private Write = try let! wr = conn.AppendToStreamAsync(streamName, version, events) return EsSyncResult.Written wr with :? 
EventStore.ClientAPI.Exceptions.WrongExpectedVersionException as ex -> - log.Information(ex, "Ges TrySync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}", + log.Information(ex, "Ges Sync WrongExpectedVersionException writing {EventTypes}, actual {ActualVersion}", [| for x in events -> x.Type |], ex.ActualVersion) return EsSyncResult.Conflict (let v = ex.ActualVersion in v.Value) } @@ -194,7 +194,7 @@ module private Read = : IAsyncEnumerable = let rec loop batchCount pos: IAsyncEnumerable = taskSeq { match maxPermittedBatchReads with - | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" + | Some limit when batchCount >= limit -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () let batchLog = log |> Log.prop "batchIndex" batchCount @@ -276,8 +276,8 @@ module private Read = let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" streamName let startPosition = int64 StreamPosition.End let direction = Direction.Backward - let readlog = log |> Log.prop "direction" direction - let batchesBackward _ct: IAsyncEnumerable = readBatches readlog retryingLoggingReadSlice maxPermittedBatchReads startPosition + let readLog = log |> Log.prop "direction" direction + let batchesBackward _ct: IAsyncEnumerable = readBatches readLog retryingLoggingReadSlice maxPermittedBatchReads startPosition let! t, (version, events) = (batchesBackward >> mergeFromCompactionPointOrStartFromBackwardsStream log) |> Stopwatch.time CancellationToken.None log |> logBatchRead direction streamName t (Array.map ValueTuple.fst events) batchSize version return version, events } @@ -315,13 +315,13 @@ module Token = create None None streamVersion // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` - let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize: int) (streamVersion: int64): int = + let private batchCapacityLimit compactedEventNumberOption eventsPending (batchSize: int) (streamVersion: int64): int = match compactedEventNumberOption with - | Some (compactionEventNumber: int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 - | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 + | Some (compactionEventNumber: int64) -> (batchSize - eventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 + | None -> (batchSize - eventsPending) - (int streamVersion + 1) - 1 |> max 0 - let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion: StreamToken = - let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion + let (*private*) ofCompactionEventNumber compactedEventNumberOption eventsPending batchSize streamVersion: StreamToken = + let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption eventsPending batchSize streamVersion create compactedEventNumberOption (Some batchCapacityLimit) streamVersion /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom @@ -395,7 +395,7 @@ type EventStoreContext(connection: EventStoreConnection, batchOptions: BatchOpti | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batchOptions.BatchSize version, Array.chooseV tryDecode events | Some resolvedEvent -> return 
Token.ofCompactionResolvedEventAndVersion resolvedEvent batchOptions.BatchSize version, Array.chooseV tryDecode events } - member _.TrySync(log, streamName, (Token.Unpack token as streamToken), (events, encodedEvents: EventData[]), isCompactionEventType): Task = task { + member _.Sync(log, streamName, (Token.Unpack token as streamToken), (events, encodedEvents: EventData[]), isCompactionEventType): Task = task { let streamVersion = token.streamVersion match! Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents with | EsSyncResult.Written wr -> @@ -459,7 +459,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code interface ICategory<'event, 'state, 'context> with member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, _ct) = fetch initial (loadAlgorithm log streamName requireLeader) - member _.TrySync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, _ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, _ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events @@ -468,7 +468,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) let encodedEvents: EventData[] = events |> Array.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) - match! context.TrySync(log, streamName, streamToken, (events, encodedEvents), compactionPredicate) with + match! context.Sync(log, streamName, streamToken, (events, encodedEvents), compactionPredicate) with | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state events) | GatewaySyncResult.ConflictUnknown _ -> return SyncResult.Conflict (fun _ct -> reload (log, streamName, true, streamToken, state)) } interface Caching.IReloadable<'state> with member _.Reload(log, sn, leader, token, state, _ct) = reload (log, sn, leader, token, state) diff --git a/src/Equinox.EventStoreDb/EventStoreDb.fs b/src/Equinox.EventStoreDb/EventStoreDb.fs index 3bbc8490f..74cc290af 100644 --- a/src/Equinox.EventStoreDb/EventStoreDb.fs +++ b/src/Equinox.EventStoreDb/EventStoreDb.fs @@ -131,7 +131,7 @@ module private Write = let private writeEventsAsync (log: ILogger) (conn: EventStoreClient) (streamName: string) version (events: EventData[]) ct: Task = task { let! 
wr = conn.ConditionalAppendToStreamAsync(streamName, StreamRevision.FromInt64 version, events, cancellationToken = ct) if wr.Status = ConditionalWriteStatus.VersionMismatch then - log.Information("Esdb TrySync VersionMismatch writing {EventTypes}, actual {ActualVersion}", + log.Information("Esdb Sync VersionMismatch writing {EventTypes}, actual {ActualVersion}", [| for x in events -> x.Type |], wr.NextExpectedVersion) return EsSyncResult.Conflict wr.NextExpectedVersion elif wr.Status = ConditionalWriteStatus.StreamDeleted then return failwithf "Unexpected write to deleted stream %s" streamName @@ -257,13 +257,13 @@ module Token = create None None streamVersion // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` - let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize: int) (streamVersion: int64): int = + let private batchCapacityLimit compactedEventNumberOption eventsPending (batchSize: int) (streamVersion: int64): int = match compactedEventNumberOption with - | Some (compactionEventNumber: int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 - | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 + | Some (compactionEventNumber: int64) -> (batchSize - eventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 + | None -> (batchSize - eventsPending) - (int streamVersion + 1) - 1 |> max 0 - let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion: StreamToken = - let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion + let (*private*) ofCompactionEventNumber compactedEventNumberOption eventsPending batchSize streamVersion: StreamToken = + let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption eventsPending batchSize streamVersion create compactedEventNumberOption (Some batchCapacityLimit) streamVersion /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom @@ -336,7 +336,7 @@ type EventStoreContext(connection: EventStoreConnection, batchOptions: BatchOpti | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batchOptions.BatchSize version, Array.chooseV tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batchOptions.BatchSize version, Array.chooseV tryDecode events } - member internal _.TrySync(log, streamName, streamToken, events, encodedEvents: EventData[], isCompactionEventType, ct): Task = task { + member internal _.Sync(log, streamName, streamToken, events, encodedEvents: EventData[], isCompactionEventType, ct): Task = task { let streamVersion = let (Token.Unpack token) = streamToken in token.streamVersion let! 
wr = Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName streamVersion encodedEvents ct match wr with @@ -403,7 +403,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code interface ICategory<'event, 'state, 'context> with member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (loadAlgorithm log streamName requireLeader ct) - member _.TrySync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events @@ -412,7 +412,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) let encodedEvents: EventData[] = events |> Array.map (encode >> ClientCodec.eventData) - match! context.TrySync(log, streamName, streamToken, events, encodedEvents, compactionPredicate, ct) with + match! context.Sync(log, streamName, streamToken, events, encodedEvents, compactionPredicate, ct) with | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state events) | GatewaySyncResult.ConflictUnknown _ -> return SyncResult.Conflict (reload (log, streamName, (*requireLeader*)true, streamToken, state)) } @@ -448,7 +448,7 @@ type ConnectionStrategy = | ClusterSingle of NodePreference type EventStoreConnector - ( reqTimeout: TimeSpan, reqRetries: int, + ( reqTimeout: TimeSpan, [] ?readRetryPolicy, [] ?writeRetryPolicy, [] ?tags, [] ?customize: EventStoreClientSettings -> unit) = diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 619615467..0db1fb767 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -4,7 +4,6 @@ namespace Equinox.MemoryStore open Equinox.Core -open System.Threading.Tasks /// Each Sync that results in an append to the store is notified via the Store's `Committed` event type Commit<'Format> = (struct (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])) @@ -16,11 +15,11 @@ type VolatileStore<'Format> private (lock) = let streams = System.Collections.Concurrent.ConcurrentDictionary[]>() let seedStream _streamName struct (_expectedCount, events) = events - let updateValue _streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) struct (expectedCount, events) = + let updateValue _streamName (currentValue: FsCodec.ITimelineEvent<'Format>[]) struct (expectedCount, events) = if currentValue.Length <> expectedCount then currentValue // note we don't publish here, as this function can be potentially invoked multiple times where there is a race else Array.append currentValue events - let trySync streamName expectedCount events : struct (bool * FsCodec.ITimelineEvent<'Format>[]) = + let trySync streamName expectedCount events: struct (bool * FsCodec.ITimelineEvent<'Format>[]) = let res = streams.AddOrUpdate(streamName, seedStream, updateValue, (expectedCount, events)) (obj.ReferenceEquals(Array.last res, Array.last events), res) @@ -29,7 +28,7 @@ type VolatileStore<'Format> private (lock) = /// Commits are guaranteed to be notified in correct order at stream level under concurrent Equinox Transact calls. 
/// NOTE the caller should inspect and/or copy the event efficiently and immediately /// NOTE blocking and/or running reactions synchronously will hamper test performance and/or may result in deadlock - [<CLIEvent>] member _.Committed : IEvent<Commit<'Format>> = committed.Publish + [<CLIEvent>] member _.Committed: IEvent<Commit<'Format>> = committed.Publish /// Loads events from a given stream, null if none yet written member _.Load(streamName) = @@ -38,7 +37,7 @@ type VolatileStore<'Format> private (lock) = events /// Attempts a synchronization operation - yields conflicting value if expectedCount does not match - member _.TrySync(streamName, _categoryName, _streamId, expectedCount, events) : struct (bool * FsCodec.ITimelineEvent<'Format>[]) = + member _.TrySync(streamName, _categoryName, _streamId, expectedCount, events): struct (bool * FsCodec.ITimelineEvent<'Format>[]) = // Where attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' // If we don't serialize the publishing of the events, it's possible for handlers to observe the Events out of order // NOTE while a Channels-based impl might offer better throughput at load, in practical terms serializing all Committed event notifications @@ -61,32 +60,32 @@ type Token = int /// Internal implementation detail of MemoryStore module private Token = - let private streamTokenOfEventCount (eventCount : int) : StreamToken = + let private streamTokenOfEventCount (eventCount: int): StreamToken = // TOCONSIDER Could implement streamBytes tracking based on a supplied event size function (store is agnostic to format) { value = box eventCount; version = int64 eventCount; streamBytes = -1 } - let (|Unpack|) (token : StreamToken) : int = unbox token.value + let (|Unpack|) (token: StreamToken): int = unbox token.value /// Represent a stream known to be empty let ofEmpty = streamTokenOfEventCount 0 - let ofValue (value : 'event[]) = streamTokenOfEventCount value.Length + let ofValue (value: 'event[]) = streamTokenOfEventCount value.Length /// Represents the state of a set of streams in a style consistent with the concrete Store types - no constraints on memory consumption (but also no persistence!).
type private Category<'event, 'state, 'context, 'Format>(store: VolatileStore<'Format>, codec: FsCodec.IEventCodec<'event, 'Format, 'context>, fold, initial) = interface ICategory<'event, 'state, 'context> with - member _.Load(_log, _categoryName, _streamId, streamName, _maxAge, _requireLeader, _ct) = + member _.Load(_log, _categoryName, _streamId, streamName, _maxAge, _requireLeader, _ct) = task { match store.Load(streamName) with - | null -> struct (Token.ofEmpty, initial) |> Task.FromResult - | xs -> struct (Token.ofValue xs, fold initial (Seq.chooseV codec.TryDecode xs)) |> Task.FromResult - member _.TrySync(_log, categoryName, streamId, streamName, context, _init, Token.Unpack eventCount, state, events, _ct) = - let inline map i (e : FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e) + | null -> return (Token.ofEmpty, initial) + | xs -> return (Token.ofValue xs, fold initial (Seq.chooseV codec.TryDecode xs)) } + member _.Sync(_log, categoryName, streamId, streamName, context, _init, Token.Unpack eventCount, state, events, _ct) = task { + let inline map i (e: FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e) let encoded = Array.ofSeq events |> Array.mapi (fun i e -> map (eventCount + i) (codec.Encode(context, e))) match store.TrySync(streamName, categoryName, streamId, eventCount, encoded) with | true, streamEvents' -> - SyncResult.Written (Token.ofValue streamEvents', fold state events) |> Task.FromResult + return SyncResult.Written (Token.ofValue streamEvents', fold state events) | false, conflictingEvents -> - let resync _ct = + let resync _ct = task { let token' = Token.ofValue conflictingEvents - struct (token', fold state (conflictingEvents |> Seq.skip eventCount |> Seq.chooseV codec.TryDecode)) |> Task.FromResult - SyncResult.Conflict resync |> Task.FromResult + return struct (token', fold state (conflictingEvents |> Seq.skip eventCount |> Seq.chooseV codec.TryDecode)) } + return SyncResult.Conflict resync } type MemoryStoreCategory<'event, 'state, 'Format, 'context> internal (resolveInner, empty) = inherit Equinox.Category<'event, 'state, 'context>(resolveInner, empty) diff --git a/src/Equinox.MessageDb/MessageDb.fs b/src/Equinox.MessageDb/MessageDb.fs index 7c93cf018..5ebfd85a6 100644 --- a/src/Equinox.MessageDb/MessageDb.fs +++ b/src/Equinox.MessageDb/MessageDb.fs @@ -200,7 +200,7 @@ module Read = let mutable state = originState let rec loop () : Task = task { match maxPermittedBatchReads with - | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" + | Some limit when batchCount >= limit -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () let batchLog = log |> Log.prop "batchIndex" batchCount @@ -389,7 +389,7 @@ type private Category<'event, 'state, 'context>(context: MessageDbContext, codec interface ICategory<'event, 'state, 'context> with member _.Load(log, categoryName, streamId, streamName, _maxAge, requireLeader, ct) = loadAlgorithm log categoryName streamId streamName requireLeader ct - member x.TrySync(log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task { + member x.Sync(log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task { let encode e = codec.Encode(ctx, e) let encodedEvents: IEventData[] = events |> Array.map encode match! 
context.TrySync(log, categoryName, streamId, streamName, token, encodedEvents, ct) with diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 0f30ef69f..3437cd04d 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -142,7 +142,7 @@ module private Write = try let! wr = conn.AppendToStream(StreamId streamName, (if version = -1L then ExpectedVersion.NoStream else int version), events, ct) return EsSyncResult.Written wr with :? WrongExpectedVersionException as ex -> - log.Information(ex, "SqlEs TrySync WrongExpectedVersionException writing {EventTypes}, expected {ExpectedVersion}", + log.Information(ex, "SqlEs Sync WrongExpectedVersionException writing {EventTypes}, expected {ExpectedVersion}", [| for x in events -> x.Type |], version) return EsSyncResult.ConflictUnknown } let eventDataBytes events = @@ -192,7 +192,7 @@ module private Read = : IAsyncEnumerable = let rec loop batchCount pos: IAsyncEnumerable = taskSeq { match maxPermittedBatchReads with - | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" + | Some limit when batchCount >= limit -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () let batchLog = log |> Log.prop "batchIndex" batchCount @@ -267,8 +267,8 @@ module private Read = let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" streamName let startPosition = int64 Position.End let direction = Direction.Backward - let readlog = log |> Log.prop "direction" direction - let batchesBackward ct: IAsyncEnumerable = readBatches readlog retryingLoggingReadSlice maxPermittedBatchReads startPosition ct + let readLog = log |> Log.prop "direction" direction + let batchesBackward ct: IAsyncEnumerable = readBatches readLog retryingLoggingReadSlice maxPermittedBatchReads startPosition ct let! 
t, (version, events) = (batchesBackward >> mergeFromCompactionPointOrStartFromBackwardsStream log) |> Stopwatch.time ct log |> logBatchRead direction streamName t (Array.map ValueTuple.fst events) batchSize version return version, events } @@ -308,12 +308,12 @@ module Token = let ofNonCompacting streamVersion: StreamToken = create None None streamVersion // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` - let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize: int) (streamVersion: int64): int = + let private batchCapacityLimit compactedEventNumberOption eventsPending (batchSize: int) (streamVersion: int64): int = match compactedEventNumberOption with - | Some (compactionEventNumber: int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 - | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 - let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion: StreamToken = - let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion + | Some (compactionEventNumber: int64) -> (batchSize - eventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 + | None -> (batchSize - eventsPending) - (int streamVersion + 1) - 1 |> max 0 + let (*private*) ofCompactionEventNumber compactedEventNumberOption eventsPending batchSize streamVersion: StreamToken = + let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption eventsPending batchSize streamVersion create compactedEventNumberOption (Some batchCapacityLimit) streamVersion /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom let ofUncompactedVersion batchSize streamVersion: StreamToken = @@ -382,7 +382,7 @@ type SqlStreamStoreContext(connection: SqlStreamStoreConnection, batchOptions: B match events |> Array.tryFindBack (fun re -> match tryDecode re with ValueSome e -> isCompactionEvent e | _ -> false) with | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batchOptions.BatchSize version, Array.chooseV tryDecode events | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batchOptions.BatchSize version, Array.chooseV tryDecode events } - member internal _.TrySync(log, streamName, (Token.Unpack pos as streamToken), events, encodedEvents: EventData[], isCompactionEventType, ct): Task = task { + member internal _.Sync(log, streamName, (Token.Unpack pos as streamToken), events, encodedEvents: EventData[], isCompactionEventType, ct): Task = task { match! 
Write.writeEvents log connection.WriteRetryPolicy connection.WriteConnection streamName pos.streamVersion encodedEvents ct with | EsSyncResult.Written wr -> let version' = wr.CurrentVersion |> int64 @@ -435,7 +435,7 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, interface ICategory<'event, 'state, 'context> with member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (loadAlgorithm log streamName requireLeader ct) - member _.TrySync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events @@ -444,7 +444,7 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, if cc.IsCompactionDue then Array.append events (fold state events |> compact |> Array.singleton) else events let encode e = codec.Encode(ctx, e) let encodedEvents: EventData[] = events |> Array.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) - match! context.TrySync(log, streamName, streamToken, events, encodedEvents, compactionPredicate, ct) with + match! context.Sync(log, streamName, streamToken, events, encodedEvents, compactionPredicate, ct) with | GatewaySyncResult.Written token' -> return SyncResult.Written (token', fold state events) | GatewaySyncResult.ConflictUnknown -> return SyncResult.Conflict (reload (log, streamName, (*requireLeader*)true, streamToken, state)) } interface Caching.IReloadable<'state> with member _.Reload(log, sn, leader, token, state, ct) = reload (log, sn, leader, token, state) ct diff --git a/src/Equinox/Core.fs b/src/Equinox/Core.fs index 5c03b5965..43d9644da 100755 --- a/src/Equinox/Core.fs +++ b/src/Equinox/Core.fs @@ -9,6 +9,9 @@ open System.Threading.Tasks /// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. 
type IStream<'event, 'state> = + /// The StreamName, derived from the Name of the Category, and the StreamId supplied to Category.Stream + abstract Name: string + /// Generate a stream token that represents a stream one believes to be empty to use as a Null Object when optimizing out the initial load roundtrip abstract LoadEmpty: unit -> struct (StreamToken * 'state) @@ -18,13 +21,13 @@ type IStream<'event, 'state> = /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream /// SyncResult.Written: implies the state is now the value represented by the Result's value /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry - abstract TrySync: attempt: int * originTokenAndState: struct (StreamToken * 'state) * events: 'event[] * CancellationToken -> Task<SyncResult<'state>> + abstract Sync: attempt: int * token: StreamToken * state: 'state * events: 'event[] * CancellationToken -> Task<SyncResult<'state>> -/// Internal type used to represent the outcome of a TrySync operation +/// Internal type used to represent the outcome of a Sync and [] SyncResult<'state> = /// The write succeeded (the supplied token and state can be used to efficiently continue the processing if, and only if, desired) | Written of struct (StreamToken * 'state) - /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store + /// The set of changes supplied to Sync conflict with the present state of the underlying stream based on the configured policy for that store /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (if, and only if, required) needs an extra trip to obtain | Conflict of (CancellationToken -> Task<struct (StreamToken * 'state)>) @@ -38,12 +41,12 @@ type internal Impl() = (validateResync: int -> unit) (mapResult: Func<'r, struct (StreamToken * 's), 'v>) originTokenAndState ct: Task<'v> = - let rec loop attempt tokenAndState: Task<'v> = task { + let rec loop attempt (struct (token, state) as tokenAndState): Task<'v> = task { let! result, events = decide.Invoke(tokenAndState, ct) match Array.ofSeq events with | [||] -> return mapResult.Invoke(result, tokenAndState) | events -> - match! stream.TrySync(attempt, tokenAndState, events, ct) with + match! stream.Sync(attempt, token, state, events, ct) with | SyncResult.Written tokenAndState' -> return mapResult.Invoke(result, tokenAndState') | SyncResult.Conflict resync -> diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index 04bb6d4e8..5f90ef6e9 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -243,7 +243,7 @@ and internal LoadPolicy() = | Some AssumeEmpty -> fun stream _ct -> Task.FromResult(stream.LoadEmpty()) | Some (FromMemento (streamToken, state)) -> fun _stream _ct -> Task.FromResult(streamToken, state) -(* Retry / Attempts policy used to define policy for resyncing state when there's an Append conflict (default 3 retries) *) +(* Retry / Attempts policy used to define how to retry based on the conflicting state when there's an Append conflict (default 3 retries) *) and [] Attempts = | Max of count: int diff --git a/tests/Equinox.Core.Tests/CachingTests.fs b/tests/Equinox.Core.Tests/CachingTests.fs index 6b57e6075..59df423d1 100644 --- a/tests/Equinox.Core.Tests/CachingTests.fs +++ b/tests/Equinox.Core.Tests/CachingTests.fs @@ -33,7 +33,7 @@ type SpyCategory() = do!
Task.Delay(x.Delay, ct) return struct (mkToken(), Interlocked.Increment &state) } - member _.TrySync(_log, _cat, _sid, _sn, _ctx, _maybeInit, _originToken, originState, events, _ct) = task { + member _.Sync(_log, _cat, _sid, _sn, _ctx, _maybeInit, _originToken, originState, events, _ct) = task { return Equinox.Core.SyncResult.Written (mkToken(), originState + events.Length) } @@ -51,10 +51,9 @@ let writeOriginState = 99 let expectedWriteState = 99 + 2 // events written let write sn (sut: Equinox.Core.ICategory<_, _, _>) = task { - let! wr = sut.TrySync(Serilog.Log.Logger, null, null, sn, (), ValueNone, Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None) + let! wr = sut.Sync(Serilog.Log.Logger, null, null, sn, (), ValueNone, Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None) let wState' = trap <@ match wr with Equinox.Core.SyncResult.Written (_token, state') -> state' | _ -> failwith "unexpected" @> - test <@ expectedWriteState = wState' @> -} + test <@ expectedWriteState = wState' @> } // Pinning the fact that the algorithm is not sensitive to the reuse of the initial value of a cache entry let [] ``AsyncLazy.Empty is a true singleton, does not allocate`` () = diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index 824e20bc8..fce4ea82b 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -95,7 +95,7 @@ module Props = let [] maxTest = 5 #endif type FsCheckAttribute() = - inherit AutoDataAttribute(MaxTest = maxTest, Arbitrary=[|typeof|]) + inherit AutoDataAttribute(MaxTest = maxTest, Arbitrary = [| typeof |]) [] type UnoptimizedTipReadingCorrectness(testOutputHelper) = diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index d2c864a13..71fca2baa 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -143,7 +143,7 @@ type Tests(testOutputHelper) = pos =! res // Demonstrate benefit/mechanism for using the Position-based API to avail of the etag tracking - let stream = ctx.StreamId streamName + let stream = ctx.StreamId streamName let extrasCount = match extras with x when x > 50 -> 5000 | x when x < 1 -> 1 | x -> x*100 let! 
_pos = Async.call (fun ct -> ctx.NonIdempotentAppend(stream, TestEvents.Create (int pos,extrasCount), ct)) diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index 3fd5ebf21..52aaa6f9b 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -535,7 +535,7 @@ type Tests(testOutputHelper) = test <@ match res with | Choice2Of2 e -> e.Message.StartsWith "Origin event not found; no Archive Container supplied" || e.Message.StartsWith "Origin event not found; no Archive Table supplied" - | x -> failwithf "Unexpected %A" x @> + | x -> failwithf $"Unexpected %A{x}" @> test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> verifyRequestChargesMax 3 // 2.99 diff --git a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs index 4ed04e091..67aacb5b5 100644 --- a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs @@ -38,7 +38,7 @@ open Equinox.SqlStreamStore open Equinox.SqlStreamStore.MySql let connectToLocalStore (_: ILogger) = - Connector(sprintf "Server=localhost;User=root;Database=EQUINOX_TEST_DB",autoCreate=true).Establish() + Connector("Server=localhost;User=root;Database=EQUINOX_TEST_DB", autoCreate = true).Establish() type Context = SqlStreamStoreContext type Category<'event, 'state, 'context> = SqlStreamStoreCategory<'event, 'state, 'context> @@ -61,7 +61,7 @@ open Equinox.EventStoreDb /// Connect directly to a locally running EventStoreDB Node using gRPC, without using Gossip-driven discovery let connectToLocalStore (_log: ILogger) = async { - let c = EventStoreConnector(reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, (*, log=Logger.SerilogVerbose log,*) tags=["I",Guid.NewGuid() |> string]) + let c = EventStoreConnector(reqTimeout = TimeSpan.FromSeconds 3., (*, log = Logger.SerilogVerbose log,*) tags = ["I",Guid.NewGuid() |> string]) let conn = c.Establish("Equinox-integration", Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false", ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader) return conn } #endif @@ -98,7 +98,7 @@ module SimplestThing = interface TypeShape.UnionContract.IUnionContract let codec = EventCodec.gen - let evolve (state: Event) (event: Event) = event + let evolve (_state: Event) (event: Event) = event let fold = Seq.fold evolve let initial = StuffHappened let resolve log context = diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 172401e55..3f4757fd2 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -463,13 +463,13 @@ module CosmosStats = let client = sa.Connector.CreateClient() let! 
t = Equinox.DynamoStore.Core.Initialization.describe client sa.Table match t.BillingModeSummary, t.ProvisionedThroughput, Equinox.DynamoStore.Core.Initialization.tryGetActiveStreamsArn t with - | null, p, sarn when p <> null -> + | null, p, streamsArn when p <> null -> log.Information("DynamoStore Table {table} Provisioned with {read}R/{write}WCU Provisioned capacity; Streams ARN {streaming}", - sa.Table, p.ReadCapacityUnits, p.WriteCapacityUnits, sarn) - | bms, _, sarn when bms.BillingMode = Amazon.DynamoDBv2.BillingMode.PAY_PER_REQUEST -> - log.Information("DynamoStore Table {table} Provisioned with On-Demand capacity management; Streams ARN {streaming}", sa.Table, sarn) - | _, _, sarn -> - log.Information("DynamoStore Table {table} Provisioning Unknown; Streams ARN {streaming}", sa.Table, sarn) } + sa.Table, p.ReadCapacityUnits, p.WriteCapacityUnits, streamsArn) + | bms, _, streamsArn when bms.BillingMode = Amazon.DynamoDBv2.BillingMode.PAY_PER_REQUEST -> + log.Information("DynamoStore Table {table} Provisioned with On-Demand capacity management; Streams ARN {streaming}", sa.Table, streamsArn) + | _, _, streamsArn -> + log.Information("DynamoStore Table {table} Provisioning Unknown; Streams ARN {streaming}", sa.Table, streamsArn) } | x -> Store.missingArg $"unexpected subcommand %A{x}" module Dump =
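The rename running through this diff (`TrySync` → `Sync`) is easiest to follow against the optimistic-concurrency loop that `Decider.Transact` drives via `IStream.Sync` (see the `Impl.run` change in `src/Equinox/Core.fs` above). Below is a minimal, self-contained sketch of that loop; the types and the `transact`/`maxAttempts` names are simplified stand-ins for illustration, not the actual Equinox signatures (the real ones use `Task` and `CancellationToken` rather than `Async`).

```fsharp
// Simplified stand-ins for the real Equinox types (illustrative only)
type StreamToken = { version: int64 }

type SyncResult<'state> =
    | Written of StreamToken * 'state
    | Conflict of resync: (unit -> Async<StreamToken * 'state>)

type IStream<'event, 'state> =
    abstract Load: unit -> Async<StreamToken * 'state>
    abstract Sync: attempt: int * token: StreamToken * state: 'state * events: 'event[]
                    -> Async<SyncResult<'state>>

// Sketch of the decide/sync/resync loop that Transact drives (hypothetical helper)
let transact (stream: IStream<'event, 'state>) maxAttempts (decide: 'state -> 'event[]) = async {
    let! initial = stream.Load()
    let rec loop attempt (token, state) = async {
        match decide state with
        | [||] -> return state                             // no events proposed -> nothing to write
        | events ->
            match! stream.Sync(attempt, token, state, events) with
            | Written (_, state') -> return state'         // version check passed; done
            | Conflict resync when attempt < maxAttempts ->
                let! refreshed = resync ()                 // obtain the conflicting token/state
                return! loop (attempt + 1) refreshed       // re-run decide against actual state
            | Conflict _ -> return invalidOp "Max attempts exceeded" }
    return! loop 1 initial }
```

On `Written` the loop completes with the post-write state; on `Conflict`, the store hands back a resync workflow so the next attempt decides against the actual current state rather than blindly re-appending.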
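Similarly, the `unstoredEventsPending` → `eventsPending` rename in the `Token` modules (`Equinox.EventStore`, `Equinox.EventStoreDb`, `Equinox.SqlStreamStore`) touches the compaction-headroom calculation. The function below is lifted from the post-rename code; the worked numbers are illustrative only.

```fsharp
// batchCapacityLimit as renamed above: headroom remaining in a batch before a
// compaction/snapshot event is due, given the last known compaction event (if any)
let batchCapacityLimit compactedEventNumber eventsPending (batchSize: int) (streamVersion: int64): int =
    match compactedEventNumber with
    | Some (n: int64) -> (batchSize - eventsPending) - int (streamVersion - n + 1L) |> max 0
    | None -> (batchSize - eventsPending) - (int streamVersion + 1) - 1 |> max 0

// e.g. batchSize = 500, last compaction event at number 400, stream at version 450,
// 3 events pending: (500 - 3) - (450 - 400 + 1) = 446 events of headroom remain
let headroom = batchCapacityLimit (Some 400L) 3 500 450L // = 446
```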