Skip to content

Commit

Permalink
🚤 TLDR for @fluxlife
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 23, 2020
1 parent 880b2a3 commit 338012e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 52 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating
- `MemoryStore`: In-memory store (volatile, for unit or integration test purposes). Fulfils the full contract Equinox imposes on a store, but without I/O costs [(it's ~100 LOC wrapping a `ConcurrentDictionary`)](https://github.com/jet/equinox/blob/master/src/Equinox.MemoryStore/MemoryStore.fs), and the ability to [take serialization/deserialization cost out of the picture](https://github.com/jet/FsCodec#boxcodec).
- [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore): Bindings for the powerful and widely used SQL-backed Event Storage system. [See SqlStreamStore docs](https://sqlstreamstore.readthedocs.io/en/latest/#introduction). :pray: [@rajivhost](https://github.com/rajivhost)

<a name="tldr"></a>
# TL;DR :fast_forward:

- **Dev that wants a slab of code instead of a wall of text and will guess the rest** ? :point_right: <100 LOC end to end 'tutorial' using CosmosDB: https://github.com/jet/equinox/blob/master/samples/Tutorial/Cosmos.fsx#L36

# Features

- Designed not to invade application code; Domain tests can be written directly against your models without any need to involve or understand Equinox assemblies or constructs as part of writing those tests.
Expand Down
137 changes: 85 additions & 52 deletions samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,87 +19,120 @@
#r "Serilog.Sinks.Seq.dll"
#r "Equinox.Cosmos.dll"

let Category = "Favorites"
let streamName clientId = FsCodec.StreamName.create Category clientId
module Log =

open Serilog
open Serilog.Events
let verbose = true // false will remove lots of noise
let log =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net
let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information)
c.CreateLogger()
let dumpMetrics () = Equinox.Cosmos.Store.Log.InternalMetrics.dump log

module Favorites =

type Item = { sku : string }
type Event =
| Added of Item
| Removed of Item
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
type State = string list
let initial : State = []
let evolve state = function
| Added {sku = sku } -> sku :: state
| Removed {sku = sku } -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs
let Category = "Favorites"
let streamName clientId = FsCodec.StreamName.create Category clientId

module Events =

type Item = { sku : string }
type Event =
| Added of Item
| Removed of Item
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>() // Coming soon, replace Newtonsoft with SystemTextJson and works same

module Fold =

type State = string list
let initial : State = []
let evolve state = function
| Events.Added {sku = sku } -> sku :: state
| Events.Removed {sku = sku } -> state |> List.filter (fun x -> x <> sku)
let fold s xs = Seq.fold evolve s xs

type Command =
| Add of string
| Remove of string
let interpret command state =
match command with
| Add sku -> if state |> List.contains sku then [] else [Added {sku = sku}]
| Remove sku -> if state |> List.contains sku then [Removed {sku = sku}] else []
| Add sku -> if state |> List.contains sku then [] else [ Events.Added {sku = sku}]
| Remove sku -> if state |> List.contains sku then [ Events.Removed {sku = sku}] else []

type Service internal (resolve : string -> Equinox.Stream<Event, State>) =
type Service internal (resolve : string -> Equinox.Stream<Events.Event, Fold.State>) =

let execute clientId command : Async<unit> =
member __.Favorite(clientId, sku) =
let stream = resolve clientId
stream.Transact(interpret command)

member __.Favorite(clientId, sku) = execute clientId (Add sku)
member __.Unfavorite(clientId, skus) = execute clientId (Remove skus)
stream.Transact(interpret (Add sku))
member __.Unfavorite(clientId, skus) =
let stream = resolve clientId
stream.Transact(interpret (Remove skus))
member __.List clientId: Async<string list> =
let stream = resolve clientId
stream.Query id

module Log =
open Serilog
open Serilog.Events
let verbose = true // false will remove lots of noise
let log =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net
let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information)
c.CreateLogger()
let dumpMetrics () = Equinox.Cosmos.Store.Log.InternalMetrics.dump log
let create resolve =
let resolve clientId = Equinox.Stream(Log.log, resolve (streamName clientId), maxAttempts = 3)
Service(resolve)

module Cosmos =

open Equinox.Cosmos // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store
let accessStrategy = AccessStrategy.Unoptimized // Or Snapshot etc https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#access-strategies
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve

let [<Literal>] appName = "equinox-tutorial"
let cache = Equinox.Cache(appName, 20)

open Equinox.Cosmos
open System
module Store =
let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get

let connector = Connector(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., log=Log.log)
let conn = connector.Connect(appName, Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
let gateway = Gateway(conn, BatchingPolicy())
let connector = Equinox.Cosmos.Connector(System.TimeSpan.FromSeconds 5., 2, System.TimeSpan.FromSeconds 5., log=Log.log)
let conn = connector.Connect(appName, Equinox.Cosmos.Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
let createContext () = Equinox.Cosmos.Context(conn, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")

let context = Context(gateway, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching

module FavoritesCategory =
let resolver = Resolver(Store.context, Favorites.codec, Favorites.fold, Favorites.initial, Store.cacheStrategy, AccessStrategy.Unoptimized)
let resolve clientId =
let streamName = streamName clientId
Equinox.Stream(Log.log, resolver.Resolve streamName, maxAttempts = 3)

let service =
Favorites.Service(FavoritesCategory.resolve)
let context = Store.createContext ()
let cache = Equinox.Cache(appName, 20)
let service = Favorites.Cosmos.create (context, cache)

let client = "ClientJ"

service.Favorite(client, "a") |> Async.RunSynchronously
service.Favorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously

service.Unfavorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously

Log.dumpMetrics ()
Log.dumpMetrics ()

(* EXAMPLE OUTPUT
[13:48:33 INF] EqxCosmos Response 5/5 Backward 189ms i=0 rc=3.43
[13:48:33 INF] EqxCosmos QueryB Favorites-ClientJ v5 5/1 190ms rc=3.43
[13:48:33 DBG] No events generated
[13:48:33 INF] EqxCosmos Tip 200 90ms rc=1
[13:48:33 INF] EqxCosmos Response 0/0 Forward 179ms i=null rc=3.62
[13:48:33 INF] EqxCosmos QueryF Favorites-ClientJ v0 0/1 179ms rc=3.62
[13:48:33 INF] EqxCosmos Sync: Conflict writing ["Added"]
[13:48:33 INF] EqxCosmos Sync 1+0 90ms rc=5.4
[13:48:33 INF] EqxCosmos Tip 200 86ms rc=1
[13:48:33 INF] EqxCosmos Response 5/5 Forward 184ms i=0 rc=4.37
[13:48:33 INF] EqxCosmos QueryF Favorites-ClientJ v5 5/1 185ms rc=4.37
[13:48:33 DBG] Resyncing and retrying
[13:48:33 INF] EqxCosmos Sync 1+0 96ms rc=37.67
[13:48:34 INF] EqxCosmos Tip 302 90ms rc=1
[13:48:34 INF] EqxCosmos Tip 302 92ms rc=1
[13:48:34 INF] EqxCosmos Sync 1+0 96ms rc=37.33
[13:48:34 INF] EqxCosmos Tip 302 87ms rc=1
[13:48:34 INF] Read: 8 requests costing 16 RU (average: 2.05); Average latency: 125ms
[13:48:34 INF] Write: 3 requests costing 80 RU (average: 26.80); Average latency: 94ms
[13:48:34 INF] TOTAL: 11 requests costing 97 RU (average: 8.80); Average latency: 116ms
[13:48:34 INF] rps 2 = ~21 RU *)

0 comments on commit 338012e

Please sign in to comment.