Skip to content

Commit

Permalink
feat!: Push StreamId out to FsCodec (#419)
Browse files Browse the repository at this point in the history
* Target packaged FsCodec update featuring polished Core.StreamId
* Move Category into Equinox
* Rename Core.fs -> Stream.fs, move Impl in
* Unmangle table/container mapping in doc stores
  • Loading branch information
bartelink authored Aug 8, 2023
1 parent 5fd9456 commit caee095
Show file tree
Hide file tree
Showing 62 changed files with 636 additions and 779 deletions.
9 changes: 4 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox`: `StreamId` replaces usage of `FsCodec.StreamName` [#353](https://github.com/jet/equinox/pull/353) [#378](https://github.com/jet/equinox/pull/378)
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
Expand All @@ -21,6 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore.Exceptions`: Active patterns to simplify classification in the context of Propulsion handlers [#416](https://github.com/jet/equinox/pull/416)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoStore`/`DynamoStore.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `EventStore`: Revise test rig to target a Docker-hosted cluster [#317](https://github.com/jet/equinox/pull/317)
- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196)
- `MessageDb`: Implements a [message-db](http://docs.eventide-project.org/user-guide/message-db/) storage backend [#339](https://github.com/jet/equinox/pull/339) with OpenTelemetry tracing and snapshotting support [#348](https://github.com/jet/equinox/pull/348) :pray: [@nordfjord](https://github.com/nordfjord)
- `eqx dump`: `-s` flag is now optional
Expand All @@ -31,16 +31,16 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Change surface APIs that use`'event list` or `'event seq` to `'event[]` [#411](https://github.com/jet/equinox/pull/411)
- Raise `FSharp.Core` req to `6.0.7`, framework req to `net6.0` [#310](https://github.com/jet/equinox/pull/310) [#337](https://github.com/jet/equinox/pull/337) [#33](https://github.com/jet/equinox/pull/33) [#411](https://github.com/jet/equinox/pull/411)
- Replace `AsyncSeq` usage with `FSharp.Control.TaskSeq` v `0.4.0` [#361](https://github.com/jet/equinox/pull/361) [#391](https://github.com/jet/equinox/pull/391)
- `Equinox`: Push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Move `Serilog` dependency from `Decider` constructor to `Category`/`Decider.forStream` [#337](https://github.com/jet/equinox/pull/337) [#419](https://github.com/jet/equinox/pull/419)
- `Equinox`: `FsCodec.StreamId` replaces usage of `FsCodec.StreamName` [#353](https://github.com/jet/equinox/pull/353) [#378](https://github.com/jet/equinox/pull/378) [#419](https://github.com/jet/equinox/pull/419)
- `Equinox.ResolveOption`: rename to `LoadOption` [#308](https://github.com/jet/equinox/pull/308) [#413](https://github.com/jet/equinox/pull/413)
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: Replace `'event list` with `'event[]` [#411](https://github.com/jet/equinox/pull/411)
- `Equinox.Decider`: Replace `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core`: push `FsCodec` dependency out to concrete stores [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Core.AsyncBatchingGate`: renamed to `Batching.Batcher` [#390](https://github.com/jet/equinox/pull/390)
- Stores: Change Event Body types, requiring `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory<byte>` and/or `JsonElement` see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323)
- Stores: `*Category.Resolve`: Replace `Resolve(sn, ?ResolveOption)` with `?load = LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- Stores: `*Category.Resolve`: Replace `Resolve(sn, ?ResolveOption, ?context)` with `?load = LoadOption` parameter on all `Transact` and `Query` methods, and `Decider.forStream`/`Decider.forContext` to convey context [#308](https://github.com/jet/equinox/pull/308)
- Stores: `*Category` ctor: Add mandatory `name` argument, and `Name` property [#410](https://github.com/jet/equinox/pull/410)
- Stores: `*Category` ctor: Change `caching` to be last argument, to reflect that it is applied over the top [#410](https://github.com/jet/equinox/pull/410)
- Stores: `*Category` ctor: Change `caching` and `access` to be mandatory, adding `NoCaching` and `Unoptimized` modes to represent the former defaults [#417](https://github.com/jet/equinox/pull/417)
Expand All @@ -62,7 +62,6 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Fixed

- `EventStore`: Revise test rig to target a Docker-hosted cluster [#317](https://github.com/jet/equinox/pull/317)
- `EventStore/SqlStreamStore`: rename `Equinox.XXXStore.Log.Event` -> `Metric` to match `CosmosStore` [#311](https://github.com/jet/equinox/pull/311)
- `SqlStreamStore`: Fix `Metric` key to be `ssEvt` (was `esEvt`) [#311](https://github.com/jet/equinox/pull/311)

Expand Down
41 changes: 21 additions & 20 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,9 @@ highly recommended to use the following canonical skeleton layout:
```fsharp
module Aggregate
(* StreamName section *)
let [<Literal>] Category = "category"
let streamId = Equinox.StreamId.gen Id.toString
module Stream =
let [<Literal>] Category = "category"
let id = FsCodec.StreamId.gen Id.toString
(* Optionally, Helpers/Types *)
Expand Down Expand Up @@ -386,7 +385,7 @@ type Service internal (resolve: Id -> Equinox.Decider<Events.Event, Fold.State)
let decider = resolve id
decider.Transact(decideX inputs)
let create category = Service(streamId >> Equinox.Decider.forStream Serilog.Log.Logger category)
let create category = Service(Stream.id >> Equinox.Decider.forStream Serilog.Log.Logger category)
```

- `Service`'s constructor is `internal`; `create` is the main way in which one
Expand Down Expand Up @@ -417,12 +416,12 @@ let cacheStrategy = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, de
module EventStore =
let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots (Fold.isOrigin, Fold.snapshot)
let category (context, cache) =
Equinox.EventStore.EventStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
Equinox.EventStore.EventStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
module Cosmos =
let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot Fold.Snapshot.config
let category (context, cache) =
Equinox.CosmosStore.CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
Equinox.CosmosStore.CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
### `MemoryStore` Storage Binding Module
Expand All @@ -445,10 +444,10 @@ In F#, independent of the Store being used, the Equinox programming model
involves (largely by convention, see [FAQ](README.md#FAQ)), per aggregation of
events on a given category of stream:

- `Category`: the common part of the [Stream Name](https://github.com/fscodec#streamname),
- `Stream.Category`: the common part of the [Stream Name](https://github.com/fscodec#streamname),
i.e., the `"Favorites"` part of the `"Favorites-clientId"`

- `streamId`: function responsible for mapping from the input elements that define the Aggregate's identity
- `Stream.id`: function responsible for mapping from the input elements that define the Aggregate's identity
to the `streamId` portion of the `{categoryName}-{streamId}` StreamName that's used within the concrete store.
In general, the inputs should be [strongly typed ids](https://github.com/jet/FsCodec#strongly-typed-stream-ids-using-fsharpumx)

Expand Down Expand Up @@ -542,8 +541,9 @@ brevity, that implements all the relevant functions above:
```fsharp
(* Event stream naming + schemas *)
let [<Literal>] Category = "Favorites"
let streamId = Equinox.StreamId.gen ClientId.toString
module Stream =
let [<Literal>] Category = "Favorites"
let id = FsCodec.StreamId.gen ClientId.toString
type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
Expand Down Expand Up @@ -589,7 +589,7 @@ let toSnapshot state = [| Event.Snapshotted (Array.ofList state) |]
(*
* The Service defines operations in business terms, neutral to any concrete
* store selection or implementation supplied only a `resolve` function that can
* be used to map from ids (as supplied to the `streamId` function) to an
* be used to map from ids (as supplied to the `Stream.id` function) to an
* Equinox.Decider; Typically the service should be a stateless Singleton
*)
Expand All @@ -613,7 +613,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
read clientId
let create resolve: Service =
Service(streamId >> resolve Category)
Service(Stream.id >> resolve)
```

<a name="api"></a>
Expand Down Expand Up @@ -692,13 +692,13 @@ Equinox’s Command Handling consists of < 200 lines including interfaces and
comments in https://github.com/jet/equinox/tree/master/src/Equinox - the
elements you'll touch in a normal application are:

- [`module Impl`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L33) -
- [`module Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L30) -
internal implementation of Optimistic Concurrency Control / retry loop used
by `Decider`. It's recommended to at least scan this file as it defines the
Transaction semantics that are central to Equinox and the overall `Decider` concept.
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L7) -
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
surface API one uses to `Transact` or `Query` against a specific stream's state
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L110) -
- [`type LoadOption` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L218) -
used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations establishes the state of the stream

Its recommended to read the examples in conjunction with perusing the code in
Expand Down Expand Up @@ -828,8 +828,9 @@ context
#### `Decider` usage

```fsharp
let [<Literal>] Category = "Favorites"
let streamId = Equinox.StreamId.gen ClientId.toString
module Stream =
let [<Literal>] Category = "Favorites"
let id = FsCodec.StreamId.gen ClientId.toString
type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
Expand All @@ -841,7 +842,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
let decider = resolve clientId
decider.Query id
let create resolve = Service(streamId >> resolve Category)
let create resolve = Service(Stream.id >> resolve)
```

`Read` above will do a roundtrip to the Store in order to fetch the most recent
Expand Down Expand Up @@ -1082,7 +1083,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
and/or simplifications when compared to aspects that might present in a
more complete implementation.

- the `streamId` helper (and optional [`Parse` Active Patterns](https://github.com/jet/fscodec#adding-matchers-to-the-event-contract))
- the `Stream.id` helper (and optional [`Parse` Active Patterns](https://github.com/jet/fscodec#adding-matchers-to-the-event-contract))
provide succinct ways to map an incoming `clientId` (which is not a `string`
in the real implementation but instead an id using
[`FSharp.UMX`](https://github.com/fsprojects/FSharp.UMX) in an unobtrusive
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Core library

- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol and application-level API surface. ([depends](https://www.fuget.org/packages/Equinox) only on `FSharp.Core` v `6.0.7`, `FSharp.UMX` v `1.1.0`
- `Equinox` [![NuGet](https://img.shields.io/nuget/v/Equinox.svg)](https://www.nuget.org/packages/Equinox/): Store-agnostic decision flow runner that manages the optimistic concurrency protocol and application-level API surface. ([depends](https://www.fuget.org/packages/Equinox) only on `FSharp.Core` v `6.0.7`, `FsCodec` v `3.0.0`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc)

## Serialization support

Expand All @@ -168,7 +168,7 @@ The components within this repository are delivered as multi-targeted Nuget pack

## Data Store libraries

- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.MemoryCache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`, `Serilog` (but not specific Serilog sinks, i.e. you configure to emit to `NLog` etc))
- `Equinox.Core` [![NuGet](https://img.shields.io/nuget/v/Equinox.Core.svg)](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in the concrete Store implementations, together with the default [`System.Runtime.Caching.MemoryCache`-based] `Cache` implementation. Hosts generic utility types frequently useful alongside Equinox: [`AsyncCacheCell`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs#L36), [`Batcher`, `BatcherCache`, `BatcherDictionary`](https://github.com/jet/equinox/blob/master/src/Equinox.Core/Batching.fs#L44). ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`)
- `Equinox.MemoryStore` [![MemoryStore NuGet](https://img.shields.io/nuget/v/Equinox.MemoryStore.svg)](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos` >= `3.27`, `FsCodec`, `System.Text.Json`, `FSharp.Control.TaskSeq`)
- `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`)
Expand Down Expand Up @@ -875,7 +875,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's
> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` that takes a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?
The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs), see the `let run` function.
The bulk of the implementation is in [`Equinox/Stream.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L32), see the `let run` function.
There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#code-diagrams-for-equinoxeventstore--equinoxsqlstreamstore) but I'll summarize here:
Expand Down Expand Up @@ -958,7 +958,7 @@ As teased in both, there will hopefully eventually (but hopefully not [inevitabl
#### In Equinox
The Equinox `type Decider` exposes an [API that covers the needs of making Consistent Decisions against a State derived from Events on a Stream](
https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L22-L56). At a high level, we have:
https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11-L96). At a high level, we have:
- `Transact*` functions - these run a decision function that may result in a change to the State, including management of the retry cycle when a consistency violation occurs during the syncing of the state with the backing store (See [Optmimistic Concurrency Control](https://en.wikipedia.org/wiki/Optimistic_concurrency_control)). Some variants can also yield an outcome to the caller after the syncing to the store has taken place.
- `Query*` functions - these run a render function projecting from the State that the Decider manages (but can't mutate it or trigger changes). The concept of [CQRS](https://martinfowler.com/bliki/CQRS.html) is a consideration here - using the Decider to read state should not be a default approach (but equally should not be considered off limits).
Expand Down
8 changes: 4 additions & 4 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@ type ServiceBuilder(storageConfig, handlerLog) =
member _.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.Snapshot.config
store.Category(Favorites.Category, Favorites.Events.codec, fold, initial, snapshot)
store.Category(Favorites.Stream.Category, Favorites.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> Favorites.create

member _.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin, SavedForLater.Fold.compact
store.Category(SavedForLater.Category, SavedForLater.Events.codec, fold, initial, snapshot)
let snapshot = SavedForLater.Fold.Snapshot.config
store.Category(SavedForLater.Stream.Category, SavedForLater.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> SavedForLater.create 50

member _.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.Snapshot.config
store.Category(TodoBackend.Category, TodoBackend.Events.codec, fold, initial, snapshot)
store.Category(TodoBackend.Stream.Category, TodoBackend.Events.codec, fold, initial, snapshot)
|> Decider.forStream handlerLog
|> TodoBackend.create

Expand Down
Loading

0 comments on commit caee095

Please sign in to comment.