diff --git a/README.md b/README.md index 311fac735..1e6bb8b4e 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,11 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating - `MemoryStore`: In-memory store (volatile, for unit or integration test purposes). Fulfils the full contract Equinox imposes on a store, but without I/O costs [(it's ~100 LOC wrapping a `ConcurrentDictionary`)](https://github.com/jet/equinox/blob/master/src/Equinox.MemoryStore/MemoryStore.fs), and the ability to [take serialization/deserialization cost out of the picture](https://github.com/jet/FsCodec#boxcodec). - [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore): Bindings for the powerful and widely used SQL-backed Event Storage system. [See SqlStreamStore docs](https://sqlstreamstore.readthedocs.io/en/latest/#introduction). :pray: [@rajivhost](https://github.com/rajivhost) + +# TL;DR :fast_forward: + +- **Dev that wants a slab of code instead of a wall of text and will guess the rest** ? :point_right: <100 LOC end to end 'tutorial' using CosmosDB: https://github.com/jet/equinox/blob/master/samples/Tutorial/Cosmos.fsx#L36 + # Features - Designed not to invade application code; Domain tests can be written directly against your models without any need to involve or understand Equinox assemblies or constructs as part of writing those tests. diff --git a/samples/Tutorial/Cosmos.fsx b/samples/Tutorial/Cosmos.fsx index 36ca14d24..8c9b7942f 100644 --- a/samples/Tutorial/Cosmos.fsx +++ b/samples/Tutorial/Cosmos.fsx @@ -19,82 +19,91 @@ #r "Serilog.Sinks.Seq.dll" #r "Equinox.Cosmos.dll" -let Category = "Favorites" -let streamName clientId = FsCodec.StreamName.create Category clientId +module Log = + + open Serilog + open Serilog.Events + let verbose = true // false will remove lots of noise + let log = + let c = LoggerConfiguration() + let c = if verbose then c.MinimumLevel.Debug() else c + let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump + let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net + let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information) + c.CreateLogger() + let dumpMetrics () = Equinox.Cosmos.Store.Log.InternalMetrics.dump log module Favorites = - type Item = { sku : string } - type Event = - | Added of Item - | Removed of Item - interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() - type State = string list - let initial : State = [] - let evolve state = function - | Added {sku = sku } -> sku :: state - | Removed {sku = sku } -> state |> List.filter (fun x -> x <> sku) - let fold s xs = Seq.fold evolve s xs + let Category = "Favorites" + let streamName clientId = FsCodec.StreamName.create Category clientId + + module Events = + + type Item = { sku : string } + type Event = + | Added of Item + | Removed of Item + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() // Coming soon, replace Newtonsoft with SystemTextJson and works same + + module Fold = + + type State = string list + let initial : State = [] + let evolve state = function + | Events.Added {sku = sku } -> sku :: state + | Events.Removed {sku = sku } -> state |> List.filter (fun x -> x <> sku) + let fold s xs = Seq.fold evolve s xs type Command = | Add of string | Remove of string let interpret command state = match command with - | Add sku -> if state |> List.contains sku then [] else [Added {sku = sku}] - | Remove sku -> if state |> List.contains sku then [Removed {sku = sku}] else [] + | Add sku -> if state |> List.contains sku then [] else [ Events.Added {sku = sku}] + | Remove sku -> if state |> List.contains sku then [ Events.Removed {sku = sku}] else [] - type Service internal (resolve : string -> Equinox.Stream) = + type Service internal (resolve : string -> Equinox.Stream) = - let execute clientId command : Async = + member __.Favorite(clientId, sku) = let stream = resolve clientId - stream.Transact(interpret command) - - member __.Favorite(clientId, sku) = execute clientId (Add sku) - member __.Unfavorite(clientId, skus) = execute clientId (Remove skus) + stream.Transact(interpret (Add sku)) + member __.Unfavorite(clientId, skus) = + let stream = resolve clientId + stream.Transact(interpret (Remove skus)) member __.List clientId: Async = let stream = resolve clientId stream.Query id -module Log = - open Serilog - open Serilog.Events - let verbose = true // false will remove lots of noise - let log = - let c = LoggerConfiguration() - let c = if verbose then c.MinimumLevel.Debug() else c - let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump - let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net - let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information) - c.CreateLogger() - let dumpMetrics () = Equinox.Cosmos.Store.Log.InternalMetrics.dump log + let create resolve = + let resolve clientId = Equinox.Stream(Log.log, resolve (streamName clientId), maxAttempts = 3) + Service(resolve) + + module Cosmos = + + open Equinox.Cosmos // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store + let accessStrategy = AccessStrategy.Unoptimized // Or Snapshot etc https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#access-strategies + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve let [] appName = "equinox-tutorial" -let cache = Equinox.Cache(appName, 20) -open Equinox.Cosmos -open System module Store = let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get - let connector = Connector(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., log=Log.log) - let conn = connector.Connect(appName, Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously - let gateway = Gateway(conn, BatchingPolicy()) + let connector = Equinox.Cosmos.Connector(System.TimeSpan.FromSeconds 5., 2, System.TimeSpan.FromSeconds 5., log=Log.log) + let conn = connector.Connect(appName, Equinox.Cosmos.Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously + let createContext () = Equinox.Cosmos.Context(conn, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") - let context = Context(gateway, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER") - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - -module FavoritesCategory = - let resolver = Resolver(Store.context, Favorites.codec, Favorites.fold, Favorites.initial, Store.cacheStrategy, AccessStrategy.Unoptimized) - let resolve clientId = - let streamName = streamName clientId - Equinox.Stream(Log.log, resolver.Resolve streamName, maxAttempts = 3) - -let service = - Favorites.Service(FavoritesCategory.resolve) +let context = Store.createContext () +let cache = Equinox.Cache(appName, 20) +let service = Favorites.Cosmos.create (context, cache) let client = "ClientJ" + service.Favorite(client, "a") |> Async.RunSynchronously service.Favorite(client, "b") |> Async.RunSynchronously service.List(client) |> Async.RunSynchronously @@ -102,4 +111,28 @@ service.List(client) |> Async.RunSynchronously service.Unfavorite(client, "b") |> Async.RunSynchronously service.List(client) |> Async.RunSynchronously -Log.dumpMetrics () \ No newline at end of file +Log.dumpMetrics () + +(* EXAMPLE OUTPUT + +[13:48:33 INF] EqxCosmos Response 5/5 Backward 189ms i=0 rc=3.43 +[13:48:33 INF] EqxCosmos QueryB Favorites-ClientJ v5 5/1 190ms rc=3.43 +[13:48:33 DBG] No events generated +[13:48:33 INF] EqxCosmos Tip 200 90ms rc=1 +[13:48:33 INF] EqxCosmos Response 0/0 Forward 179ms i=null rc=3.62 +[13:48:33 INF] EqxCosmos QueryF Favorites-ClientJ v0 0/1 179ms rc=3.62 +[13:48:33 INF] EqxCosmos Sync: Conflict writing ["Added"] +[13:48:33 INF] EqxCosmos Sync 1+0 90ms rc=5.4 +[13:48:33 INF] EqxCosmos Tip 200 86ms rc=1 +[13:48:33 INF] EqxCosmos Response 5/5 Forward 184ms i=0 rc=4.37 +[13:48:33 INF] EqxCosmos QueryF Favorites-ClientJ v5 5/1 185ms rc=4.37 +[13:48:33 DBG] Resyncing and retrying +[13:48:33 INF] EqxCosmos Sync 1+0 96ms rc=37.67 +[13:48:34 INF] EqxCosmos Tip 302 90ms rc=1 +[13:48:34 INF] EqxCosmos Tip 302 92ms rc=1 +[13:48:34 INF] EqxCosmos Sync 1+0 96ms rc=37.33 +[13:48:34 INF] EqxCosmos Tip 302 87ms rc=1 +[13:48:34 INF] Read: 8 requests costing 16 RU (average: 2.05); Average latency: 125ms +[13:48:34 INF] Write: 3 requests costing 80 RU (average: 26.80); Average latency: 94ms +[13:48:34 INF] TOTAL: 11 requests costing 97 RU (average: 8.80); Average latency: 116ms +[13:48:34 INF] rps 2 = ~21 RU *)