Skip to content

Commit

Permalink
Add DynamoDB (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored May 13, 2022
1 parent fe4abb4 commit a98aa4c
Show file tree
Hide file tree
Showing 26 changed files with 2,156 additions and 64 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

.DS_Store

# DynamoDb local simulator data directory
docker-dynamodblocal-data/

# ephemeral EventStoreDB certs created via `docker compose up`
certs/

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoDb`/`DynamoDb.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196)

### Changed
Expand Down
18 changes: 18 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb", "src
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb.Integration", "tests\Equinox.EventStoreDb.Integration\Equinox.EventStoreDb.Integration.fsproj", "{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore", "src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj", "{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Integration", "tests\Equinox.DynamoStore.Integration\Equinox.DynamoStore.Integration.fsproj", "{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore.Prometheus", "src\Equinox.DynamoStore.Prometheus\Equinox.DynamoStore.Prometheus.fsproj", "{A9AF41B3-AB28-4296-B4A4-B90DA7821476}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -209,6 +215,18 @@ Global
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.Build.0 = Release|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E04E86B4-4E35-4AC9-8D8F-B01297484FC1}.Release|Any CPU.Build.0 = Release|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2C8FCD63-4A3C-4EA6-88E0-E0F287B0F47A}.Release|Any CPU.Build.0 = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9AF41B3-AB28-4296-B4A4-B90DA7821476}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
76 changes: 70 additions & 6 deletions README.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
<Exec Command="dotnet pack src/Equinox.Core $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.CosmosStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.CosmosStore.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.DynamoStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.DynamoStore.Prometheus $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.EventStore $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.EventStoreDb $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Equinox.MemoryStore $(Cfg) $(PackOptions)" />
Expand Down
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,29 @@ services:
clusternetwork:
ipv4_address: 172.30.240.13

dynamodb-local:
image: amazon/dynamodb-local
container_name: dynamodb-local
hostname: dynamodb-local
restart: always
volumes:
- ./docker-dynamodblocal-data:/home/dynamodblocal/data
ports:
- 8000:8000
command: "-jar DynamoDBLocal.jar -sharedDb -dbPath /home/dynamodblocal/data/"

dynamodb-admin:
image: aaronshaf/dynamodb-admin
ports:
- "8001:8001"
environment:
DYNAMO_ENDPOINT: "http://dynamodb-local:8000"
AWS_REGION: "us-west-2"
AWS_ACCESS_KEY_ID: local
AWS_SECRET_ACCESS_KEY: local
depends_on:
- dynamodb-local

