diff --git a/src/CosmoStore.CosmosDb/EventStore.fs b/src/CosmoStore.CosmosDb/EventStore.fs index 698bf09..7ea5e0a 100644 --- a/src/CosmoStore.CosmosDb/EventStore.fs +++ b/src/CosmoStore.CosmosDb/EventStore.fs @@ -5,7 +5,6 @@ open Microsoft.Azure.Documents open Microsoft.Azure.Documents.Client open FSharp.Control.Tasks.V2 open CosmoStore -open CosmoStore.Helper open System.Reflection open System.IO open CosmoStore.CosmosDb @@ -77,7 +76,7 @@ let private appendEvents getOpts (client:DocumentClient) (storedProcUri:Uri) (st return resp.Response |> List.map toPositionAndDate |> List.zip events - |> List.map (fun (evn,(pos,created)) -> eventWriteToEventRead streamId pos created evn) + |> List.map (fun (evn,(pos,created)) -> Conversion.eventWriteToEventRead streamId pos created evn) } let private streamsReadToQuery = function diff --git a/src/CosmoStore.InMemory/EventStore.fs b/src/CosmoStore.InMemory/EventStore.fs index f232177..72b1629 100644 --- a/src/CosmoStore.InMemory/EventStore.fs +++ b/src/CosmoStore.InMemory/EventStore.fs @@ -1,7 +1,6 @@ namespace CosmoStore.InMemory open System open CosmoStore -open CosmoStore.Helper open FSharp.Control.Tasks.V2 open System.Reactive.Linq open System.Reactive.Concurrency @@ -31,11 +30,11 @@ module EventStore = let nextPos = lastPosition + 1L - do validatePosition message.StreamId nextPos message.ExpectedPosition + do Validation.validatePosition message.StreamId nextPos message.ExpectedPosition let ops = message.EventWrites - |> List.mapi (fun i evn -> evn |> eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow) + |> List.mapi (fun i evn -> evn |> Conversion.eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow) let updatedStream = match metadataEntity with diff --git a/src/CosmoStore.Marten/EventStore.fs b/src/CosmoStore.Marten/EventStore.fs index 60baa68..9128bec 100644 --- a/src/CosmoStore.Marten/EventStore.fs +++ b/src/CosmoStore.Marten/EventStore.fs @@ -8,7 +8,6 @@ module EventStore = open Marten open System.Linq open CosmoStore - open CosmoStore.Helper open FSharp.Control.Tasks.V2.ContextInsensitive open System.Reactive.Linq open System.Reactive.Concurrency @@ -38,11 +37,11 @@ module EventStore = let nextPos = lastPosition + 1L - do validatePosition message.StreamId nextPos message.ExpectedPosition + do Validation.validatePosition message.StreamId nextPos message.ExpectedPosition let ops = message.EventWrites - |> List.mapi (fun i evn -> evn |> eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow) + |> List.mapi (fun i evn -> evn |> Conversion.eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow) let _ = match metadataEntity with diff --git a/src/CosmoStore.TableStorage/EventStore.fs b/src/CosmoStore.TableStorage/EventStore.fs index d9c1996..96f71c0 100644 --- a/src/CosmoStore.TableStorage/EventStore.fs +++ b/src/CosmoStore.TableStorage/EventStore.fs @@ -4,7 +4,6 @@ open System open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Table open CosmoStore -open CosmoStore.Helper open FSharp.Control.Tasks.V2 open CosmoStore.TableStorage open System.Reactive.Linq @@ -36,7 +35,7 @@ let private appendEvents (table:CloudTable) (streamId:string) (expectedPosition: } let nextPos = lastPosition + 1L - do validatePosition streamId nextPos expectedPosition + do Validation.validatePosition streamId nextPos expectedPosition let batchOperation = TableBatchOperation() diff --git a/src/CosmoStore/Conversion.fs b/src/CosmoStore/Conversion.fs new file mode 100644 index 0000000..464451c --- /dev/null +++ b/src/CosmoStore/Conversion.fs @@ -0,0 +1,13 @@ +module CosmoStore.Conversion + +let eventWriteToEventRead streamId position createdUtc (x:EventWrite) = { + Id = x.Id + CorrelationId = x.CorrelationId + CausationId = x.CausationId + StreamId = streamId + Position = position + Name = x.Name + Data = x.Data + Metadata = x.Metadata + CreatedUtc = createdUtc +} \ No newline at end of file diff --git a/src/CosmoStore/CosmoStore.fs b/src/CosmoStore/CosmoStore.fs index 82819f0..bca3bba 100644 --- a/src/CosmoStore/CosmoStore.fs +++ b/src/CosmoStore/CosmoStore.fs @@ -57,29 +57,4 @@ type EventStore = { GetStreams : StreamsReadFilter -> Task GetStream : string -> Task EventAppended : IObservable -} - -module Helper = - let validatePosition streamId (nextPos: int64) = function - | ExpectedPosition.Any -> () - | ExpectedPosition.NoStream -> - if nextPos > 1L then - failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) - | ExpectedPosition.Exact expectedPos -> - if nextPos <> expectedPos then - failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos - - - let checkNull a = obj.ReferenceEquals(a, null) - - let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = { - Id = x.Id - CorrelationId = x.CorrelationId - CausationId = x.CausationId - StreamId = streamId - Position = position - Name = x.Name - Data = x.Data - Metadata = x.Metadata - CreatedUtc = createdUtc - } \ No newline at end of file +} \ No newline at end of file diff --git a/src/CosmoStore/CosmoStore.fsproj b/src/CosmoStore/CosmoStore.fsproj index ca8cce9..8003df2 100644 --- a/src/CosmoStore/CosmoStore.fsproj +++ b/src/CosmoStore/CosmoStore.fsproj @@ -6,6 +6,8 @@ + + \ No newline at end of file diff --git a/src/CosmoStore/Validation.fs b/src/CosmoStore/Validation.fs new file mode 100644 index 0000000..55f251a --- /dev/null +++ b/src/CosmoStore/Validation.fs @@ -0,0 +1,10 @@ +module CosmoStore.Validation + +let validatePosition streamId (nextPos:int64) = function + | ExpectedPosition.Any -> () + | ExpectedPosition.NoStream -> + if nextPos > 1L then + failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) + | ExpectedPosition.Exact expectedPos -> + if nextPos <> expectedPos then + failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos diff --git a/tests/CosmoStore.CosmosDb.Tests/Program.fs b/tests/CosmoStore.CosmosDb.Tests/Program.fs index 70eda13..8135459 100644 --- a/tests/CosmoStore.CosmosDb.Tests/Program.fs +++ b/tests/CosmoStore.CosmosDb.Tests/Program.fs @@ -11,7 +11,7 @@ let private config = CosmoStore.CosmosDb.Configuration.CreateDefault (Uri "https://localhost:8081") "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==" - |> fun cfg -> { cfg with DatabaseName = "CosmosStoreTests"; Throughput = 10000 } + |> fun cfg -> { cfg with DatabaseName = "CosmoStoreTests"; Throughput = 10000 } let private getCleanEventStore() = let client = new DocumentClient(config.ServiceEndpoint, config.AuthKey) diff --git a/tests/CosmoStore.TableStorage.Tests/Program.fs b/tests/CosmoStore.TableStorage.Tests/Program.fs index 2c09ffd..f5f7266 100644 --- a/tests/CosmoStore.TableStorage.Tests/Program.fs +++ b/tests/CosmoStore.TableStorage.Tests/Program.fs @@ -7,7 +7,7 @@ open CosmoStore.Tests open CosmoStore.TableStorage open Microsoft.WindowsAzure.Storage -let private tableName = "CosmosStoreTests" +let private tableName = "CosmoStoreTests" let private conf = Configuration.CreateDefaultForLocalEmulator() |> fun cfg -> { cfg with TableName = tableName } diff --git a/tests/CosmoStore.Tests/BasicTests.fs b/tests/CosmoStore.Tests/BasicTests.fs index aa8a327..4126b10 100644 --- a/tests/CosmoStore.Tests/BasicTests.fs +++ b/tests/CosmoStore.Tests/BasicTests.fs @@ -12,23 +12,23 @@ let private withCorrelationId i (e:EventWrite) = { e with CorrelationId = Some i let eventsTests (cfg:TestConfiguration) = testList "Events" [ - testTask "Append events parallel" { - let streamId = cfg.GetStreamId() + // testTask "Append events parallel" { + // let streamId = cfg.GetStreamId() - let storeEvent = async { - return! - [1..10] - |> List.map cfg.GetEvent - |> cfg.Store.AppendEvents streamId Any - |> Async.AwaitTask - } + // let storeEvent = async { + // return! + // [1..10] + // |> List.map cfg.GetEvent + // |> cfg.Store.AppendEvents streamId Any + // |> Async.AwaitTask + // } - [1..10] - |> List.map (fun _ -> storeEvent) - |> Async.Parallel - |> Async.RunSynchronously - |> ignore - } + // [1..10] + // |> List.map (fun _ -> storeEvent) + // |> Async.Parallel + // |> Async.RunSynchronously + // |> ignore + // } // testTask "Store same event twice" { // let streamId = cfg.GetStreamId()