Accumulator tidying followup to #97 (#166)
bartelink authored Oct 7, 2019
1 parent 86bd241 commit 0df11b6
Showing 6 changed files with 26 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Updated `FsCodec` to `1.0.0-pr.20.rc2.5` to pick up final name changes [#162](https://github.com/jet/equinox/pull/162)
- Replaced `TargetId.AggregateIdEmpty` with `ResolveOption.AssumeEmpty` [#163](https://github.com/jet/equinox/pull/163)
- Extracted `Equinox.Core` module [#164](https://github.com/jet/equinox/pull/164)
+ - Used `Transact` name consistently in `Accummulator` (follow-up to [#97](https://github.com/jet/equinox/pull/97)) [#166](https://github.com/jet/equinox/pull/166)

### Removed
### Fixed
26 changes: 13 additions & 13 deletions DOCUMENTATION.md
@@ -261,11 +261,11 @@ type Accumulator(fold, originState) =
// Effective State including `Accumulated`
member State : State
- // Execute a normal command, stashing the Events in `Accumulated`
- member Execute(interpret : State -> Event list) : unit
+ // Execute a normal Command-interpretation, stashing the Events in `Accumulated`
+ member Transact(interpret : State -> Event list) : unit
- // Less frequently used variant of Execute that can additionally yield a result
- member Decide(decide : State -> Result * Event list) : 'result
+ // Less frequently used variant that can additionally yield a result
+ member Transact(decide : State -> 'result * Event list) : 'result
```

`Accumulator` is a small optional helper class that can be useful in certain scenarios where one is applying a sequence of Commands. One can use it within the body of a `decide` or `interpret` function as passed to `Stream.Transact`.
@@ -473,10 +473,10 @@ type Service(log, resolveStream, ?maxAttempts) =
```

- `handle` represents a command processing flow where we (idempotently) apply a command, but then also emit the state to the caller, as dictated by the needs of the call as specified in the TodoBackend spec. This uses the `Accumulator` helper type, which accumulates an `Event list`, and provides a way to compute the `state` incorporating the proposed events immediately.
- - While we could theoretically use Projections to service queries from an eventually consistent Read Model, this is not in aligment with the Read-you-writes expectation embodied in the tests (i.e. it would not pass the tests), and, more importantly, would not work correctly as a backend for the app. Because we have more than one query required, we make a generic `query` method, even though a specific `read` method (as in the Favorite example) might make sense to expose too
+ - While we could theoretically use Projections to service queries from an eventually consistent Read Model, this is not in alignment with the Read-you-writes expectation embodied in the tests (i.e. it would not pass the tests), and, more importantly, would not work correctly as a backend for the app. Because we have more than one query required, we make a generic `query` method, even though a specific `read` method (as in the Favorite example) might make sense to expose too
- The main conclusion to be drawn from the Favorites and TodoBackend `Service` implementations's use of `Stream` Methods is that, while there can be commonality in terms of the sorts of transactions one might encapsulate in this manner, there's also It Depends factors; for instance:
i) the design doesnt provide complete idempotency and/or follow the CQRS style
- ii) the fact that this is a toy system with lots of artificaial constraints and/or simplifications when compared to aspects that might present in a more complete implementation.
+ ii) the fact that this is a toy system with lots of artificial constraints and/or simplifications when compared to aspects that might present in a more complete implementation.
- the `AggregateId` and `Stream` Active Patterns provide succinct ways to map an incoming `clientId` (which is not a `string` in the real implementation but instead an id using [`FSharp.UMX`](https://github.com/fsprojects/FSharp.UMX) in an unobtrusive manner.

# Equinox Architectural Overview
@@ -522,15 +522,15 @@ Key aspects relevant to the Equinox programming model:

### Azure CosmosDb concerns

- TL;DR caching can optimize RU consumption significantly. Due to the intrinsic ability to mutate easily, the potential to integrate rolling snapshots into core storage is clear. Providing ways to cache and snapshot matter a lot on CosmosDb, as lowest-common-demominator queries loading lots of events cost in performance and cash. The specifics of how you use the changefeed matters more than one might thing from the CosmosDb high level docs.
+ TL;DR caching can optimize RU consumption significantly. Due to the intrinsic ability to mutate easily, the potential to integrate rolling snapshots into core storage is clear. Providing ways to cache and snapshot matter a lot on CosmosDb, as lowest-common-denominator queries loading lots of events cost in performance and cash. The specifics of how you use the changefeed matters more than one might thing from the CosmosDb high level docs.

- Overview: CosmosDb has been in production for >5 years and is a mature Document database. The initial DocumentDb offering is at this point a mere projected programming model atop a generic Document data store. Its changefeed mechanism affords a base upon which one can manage projections, but there is no directly provided mechanism that lends itself to building Projections that map directly to EventStore's facilties in this regard (i.e., there is nowhere to maintain consumer offsts in the store itself).
+ Overview: CosmosDb has been in production for >5 years and is a mature Document database. The initial DocumentDb offering is at this point a mere projected programming model atop a generic Document data store. Its changefeed mechanism affords a base upon which one can manage projections, but there is no directly provided mechanism that lends itself to building Projections that map directly to EventStore's facilities in this regard (i.e., there is nowhere to maintain consumer offsets in the store itself).

Key aspects relevant to the Equinox programming model:

- - CosmosDb has pervasive optimization feedback per call in the form of a Request Charge attached to each and every action. Working to optimize one's request charges per scenario is critical both in terms of the effect it has on the amount of Request Units/s one you need to preprovision (which translates directly to costs on your bill), and then live predictably within if one is not to be throttled with 429 responses. In general, the request charging structure can be considered a very strong mechanical sympathy feedback signal
+ - CosmosDb has pervasive optimization feedback per call in the form of a Request Charge attached to each and every action. Working to optimize one's request charges per scenario is critical both in terms of the effect it has on the amount of Request Units/s one you need to pre-provision (which translates directly to costs on your bill), and then live predictably within if one is not to be throttled with 429 responses. In general, the request charging structure can be considered a very strong mechanical sympathy feedback signal
- Point reads of single documents based on their identifier are charged as 1 RU plus a price per KB and are optimal. Queries, even ones returning that same single document, have significant overhead and hence are to be avoided
- - One key mechanism CosmosDb provides to allow one to work efficiently is that any point-read request where one supplies a valid `etag` is charged at 1 RU, regardless of the size one would be transferring in the case of a cache miss (the other key benefit of using this is that it avoids unecessarly clogging of the bandwidth, and optimal latencies due to no unnecessary data transfers)
+ - One key mechanism CosmosDb provides to allow one to work efficiently is that any point-read request where one supplies a valid `etag` is charged at 1 RU, regardless of the size one would be transferring in the case of a cache miss (the other key benefit of using this is that it avoids unnecessarily clogging of the bandwidth, and optimal latencies due to no unnecessary data transfers)
- Indexing things surfaces in terms of increased request charges; at scale, each indexing hence needs to be justified
- Similarly to EventStore, the default ARS encoding CosmosDb provides, together with interoperability concerns, means that straight json makes sense as an encoding form for events (UTF-8 arrays)
- Collectively, the above implies (arguably counterintuitively) that using the powerful generic querying facility that CosmosDb provides should actually be a last resort.
@@ -610,7 +610,7 @@ The dominant pattern is that reads request _Tip_ with an `IfNoneMatch` precondi
- `Found` - (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:-
`i`: a version number
`e`: events since that version number
- `u`: unfolded (auxiliary) events computed at the same time as the batch of events was sent (aka inforamlly as snapshots) - (these enable us to establish the `state` without further queries or roundtrips to load and fold all preceding events)
+ `u`: unfolded (auxiliary) events computed at the same time as the batch of events was sent (aka informally as snapshots) - (these enable us to establish the `state` without further queries or roundtrips to load and fold all preceding events)

## Building a state from the Storage Model and/or the Cache

@@ -658,7 +658,7 @@ This covers what the most complete possible implementation of the JS Stored Proc

The `sync` stored procedure takes as input, a document that is almost identical to the format of the _`Tip`_ batch (in fact, if the stream is found to be empty, it is pretty much the template for the first document created in the stream). The request includes the following elements:

- - `expectedVersion`: the position the requestor has based their [proposed] events on (no, [providing an `etag` to save on Request Charges is not possible in the Stored Proc](https://stackoverflow.com/questions/53355886/azure-cosmosdb-stored-procedure-ifmatch-predicate))
+ - `expectedVersion`: the position the requester has based their [proposed] events on (no, [providing an `etag` to save on Request Charges is not possible in the Stored Proc](https://stackoverflow.com/questions/53355886/azure-cosmosdb-stored-procedure-ifmatch-predicate))
- `e`: array of Events (see Event, above) to append iff the expectedVersion check is fulfilled
- `u`: array of `unfold`ed events (aka snapshots) that supersede items with equivalent `c`ase values
- `maxEvents`: the maximum number of events in an individual batch prior to starting a new one. For example:
@@ -667,7 +667,7 @@ The `sync` stored procedure takes as input, a document that is almost identical
- if the total length including the new `e`vents would exceed `maxEvents`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a batch, and the new events go into the new Tip-Batch, the _tip_ gets frozen as a `Batch`, and the new request becomes the _tip_ (as an atomic transaction on the server side)

- (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before the base (`i`) of the batch if required by lagging `u`nfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to `0`, so for example if a writer says maxEvents `10` and there is an `u`nfold based on an event more than `10` old it will be removed as part of the appending process)
- - (PROPOSAL/FUTURE): adding an `expectedEtag` would enable competing writers to maintain and update `u`nfold data in a consistent fashion (backign off and retrying in the case of conflict, _without any events being written per state change_)
+ - (PROPOSAL/FUTURE): adding an `expectedEtag` would enable competing writers to maintain and update `u`nfold data in a consistent fashion (backing off and retrying in the case of conflict, _without any events being written per state change_)

# Equinox.Cosmos.Core.Events

2 changes: 1 addition & 1 deletion samples/Store/Backend/Cart.fs
@@ -11,7 +11,7 @@ type Service(log, resolveStream) =
stream.TransactAsync(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let ctx = Equinox.Accumulator(Folds.fold,state)
- let execute = Commands.interpret >> ctx.Execute
+ let execute = Commands.interpret >> ctx.Transact
let res = flow ctx execute
return res,ctx.Accumulated })
let read (Stream stream) : Async<Folds.State> =
2 changes: 1 addition & 1 deletion samples/TodoBackend/Todo.fs
@@ -54,7 +54,7 @@ type Service(log, resolveStream, ?maxAttempts) =
let handle (Stream stream) command =
stream.Transact(fun state ->
let ctx = Equinox.Accumulator(Folds.fold, state)
- ctx.Execute (Commands.interpret command)
+ ctx.Transact (Commands.interpret command)
ctx.State.items,ctx.Accumulated)

member __.List(clientId) : Async<Todo seq> =
2 changes: 1 addition & 1 deletion samples/Tutorial/Todo.fsx
@@ -65,7 +65,7 @@ type Service(log, resolveStream, ?maxAttempts) =
let handle (Stream stream) command : Async<Todo list> =
stream.Transact(fun state ->
let ctx = Equinox.Accumulator(fold, state)
- ctx.Execute (interpret command)
+ ctx.Transact (interpret command)
ctx.State.items,ctx.Accumulated)
let query (Stream stream) (projection : State -> 't) : Async<'t> =
stream.Query projection
18 changes: 9 additions & 9 deletions src/Equinox/Accumulator.fs
@@ -1,6 +1,6 @@
namespace Equinox

- /// Maintains a rolling folded State while Accumulating Events decided upon as part of a decision flow
+ /// Maintains a rolling folded State while Accumulating Events pended as part of a decision flow
type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) =
let accumulated = ResizeArray<'event>()

@@ -13,19 +13,19 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
accumulated |> fold originState

/// Invoke a decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence
- member __.Execute(decide : 'state -> 'event list) : unit =
- decide __.State |> accumulated.AddRange
+ member __.Transact(interpret : 'state -> 'event list) : unit =
+ interpret __.State |> accumulated.AddRange
/// Invoke an Async decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence
- member __.ExecuteAsync(decide : 'state -> Async<'event list>) : Async<unit> = async {
- let! events = decide __.State
+ member __.TransactAsync(interpret : 'state -> Async<'event list>) : Async<unit> = async {
+ let! events = interpret __.State
accumulated.AddRange events }
- /// As per `Execute`, invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
- member __.Decide(decide : 'state -> 'result * 'event list) : 'result =
+ /// Invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
+ member __.Transact(decide : 'state -> 'result * 'event list) : 'result =
let result, newEvents = decide __.State
accumulated.AddRange newEvents
result
- /// As per `ExecuteAsync`, invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
- member __.DecideAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
+ /// Invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
+ member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
let! result, newEvents = decide __.State
accumulated.AddRange newEvents
return result }
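
For readers skimming the rename, here is a minimal, self-contained sketch of how the two synchronous `Transact` overloads might be called after this change. The `Accumulator` type is re-declared locally from the lines visible in the diff; the body of `Accumulated` is an assumption (the diff only shows it being read as an event list), and the `Event`, `Command`, `State`, `interpret` and `addAndCount` definitions are a hypothetical toy domain, not part of the commit.

```fsharp
// Hypothetical toy domain (not from the commit)
type Event = Added of string | Cleared
type Command = Add of string | Clear
type State = string list

let fold (state : State) (events : Event seq) : State =
    let evolve s = function
        | Added item -> item :: s
        | Cleared -> []
    Seq.fold evolve state events

// Idempotent interpretation: yields no events when the command is a no-op
let interpret command (state : State) : Event list =
    match command with
    | Add item when List.contains item state -> []
    | Add item -> [ Added item ]
    | Clear when List.isEmpty state -> []
    | Clear -> [ Cleared ]

// A decide-style function: yields a result alongside the events
let addAndCount item (state : State) : int * Event list =
    let events = interpret (Add item) state
    List.length (fold state events), events

// Local re-declaration mirroring the post-commit shape shown in the diff.
// ASSUMPTION: `Accumulated` converts the pended events to a list.
type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) =
    let accumulated = ResizeArray<'event>()
    member __.Accumulated : 'event list = accumulated |> List.ofSeq
    member __.State : 'state = accumulated |> fold originState
    member __.Transact(interpret : 'state -> 'event list) : unit =
        interpret __.State |> accumulated.AddRange
    member __.Transact(decide : 'state -> 'result * 'event list) : 'result =
        let result, newEvents = decide __.State
        accumulated.AddRange newEvents
        result

let ctx = Accumulator(fold, [])
ctx.Transact(interpret (Add "milk"))            // pends one event
ctx.Transact(interpret (Add "milk"))            // no-op: item already present
let count = ctx.Transact(addAndCount "eggs")    // overload that yields a result
printfn "count=%d events=%A state=%A" count ctx.Accumulated ctx.State
```

The overload is selected by the return type of the function passed in: a function yielding an `Event list` picks the `unit`-returning variant, while one yielding a `result * Event list` pair picks the variant that propagates the result.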
