Skip to content

Commit

Permalink
refactor(CosmosStore): Cleanup stored proc initialization (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jul 26, 2023
1 parent 6df7c58 commit 5fac967
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'ev
| ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
| ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload, ct) }
member _.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
member _.Sync(log, categoryName, streamId, streamName, context, streamToken, state, events, ct) = task {
let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
match! category.Sync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with
match! category.Sync(log, categoryName, streamId, streamName, context, streamToken, state, events, ct) with
| SyncResult.Written tokenAndState' ->
save tokenAndState'
return SyncResult.Written tokenAndState'
Expand Down
7 changes: 3 additions & 4 deletions src/Equinox.Core/Category.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type ICategory<'event, 'state, 'context> =
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
abstract Sync: log: ILogger * categoryName: string * streamId: string * streamName: string * 'context
* maybeInit: (CancellationToken -> Task<unit>) voption
* originToken: StreamToken * originState: 'state * events: 'event[]
* CancellationToken -> Task<SyncResult<'state>>

Expand All @@ -31,12 +30,12 @@ open Equinox.Core.Tracing
type Category<'event, 'state, 'context>(categoryName, resolveStream) =

/// Stores without custom routing for categoryName/streamId to Table/Container etc use this default impl
new(categoryName, inner) = Category(categoryName, fun streamId -> struct (inner, Core.StreamName.render categoryName streamId, ValueNone))
new(categoryName, inner) = Category(categoryName, fun streamId -> struct (inner, Core.StreamName.render categoryName streamId))

/// Provides access to the low level store operations used for Loading and/or Syncing updates via the Decider
/// (Normal usage is via the adjacent `module Decider` / `Stream.Resolve` helpers)
member _.Stream(log: Serilog.ILogger, context: 'context, streamId: string) =
let struct (inner: Core.ICategory<'event, 'state, 'context>, streamName, init) = resolveStream streamId
let struct (inner: Core.ICategory<'event, 'state, 'context>, streamName) = resolveStream streamId
{ new Core.IStream<'event, 'state> with
member _.Name = streamName
member _.LoadEmpty() = inner.Empty
Expand All @@ -48,7 +47,7 @@ type Category<'event, 'state, 'context>(categoryName, resolveStream) =
use act = source.StartActivity("Sync", System.Diagnostics.ActivityKind.Client)
if act <> null then act.AddStream(categoryName, streamId, streamName).AddSyncAttempt(attempt) |> ignore
let log = if attempt = 1 then log else log.ForContext("attempts", attempt)
return! inner.Sync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } }
return! inner.Sync(log, categoryName, streamId, streamName, context, token, originState, events, ct) } }

[<AbstractClass; Sealed; System.Runtime.CompilerServices.Extension>]
type Stream private () =
Expand Down
38 changes: 19 additions & 19 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,12 @@ module Initialization =
/// Holds Container state, coordinating initialization activities
type internal ContainerInitializerGuard(container: Container, fallback: Container option, ?initContainer: Container -> CancellationToken -> Task<unit>) =
let initGuard = initContainer |> Option.map (fun init -> AsyncCacheCell<unit>(init container))

member _.Container = container
member _.Fallback = fallback
member internal _.InitializationGate = match initGuard with Some g when not (g.IsValid()) -> ValueSome g.Await | _ -> ValueNone
member internal _.Initialize(ct): System.Threading.Tasks.ValueTask =
match initGuard with
| Some g when not (g.IsValid()) -> g.Await(ct) |> ValueTask.ofTask |> ValueTask.ignore
| _ -> System.Threading.Tasks.ValueTask.CompletedTask

module internal Tip =

Expand Down Expand Up @@ -1064,8 +1066,8 @@ type StoreClient(container: Container, archive: Container option, query: QueryOp
Prune.until log (container, stream) query.MaxItems index ct

type internal Category<'event, 'state, 'context>
( store: StoreClient, codec: IEventCodec<'event, EventBody, 'context>,
fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool,
( store: StoreClient, createStoredProcIfNotExistsExactlyOnce: CancellationToken -> System.Threading.Tasks.ValueTask,
codec: IEventCodec<'event, EventBody, 'context>, fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool,
checkUnfolds, compressUnfolds, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =

let reload (log, streamName, (Token.Unpack pos as streamToken), state) preloaded ct: Task<struct (StreamToken * 'state)> = task {
Expand All @@ -1077,7 +1079,7 @@ type internal Category<'event, 'state, 'context>
member _.Load(log, _categoryName, _streamId, stream, _maxAge, _requireLeader, ct): Task<struct (StreamToken * 'state)> = task {
let! token, events = store.Load(log, (stream, None), (codec.TryDecode, isOrigin), checkUnfolds, ct)
return struct (token, fold initial events) }
member _.Sync(log, _categoryName, _streamId, streamName, ctx, maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task {
member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack pos as streamToken), state, events, ct) = task {
let state' = fold state events
let exp, events, eventsEncoded, projectionsEncoded =
let encode e = codec.Encode(ctx, e)
Expand All @@ -1090,7 +1092,7 @@ type internal Category<'event, 'state, 'context>
let renderElement = if compressUnfolds then JsonElement.undefinedToNull >> JsonElement.deflate else JsonElement.undefinedToNull
let projections = projectionsEncoded |> Seq.map (Sync.mkUnfold renderElement baseIndex)
let batch = Sync.mkBatch streamName eventsEncoded projections
match maybeInit with ValueNone -> () | ValueSome i -> do! i ct
do! createStoredProcIfNotExistsExactlyOnce ct
match! store.Sync(log, streamName, exp, batch, ct) with
| InternalSyncResult.Written token' -> return SyncResult.Written (token', state')
| InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (reload (log, streamName, streamToken, state) None)
Expand Down Expand Up @@ -1303,10 +1305,10 @@ type CosmosStoreContext(storeClient: CosmosStoreClient, tipOptions, queryOptions
member val StoreClient = storeClient
member val QueryOptions = queryOptions
member val TipOptions = tipOptions
member internal x.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) =
member internal x.ResolveStoreClientAndStreamNameAndInit(categoryName, streamId) =
let struct (cg, streamName) = storeClient.ResolveContainerGuardAndStreamName(categoryName, streamId)
let store = StoreClient(cg.Container, cg.Fallback, x.QueryOptions, x.TipOptions)
struct (store, streamName, cg.InitializationGate)
struct (store, streamName, cg.Initialize)

/// For CosmosDB, caching is typically a central aspect of managing RU consumption to maintain performance and capacity.
/// The cache holds the Tip document's etag, which enables use of etag-contingent Reads (which cost only 1RU in the case where the document is unchanged)
Expand Down Expand Up @@ -1385,14 +1387,14 @@ type CosmosStoreCategory<'event, 'state, 'context> internal (name, resolveStream
| CachingStrategy.SlidingWindow (cache, window) -> Some (Equinox.CachingStrategy.SlidingWindow (cache, window))
| CachingStrategy.FixedTimeSpan (cache, period) -> Some (Equinox.CachingStrategy.FixedTimeSpan (cache, period))
let categories = System.Collections.Concurrent.ConcurrentDictionary<string, ICategory<'event, 'state, 'context>>()
let resolveInner (categoryName, container) =
let resolveInner struct (container, categoryName, init) =
let createCategory _name: ICategory<_, _, 'context> =
Category<'event, 'state, 'context>(container, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds)
Category<'event, 'state, 'context>(container, init, codec, fold, initial, isOrigin, checkUnfolds, compressUnfolds, mapUnfolds)
|> Caching.apply Token.isStale caching
categories.GetOrAdd(categoryName, createCategory)
let resolveStream streamId =
let struct (container, streamName, maybeContainerInitializationGate) = context.ResolveContainerClientAndStreamIdAndInit(name, streamId)
struct (resolveInner (name, container), streamName, maybeContainerInitializationGate)
let struct (_, streamName, _) as args = context.ResolveStoreClientAndStreamNameAndInit(name, streamId)
struct (resolveInner args, streamName)
CosmosStoreCategory(name, resolveStream)

namespace Equinox.CosmosStore.Core
Expand Down Expand Up @@ -1433,11 +1435,11 @@ type EventsContext internal
| Direction.Backward -> None, startPos

new (context: Equinox.CosmosStore.CosmosStoreContext, log) =
let struct (store, _streamId, _init) = context.ResolveContainerClientAndStreamIdAndInit(null, null)
let struct (store, _streamId, _init) = context.ResolveStoreClientAndStreamNameAndInit(null, null)
EventsContext(context, store, log)

member _.ResolveStream(streamName) =
let struct (_cc, streamName, init) = context.ResolveContainerClientAndStreamIdAndInit(null, streamName)
let struct (_cc, streamName, init) = context.ResolveStoreClientAndStreamNameAndInit(null, streamName)
struct (streamName, init)
member x.StreamId(streamName): string = x.ResolveStream streamName |> ValueTuple.fst

Expand Down Expand Up @@ -1480,11 +1482,9 @@ type EventsContext internal
/// Callers should implement appropriate idempotent handling, or use Equinox.Decider for that purpose
member x.Sync(stream, position, events: IEventData<_>[], ct): Task<AppendResult<Position>> = task {
// Writes go through the stored proc, which we need to provision per container
// Having to do this here in this way is far from ideal, but work on caching, external snapshots and caching is likely
// to move this about before we reach a final destination in any case
match x.ResolveStream stream |> ValueTuple.snd with
| ValueNone -> ()
| ValueSome init -> do! init ct
// The way this is routed is definitely hacky, but the entire existence of this API is pretty questionable, so ugliness is apppropiate
let struct (_, createStoredProcIfNotExistsExactlyOnce) = x.ResolveStream(stream)
do! createStoredProcIfNotExistsExactlyOnce ct
let batch = Sync.mkBatch stream events Seq.empty
match! store.Sync(log, stream, SyncExp.Version position.index, batch, ct) with
| InternalSyncResult.Written (Token.Unpack pos) -> return AppendResult.Ok pos
Expand Down
8 changes: 4 additions & 4 deletions src/Equinox.DynamoStore/DynamoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,8 +1113,8 @@ type internal StoreClient(container: Container, fallback: Container option, quer
Prune.until log (container, stream) query.MaxItems index ct

type internal Category<'event, 'state, 'context>
( store: StoreClient, codec: IEventCodec<'event, EncodedBody, 'context>,
fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool,
( store: StoreClient,
codec: IEventCodec<'event, EncodedBody, 'context>, fold: 'state -> 'event[] -> 'state, initial: 'state, isOrigin: 'event -> bool,
checkUnfolds, mapUnfolds: Choice<unit, 'event[] -> 'state -> 'event[], 'event[] -> 'state -> 'event[] * 'event[]>) =
let fetch state f = task { let! token', events = f in return struct (token', fold state events) }
let reload (log, streamNam, requireLeader, (Token.Unpack pos as streamToken), state) ct: Task<struct (StreamToken * 'state)> = task {
Expand All @@ -1125,7 +1125,7 @@ type internal Category<'event, 'state, 'context>
member _.Empty = Token.empty, initial
member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) =
fetch initial (store.Load(log, (streamName, None), requireLeader, (codec.TryDecode, isOrigin), checkUnfolds, ct))
member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack pos as streamToken), state, events, ct) = task {
member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack pos as streamToken), state, events, ct) = task {
let state' = fold state events
let exp, events, eventsEncoded, unfoldsEncoded =
let encode e = codec.Encode(ctx, e)
Expand Down Expand Up @@ -1340,7 +1340,7 @@ type DynamoStoreCategory<'event, 'state, 'context>(name, resolveStream) =
categories.GetOrAdd(categoryName, createCategory)
let resolveStream streamId =
let struct (container, streamName) = context.ResolveContainerClientAndStreamName(name, streamId)
struct (resolveInner (name, container), streamName, ValueNone)
struct (resolveInner (name, container), streamName)
DynamoStoreCategory(name, resolveStream)

module Exceptions =
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code
member _.Empty = context.TokenEmpty, initial
member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, _ct) =
fetch initial (loadAlgorithm log streamName requireLeader)
member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, _ct) = task {
member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, _ct) = task {
let events =
match access with
| None | Some AccessStrategy.LatestKnownEvent -> events
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.EventStoreDb/EventStoreDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ type private Category<'event, 'state, 'context>(context: EventStoreContext, code
member _.Empty = context.TokenEmpty, initial
member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) =
fetch initial (loadAlgorithm log streamName requireLeader ct)
member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task {
member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task {
let events =
match access with
| None | Some AccessStrategy.LatestKnownEvent -> events
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type private Category<'event, 'state, 'context, 'Format>(store: VolatileStore<'F
match store.Load(streamName) with
| null -> return (Token.ofEmpty, initial)
| xs -> return (Token.ofValue xs, fold initial (Array.chooseV codec.TryDecode xs)) }
member _.Sync(_log, categoryName, streamId, streamName, context, _init, Token.Unpack eventCount, state, events, _ct) = task {
member _.Sync(_log, categoryName, streamId, streamName, context, Token.Unpack eventCount, state, events, _ct) = task {
let inline map i (e: FsCodec.IEventData<'Format>) = FsCodec.Core.TimelineEvent.Create(int64 i, e)
let encoded = Array.ofSeq events |> Array.mapi (fun i e -> map (eventCount + i) (codec.Encode(context, e)))
match store.TrySync(streamName, categoryName, streamId, eventCount, encoded) with
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.MessageDb/MessageDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ type private Category<'event, 'state, 'context>(context: MessageDbContext, codec
member _.Empty = context.TokenEmpty, initial
member _.Load(log, categoryName, streamId, streamName, _maxAge, requireLeader, ct) =
loadAlgorithm log categoryName streamId streamName requireLeader ct
member x.Sync(log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task {
member x.Sync(log, categoryName, streamId, streamName, ctx, token, state, events, ct) = task {
let encode e = codec.Encode(ctx, e)
let encodedEvents: IEventData<EventBody>[] = events |> Array.map encode
match! context.TrySync(log, categoryName, streamId, streamName, token, encodedEvents, ct) with
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ type private Category<'event, 'state, 'context>(context: SqlStreamStoreContext,
member _.Empty = context.TokenEmpty, initial
member _.Load(log, _categoryName, _streamId, streamName, _maxAge, requireLeader, ct) =
fetch initial (loadAlgorithm log streamName requireLeader ct)
member _.Sync(log, _categoryName, _streamId, streamName, ctx, _maybeInit, (Token.Unpack token as streamToken), state, events, ct) = task {
member _.Sync(log, _categoryName, _streamId, streamName, ctx, (Token.Unpack token as streamToken), state, events, ct) = task {
let events =
match access with
| None | Some AccessStrategy.LatestKnownEvent -> events
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.Core.Tests/CachingTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type SpyCategory() =
Interlocked.Increment &loads |> ignore
do! Task.Delay(x.Delay, ct)
return struct (mkToken(), Interlocked.Increment &state) }
member _.Sync(_log, _cat, _sid, _sn, _ctx, _maybeInit, _originToken, originState, events, _ct) = task {
member _.Sync(_log, _cat, _sid, _sn, _ctx, _originToken, originState, events, _ct) = task {
return Equinox.Core.SyncResult.Written (mkToken(), originState + events.Length) }

interface Equinox.Core.Caching.IReloadable<State> with
Expand All @@ -49,7 +49,7 @@ let writeOriginState = 99
let expectedWriteState = 99 + 2 // events written

let write sn (sut: Equinox.Core.ICategory<_, _, _>) = task {
let! wr = sut.Sync(Serilog.Log.Logger, null, null, sn, (), ValueNone, Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None)
let! wr = sut.Sync(Serilog.Log.Logger, null, null, sn, (), Unchecked.defaultof<_>, writeOriginState, Array.replicate 2 (), CancellationToken.None)
let wState' = trap <@ match wr with Equinox.Core.SyncResult.Written (_token, state') -> state' | _ -> failwith "unexpected" @>
test <@ expectedWriteState = wState' @> }

Expand Down

0 comments on commit 5fac967

Please sign in to comment.