From 5fac967e69b00ee95cbe1c48d95fdc9dfd616b8a Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 26 Jul 2023 16:13:40 +0100 Subject: [PATCH] refactor(CosmosStore): Cleanup stored proc initialization (#414) --- src/Equinox.Core/Caching.fs | 4 +-- src/Equinox.Core/Category.fs | 7 ++-- src/Equinox.CosmosStore/CosmosStore.fs | 38 ++++++++++---------- src/Equinox.DynamoStore/DynamoStore.fs | 8 ++--- src/Equinox.EventStore/EventStore.fs | 2 +- src/Equinox.EventStoreDb/EventStoreDb.fs | 2 +- src/Equinox.MemoryStore/MemoryStore.fs | 2 +- src/Equinox.MessageDb/MessageDb.fs | 2 +- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 2 +- tests/Equinox.Core.Tests/CachingTests.fs | 4 +-- 10 files changed, 35 insertions(+), 36 deletions(-) diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.Core/Caching.fs index 33997a31a..43a436f87 100644 --- a/src/Equinox.Core/Caching.fs +++ b/src/Equinox.Core/Caching.fs @@ -20,10 +20,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev | ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) | ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct) return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) } - member _.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task { + member _.Sync(log, categoryName, streamId, streamName, context, streamToken, state, events, ct) = task { let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state) - match! category.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with + match! category.Sync(log, categoryName, streamId, streamName, context, streamToken, state, events, ct) with | SyncResult.Written tokenAndState' -> save tokenAndState' return SyncResult.Written tokenAndState' diff --git a/src/Equinox.Core/Category.fs b/src/Equinox.Core/Category.fs index 0be19485d..1655176cb 100755 --- a/src/Equinox.Core/Category.fs +++ b/src/Equinox.Core/Category.fs @@ -17,7 +17,6 @@ type ICategory<'event, 'state, 'context> = /// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token` /// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store) abstract Sync: log: ILogger * categoryName: string * streamId: string * streamName: string * 'context - * maybeInit: (CancellationToken -> Task) voption * originToken: StreamToken * originState: 'state * events: 'event[] * CancellationToken -> Task> @@ -31,12 +30,12 @@ open Equinox.Core.Tracing type Category<'event, 'state, 'context>(categoryName, resolveStream) = /// Stores without custom routing for categoryName/streamId to Table/Container etc use this default impl - new(categoryName, inner) = Category(categoryName, fun streamId -> struct (inner, Core.StreamName.render categoryName streamId, ValueNone)) + new(categoryName, inner) = Category(categoryName, fun streamId -> struct (inner, Core.StreamName.render categoryName streamId)) /// Provides access to the low level store operations used for Loading and/or Syncing updates via the Decider /// (Normal usage is via the adjacent `module Decider` / `Stream.Resolve` helpers) member _.Stream(log: Serilog.ILogger, context: 'context, streamId: string) = - let struct (inner: Core.ICategory<'event, 'state, 'context>, streamName, init) = resolveStream streamId + let struct (inner: Core.ICategory<'event, 'state, 'context>, streamName) = resolveStream streamId { new Core.IStream<'event, 'state> with member _.Name = streamName member _.LoadEmpty() = inner.Empty @@ -48,7 +47,7 @@ type Category<'event, 'state, 'context>(categoryName, resolveStream) = use act = source.StartActivity("Sync", System.Diagnostics.ActivityKind.Client) if act <> null then act.AddStream(categoryName, streamId, streamName).AddSyncAttempt(attempt) |> ignore let log = if attempt = 1 then log else log.ForContext("attempts", attempt) - return! inner.Sync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } } + return! inner.Sync(log, categoryName, streamId, streamName, context, token, originState, events, ct) } } [] type Stream private () = diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index b68b5683f..d6037b6f5 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -578,10 +578,12 @@ module Initialization = /// Holds Container state, coordinating initialization activities type internal ContainerInitializerGuard(container: Container, fallback: Container option, ?initContainer: Container -> CancellationToken -> Task) = let initGuard = initContainer |> Option.map (fun init -> AsyncCacheCell(init container)) - member _.Container = container member _.Fallback = fallback - member internal _.InitializationGate = match initGuard with Some g when not (g.IsValid()) -> ValueSome g.Await | _ -> ValueNone + member internal _.Initialize(ct): System.Threading.Tasks.ValueTask = + match initGuard with + | Some g when not (g.IsValid()) -> g.Await(ct) |> ValueTask.ofTask |> ValueTask.ignore + | _ -> System.Threading.Tasks.ValueTask.CompletedTask module internal Tip = @@ -1064,8 +1066,8 @@ type StoreClient(container: Container, archive: Container option, query: QueryOp Prune.until log (container, stream) query.MaxItems index ct type internal Category<'event, 'state, 'context> - ( store: StoreClient, codec: IEventCodec<'event, EventBody, 'context>, - fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool, + ( store: StoreClient, createStoredProcIfNotExistsExactlyOnce: CancellationToken -> System.Threading.Tasks.ValueTask, + codec: IEventCodec<'event, EventBody, 'context>, fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool, checkUnfolds, compressUnfolds, mapUnfolds: Choice 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) = let reload (log, streamName, (Token.Unpack pos as streamToken), state) preloaded ct: Task = task { @@ -1077,7 +1079,7 @@ type internal Category<'event, 'state, 'context> member _.Load(log, _categoryName, _streamId, stream, _maxAge, _requireLeader, ct): Task = task { let! token, events = store.Load(log, (stream, None), (codec.TryDecode, isOrigin), checkUnfolds, ct) return struct (token, fold initial events) } - member _.Sync(log, _categoryName, _streamId, streamName, ctx, maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack pos as streamToken), state, events, ct) = task { let state' = fold state events let exp, events, eventsEncoded, projectionsEncoded = let encode e = codec.Encode(ctx, e) @@ -1090,7 +1092,7 @@ type internal Category<'event, 'state, 'context> let renderElement = if compressUnfolds then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold renderElement baseIndex) let batch = Sync.mkBatch streamName eventsEncoded projections - match maybeInit with ValueNone -> () | ValueSome i -> do! i ct + do! createStoredProcIfNotExistsExactlyOnce ct match! store.Sync(log, streamName, exp, batch, ct) with | InternalSyncResult.Written token' -> return SyncResult.Written (token', state') | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (reload (log, streamName, streamToken, state) None) @@ -1303,10 +1305,10 @@ type CosmosStoreContext(storeClient: CosmosStoreClient, tipOptions, queryOptions member val StoreClient = storeClient member val QueryOptions = queryOptions member val TipOptions = tipOptions - member internal x.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) = + member internal x.ResolveStoreClientAndStreamNameAndInit(categoryName, streamId) = let struct (cg, streamName) = storeClient.ResolveContainerGuardAndStreamName(categoryName, streamId) let store = StoreClient(cg.Container, cg.Fallback, x.QueryOptions, x.TipOptions) - struct (store, streamName, cg.InitializationGate) + struct (store, streamName, cg.Initialize) /// For CosmosDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity. /// The cache holds the Tip document's etag, which enables use of etag-contingent Reads (which cost only 1RU in the case where the document is unchanged) @@ -1385,14 +1387,14 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (name, resolveStream | CachingStrategy.SlidingWindow (cache, window) -> Some (Equinox.CachingStrategy.SlidingWindow (cache, window)) | CachingStrategy.FixedTimeSpan (cache, period) -> Some (Equinox.CachingStrategy.FixedTimeSpan (cache, period)) let categories = System.Collections.Concurrent.ConcurrentDictionary>() - let resolveInner (categoryName, container) = + let resolveInner struct (container, categoryName, init) = let createCategory _name: ICategory<_, _, 'context> = - Category<'event, 'state, 'context>(container, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds) + Category<'event, 'state, 'context>(container, init, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds) |> Caching.apply Token.isStale caching categories.GetOrAdd(categoryName, createCategory) let resolveStream streamId = - let struct (container, streamName, maybeContainerInitializationGate) = context.ResolveContainerClientAndStreamIdAndInit(name, streamId) - struct (resolveInner (name, container), streamName, maybeContainerInitializationGate) + let struct (_, streamName, _) as args = context.ResolveStoreClientAndStreamNameAndInit(name, streamId) + struct (resolveInner args, streamName) CosmosStoreCategory(name, resolveStream) namespace Equinox.CosmosStore.Core @@ -1433,11 +1435,11 @@ type EventsContext internal | Direction.Backward -> None, startPos new (context: Equinox.CosmosStore.CosmosStoreContext, log) = - let struct (store, _streamId, _init) = context.ResolveContainerClientAndStreamIdAndInit(null, null) + let struct (store, _streamId, _init) = context.ResolveStoreClientAndStreamNameAndInit(null, null) EventsContext(context, store, log) member _.ResolveStream(streamName) = - let struct (_cc, streamName, init) = context.ResolveContainerClientAndStreamIdAndInit(null, streamName) + let struct (_cc, streamName, init) = context.ResolveStoreClientAndStreamNameAndInit(null, streamName) struct (streamName, init) member x.StreamId(streamName): string = x.ResolveStream streamName |> ValueTuple.fst @@ -1480,11 +1482,9 @@ type EventsContext internal /// Callers should implement appropriate idempotent handling, or use Equinox.Decider for that purpose member x.Sync(stream, position, events: IEventData<_>[], ct): Task> = task { // Writes go through the stored proc, which we need to provision per container - // Having to do this here in this way is far from ideal, but work on caching, external snapshots and caching is likely - // to move this about before we reach a final destination in any case - match x.ResolveStream stream |> ValueTuple.snd with - | ValueNone -> () - | ValueSome init -> do! init ct + // The way this is routed is definitely hacky, but the entire existence of this API is pretty questionable, so ugliness is apppropiate + let struct (_, createStoredProcIfNotExistsExactlyOnce) = x.ResolveStream(stream) + do! createStoredProcIfNotExistsExactlyOnce ct let batch = Sync.mkBatch stream events Seq.empty match! store.Sync(log, stream, SyncExp.Version position.index, batch, ct) with | InternalSyncResult.Written (Token.Unpack pos) -> return AppendResult.Ok pos diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 00ad7b791..979e76458 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -1113,8 +1113,8 @@ type internal StoreClient(container: Container, fallback: Container option, quer Prune.until log (container, stream) query.MaxItems index ct type internal Category<'event, 'state, 'context> - ( store: StoreClient, codec: IEventCodec<'event, EncodedBody, 'context>, - fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool, + ( store: StoreClient, + codec: IEventCodec<'event, EncodedBody, 'context>, fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool, checkUnfolds, mapUnfolds: Choice 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) = let fetch state f = task { let! token', events = f in return struct (token', fold state events) } let reload (log, streamNam, requireLeader, (Token.Unpack pos as streamToken), state) ct: Task = task { @@ -1125,7 +1125,7 @@ type internal Category<'event, 'state, 'context> member _.Empty = Token.empty, initial member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (store.Load(log, (streamName, None), requireLeader, (codec.TryDecode, isOrigin), checkUnfolds, ct)) - member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack pos as streamToken), state, events, ct) = task { let state' = fold state events let exp, events, eventsEncoded, unfoldsEncoded = let encode e = codec.Encode(ctx, e) @@ -1340,7 +1340,7 @@ type DynamoStoreCategory<'event, 'state, 'context>(name, resolveStream) = categories.GetOrAdd(categoryName, createCategory) let resolveStream streamId = let struct (container, streamName) = context.ResolveContainerClientAndStreamName(name, streamId) - struct (resolveInner (name, container), streamName, ValueNone) + struct (resolveInner (name, container), streamName) DynamoStoreCategory(name, resolveStream) module Exceptions = diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index d57c1ac4b..7915f2583 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -458,7 +458,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code member _.Empty = context.TokenEmpty, initial member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, _ct) = fetch initial (loadAlgorithm log streamName requireLeader) - member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, _ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, _ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events diff --git a/src/Equinox.EventStoreDb/EventStoreDb.fs b/src/Equinox.EventStoreDb/EventStoreDb.fs index 23304054b..df02e1d3d 100644 --- a/src/Equinox.EventStoreDb/EventStoreDb.fs +++ b/src/Equinox.EventStoreDb/EventStoreDb.fs @@ -402,7 +402,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code member _.Empty = context.TokenEmpty, initial member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (loadAlgorithm log streamName requireLeader ct) - member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index af1cf744b..9416a8a8f 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -69,7 +69,7 @@ type private Category<'event, 'state, 'context, 'Format>(store: VolatileStore<'F match store.Load(streamName) with | null -> return (Token.ofEmpty, initial) | xs -> return (Token.ofValue xs, fold initial (Array.chooseV codec.TryDecode xs)) } - member _.Sync(_log, categoryName, streamId, streamName, context, _init, Token.Unpack eventCount, state, events, _ct) = task { + member _.Sync(_log, categoryName, streamId, streamName, context, Token.Unpack eventCount, state, events, _ct) = task { let inline map i (e: FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e) let encoded = Array.ofSeq events |> Array.mapi (fun i e -> map (eventCount + i) (codec.Encode(context, e))) match store.TrySync(streamName, categoryName, streamId, eventCount, encoded) with diff --git a/src/Equinox.MessageDb/MessageDb.fs b/src/Equinox.MessageDb/MessageDb.fs index 89aadd1ea..fc89429b8 100644 --- a/src/Equinox.MessageDb/MessageDb.fs +++ b/src/Equinox.MessageDb/MessageDb.fs @@ -388,7 +388,7 @@ type private Category<'event, 'state, 'context>(context: MessageDbContext, codec member _.Empty = context.TokenEmpty, initial member _.Load(log, categoryName, streamId, streamName, _maxAge, requireLeader, ct) = loadAlgorithm log categoryName streamId streamName requireLeader ct - member x.Sync(log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task { + member x.Sync(log, categoryName, streamId, streamName, ctx, token, state, events, ct) = task { let encode e = codec.Encode(ctx, e) let encodedEvents: IEventData[] = events |> Array.map encode match! context.TrySync(log, categoryName, streamId, streamName, token, encodedEvents, ct) with diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 6e431ad3a..2af6d2363 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -434,7 +434,7 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext, member _.Empty = context.TokenEmpty, initial member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) = fetch initial (loadAlgorithm log streamName requireLeader ct) - member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task { + member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task { let events = match access with | None | Some AccessStrategy.LatestKnownEvent -> events diff --git a/tests/Equinox.Core.Tests/CachingTests.fs b/tests/Equinox.Core.Tests/CachingTests.fs index 9c56602e9..aa3d7d577 100644 --- a/tests/Equinox.Core.Tests/CachingTests.fs +++ b/tests/Equinox.Core.Tests/CachingTests.fs @@ -33,7 +33,7 @@ type SpyCategory() = Interlocked.Increment &loads |> ignore do! Task.Delay(x.Delay, ct) return struct (mkToken(), Interlocked.Increment &state) } - member _.Sync(_log, _cat, _sid, _sn, _ctx, _maybeInit, _originToken, originState, events, _ct) = task { + member _.Sync(_log, _cat, _sid, _sn, _ctx, _originToken, originState, events, _ct) = task { return Equinox.Core.SyncResult.Written (mkToken(), originState + events.Length) } interface Equinox.Core.Caching.IReloadable with @@ -49,7 +49,7 @@ let writeOriginState = 99 let expectedWriteState = 99 + 2 // events written let write sn (sut: Equinox.Core.ICategory<_, _, _>) = task { - let! wr = sut.Sync(Serilog.Log.Logger, null, null, sn, (), ValueNone, Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None) + let! wr = sut.Sync(Serilog.Log.Logger, null, null, sn, (), Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None) let wState' = trap <@ match wr with Equinox.Core.SyncResult.Written (_token, state') -> state' | _ -> failwith "unexpected" @> test <@ expectedWriteState = wState' @> }