networks:
clusternetwork:
name: eventstoredb.local
Expand Down
3 changes: 2 additions & 1 deletion samples/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<ProjectReference Include="..\..\src\Equinox.SqlStreamStore.MySql\Equinox.SqlStreamStore.MySql.fsproj" />
<ProjectReference Include="..\..\src\Equinox.SqlStreamStore.Postgres\Equinox.SqlStreamStore.Postgres.fsproj" />
<ProjectReference Include="..\..\src\Equinox.CosmosStore\Equinox.CosmosStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.EventStoreDb\Equinox.EventStoreDb.fsproj" />
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" />
<ProjectReference Include="..\Store\Domain\Domain.fsproj" />
Expand All @@ -25,7 +26,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="FSharp.Core" Version="4.5.4" />
<PackageReference Include="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.1.1" />
Expand Down
3 changes: 3 additions & 0 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Cosmos (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Dynamo (store, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.DynamoStore.AccessStrategy.Snapshot snapshot else Equinox.DynamoStore.AccessStrategy.Unoptimized
Equinox.DynamoStore.DynamoStoreCategory<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Expand Down
64 changes: 64 additions & 0 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type StorageConfig =
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore<ReadOnlyMemory<byte>>
| Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool
| Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.DynamoStore.CachingStrategy * unfolds: bool
| Es of Equinox.EventStoreDb.EventStoreContext * Equinox.EventStoreDb.CachingStrategy option * unfolds: bool
| Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool

Expand Down Expand Up @@ -125,6 +126,69 @@ module Cosmos =
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Cosmos (context, cacheStrategy, unfolds)

module Dynamo =

open Equinox.DynamoStore
let [<Literal>] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL"
let [<Literal>] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID"
let [<Literal>] SECRET_KEY = "EQUINOX_DYNAMO_SECRET_ACCESS_KEY"
let [<Literal>] TABLE = "EQUINOX_DYNAMO_TABLE"
type [<NoEquality; NoComparison>] Arguments =
| [<AltCommandLine "-V">] VerboseStore
| [<AltCommandLine "-s">] ServiceUrl of string
| [<AltCommandLine "-sa">] AccessKey of string
| [<AltCommandLine "-ss">] SecretKey of string
| [<AltCommandLine "-t">] Table of string
| [<AltCommandLine "-ta">] ArchiveTable of string
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesTimeoutS of float
| [<AltCommandLine "-tb">] TipMaxBytes of int
| [<AltCommandLine "-te">] TipMaxEvents of int
| [<AltCommandLine "-b">] QueryMaxItems of int
interface IArgParserTemplate with
member a.Usage = a |> function
| VerboseStore -> "Include low level Store logging."
| ServiceUrl _ -> "specify a server endpoint for a Dynamo account. (optional if environment variable " + SERVICE_URL + " specified)"
| AccessKey _ -> "specify an access key id for a Dynamo account. (optional if environment variable " + ACCESS_KEY + " specified)"
| SecretKey _ -> "specify a secret access key for a Dynamo account. (optional if environment variable " + SECRET_KEY + " specified)"
| Table _ -> "specify a table name for the primary store. (optional if environment variable " + TABLE + " specified)"
| ArchiveTable _ -> "specify a table name for the Archive. Default: Do not attempt to look in an Archive store as a Fallback to locate pruned events."
| Retries _ -> "specify operation retries (default: 1)."
| RetriesTimeoutS _ -> "specify max wait-time including retries in seconds (default: 5)"
| TipMaxBytes _ -> "specify maximum number of bytes to hold in Tip before calving off to a frozen Batch. Default: 32K"
| TipMaxEvents _ -> "specify maximum number of events to hold in Tip before calving off to a frozen Batch. Default: limited by Max Bytes"
| QueryMaxItems _ -> "specify maximum number of batches of events to retrieve in per query response. Default: 10"
type Info(args : ParseResults<Arguments>) =
let serviceUrl = args.TryGetResult ServiceUrl |> defaultWithEnvVar SERVICE_URL "ServiceUrl"
let accessKey = args.TryGetResult AccessKey |> defaultWithEnvVar ACCESS_KEY "AccessKey"
let secretKey = args.TryGetResult SecretKey |> defaultWithEnvVar SECRET_KEY "SecretKey"
let retries = args.GetResult(Retries, 1)
let timeout = args.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds
member val Connector = DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout)

member val Table = args.TryGetResult Table |> defaultWithEnvVar TABLE "Table"
member val ArchiveTable = args.TryGetResult ArchiveTable

member x.TipMaxEvents = args.TryGetResult TipMaxEvents
member x.TipMaxBytes = args.GetResult(TipMaxBytes, 32 * 1024)
member x.QueryMaxItems = args.GetResult(QueryMaxItems, 10)

let logTable (log: ILogger) endpoint role table =
log.Information("DynamoDB {name:l} {endpoint} Table {table}", role, endpoint, table)
let createStoreClient (log : ILogger) (a : Info) =
let client = a.Connector.CreateClient()
let storeClient = DynamoStoreClient(client, a.Table, ?archiveTableName = a.ArchiveTable)
logTable log a.Connector.Endpoint "Primary" a.Table
match a.ArchiveTable with None -> () | Some at -> logTable log a.Connector.Endpoint "Archive" at
storeClient
let config (log : ILogger) (cache, unfolds) (a : Info) =
let storeClient = createStoreClient log a
log.Information("DynamoStore Max Events in Tip: {maxTipBytes}b {maxTipEvents}e Query Limit: {queryMaxItems} items",
a.TipMaxBytes, a.TipMaxEvents, a.QueryMaxItems)
let context = DynamoStoreContext(storeClient, maxBytes = a.TipMaxBytes, queryMaxItems = a.QueryMaxItems, ?tipMaxEvents = a.TipMaxEvents)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Dynamo (context, cacheStrategy, unfolds)

/// To establish a local node to run the tests against, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose
/// and/or do `docker compose up` in github.com/jet/equinox
module EventStore =
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// - the same general point applies to over-using querying of streams for read purposes as we do here;
// applying CQRS principles can often lead to a better model regardless of raw necessity

#if !LOCAL
#if LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#if VISUALSTUDIO
Expand Down
2 changes: 1 addition & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if !LOCAL
#if LOCAL
#I "bin/Debug/net6.0/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
Expand Down
6 changes: 6 additions & 0 deletions samples/Web/Startup.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Arguments =
| [<AltCommandLine "-U">] Unfolds
| [<CliPrefix(CliPrefix.None); Last>] Memory of ParseResults<Storage.MemoryStore.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<Storage.Cosmos.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Dynamo of ParseResults<Storage.Dynamo.Arguments>
| [<CliPrefix(CliPrefix.None); Last>] Es of ParseResults<Storage.EventStore.Arguments>
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "ms">] MsSql of ParseResults<Storage.Sql.Ms.Arguments>
| [<CliPrefix(CliPrefix.None); Last; AltCommandLine "my">] MySql of ParseResults<Storage.Sql.My.Arguments>
Expand All @@ -27,6 +28,7 @@ type Arguments =
| Cached -> "employ a 50MB cache."
| Unfolds -> "employ a store-appropriate Rolling Snapshots and/or Unfolding strategy."
| Cosmos _ -> "specify storage in CosmosDB (--help for options)."
| Dynamo _ -> "specify storage in DynamoDB (--help for options)."
| Es _ -> "specify storage in EventStore (--help for options)."
| Memory _ -> "specify In-Memory Volatile Store (Default store)."
| MsSql _ -> "specify storage in Sql Server (--help for options)."
Expand Down Expand Up @@ -68,6 +70,10 @@ type Startup() =
let storeLog = createStoreLog <| sargs.Contains Storage.Cosmos.Arguments.VerboseStore
log.Information("CosmosDB Storage options: {options:l}", options)
Storage.Cosmos.config log (cache, unfolds) (Storage.Cosmos.Info sargs), storeLog
| Some (Dynamo sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.Dynamo.Arguments.VerboseStore
log.Information("DynamoDB Storage options: {options:l}", options)
Storage.Dynamo.config log (cache, unfolds) (Storage.Dynamo.Info sargs), storeLog
| Some (Es sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore
log.Information("EventStoreDB Storage options: {options:l}", options)
Expand Down
Loading

0 comments on commit a98aa4c

Please sign in to comment.