From 2e0ef27318ffe7e9d226be8237a946174bd8e1f4 Mon Sep 17 00:00:00 2001 From: Kunjan Dalal Date: Fri, 12 Apr 2019 13:07:54 +0100 Subject: [PATCH] Code refactored to reuse the functions (#21) --- src/CosmoStore.CosmosDb/Conversion.fs | 12 ---------- src/CosmoStore.CosmosDb/EventStore.fs | 3 ++- src/CosmoStore.InMemory/EventStore.fs | 24 +------------------- src/CosmoStore.Marten/EventStore.fs | 26 +--------------------- src/CosmoStore.TableStorage/EventStore.fs | 9 +------- src/CosmoStore/CosmoStore.fs | 27 ++++++++++++++++++++++- 6 files changed, 31 insertions(+), 70 deletions(-) diff --git a/src/CosmoStore.CosmosDb/Conversion.fs b/src/CosmoStore.CosmosDb/Conversion.fs index c6b0ed0..c76f752 100644 --- a/src/CosmoStore.CosmosDb/Conversion.fs +++ b/src/CosmoStore.CosmosDb/Conversion.fs @@ -5,18 +5,6 @@ open CosmoStore open Microsoft.Azure.Documents open Newtonsoft.Json.Linq -let eventWriteToEventRead streamId position createdUtc (x:EventWrite) = { - Id = x.Id - CorrelationId = x.CorrelationId - CausationId = x.CausationId - StreamId = streamId - Position = position - Name = x.Name - Data = x.Data - Metadata = x.Metadata - CreatedUtc = createdUtc -} - let documentToStream (x:Document) = { Id = x.GetPropertyValue("streamId") LastUpdatedUtc = x.GetPropertyValue("lastUpdatedUtc") diff --git a/src/CosmoStore.CosmosDb/EventStore.fs b/src/CosmoStore.CosmosDb/EventStore.fs index 7ea5e0a..698bf09 100644 --- a/src/CosmoStore.CosmosDb/EventStore.fs +++ b/src/CosmoStore.CosmosDb/EventStore.fs @@ -5,6 +5,7 @@ open Microsoft.Azure.Documents open Microsoft.Azure.Documents.Client open FSharp.Control.Tasks.V2 open CosmoStore +open CosmoStore.Helper open System.Reflection open System.IO open CosmoStore.CosmosDb @@ -76,7 +77,7 @@ let private appendEvents getOpts (client:DocumentClient) (storedProcUri:Uri) (st return resp.Response |> List.map toPositionAndDate |> List.zip events - |> List.map (fun (evn,(pos,created)) -> Conversion.eventWriteToEventRead streamId pos created evn) + |> List.map (fun (evn,(pos,created)) -> eventWriteToEventRead streamId pos created evn) } let private streamsReadToQuery = function diff --git a/src/CosmoStore.InMemory/EventStore.fs b/src/CosmoStore.InMemory/EventStore.fs index dc63268..f232177 100644 --- a/src/CosmoStore.InMemory/EventStore.fs +++ b/src/CosmoStore.InMemory/EventStore.fs @@ -1,35 +1,13 @@ namespace CosmoStore.InMemory open System open CosmoStore +open CosmoStore.Helper open FSharp.Control.Tasks.V2 open System.Reactive.Linq open System.Reactive.Concurrency module EventStore = - let private validatePosition streamId (nextPos: int64) = function - | ExpectedPosition.Any -> () - | ExpectedPosition.NoStream -> - if nextPos > 1L then - failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) - | ExpectedPosition.Exact expectedPos -> - if nextPos <> expectedPos then - failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos - - - let checkNull a = obj.ReferenceEquals(a, null) - - let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = { - Id = x.Id - CorrelationId = x.CorrelationId - CausationId = x.CausationId - StreamId = streamId - Position = position - Name = x.Name - Data = x.Data - Metadata = x.Metadata - CreatedUtc = createdUtc - } type StreamData = { StreamStore: StreamStoreType diff --git a/src/CosmoStore.Marten/EventStore.fs b/src/CosmoStore.Marten/EventStore.fs index 9700e97..60baa68 100644 --- a/src/CosmoStore.Marten/EventStore.fs +++ b/src/CosmoStore.Marten/EventStore.fs @@ -8,35 +8,11 @@ module EventStore = open Marten open System.Linq open CosmoStore + open CosmoStore.Helper open FSharp.Control.Tasks.V2.ContextInsensitive open System.Reactive.Linq open System.Reactive.Concurrency - - let private validatePosition streamId (nextPos: int64) = function - | ExpectedPosition.Any -> () - | ExpectedPosition.NoStream -> - if nextPos > 1L then - failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) - | ExpectedPosition.Exact expectedPos -> - if nextPos <> expectedPos then - failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos - - - let checkNull a = obj.ReferenceEquals(a, null) - - let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = { - Id = x.Id - CorrelationId = x.CorrelationId - CausationId = x.CausationId - StreamId = streamId - Position = position - Name = x.Name - Data = x.Data - Metadata = x.Metadata - CreatedUtc = createdUtc - } - type StreamData = { Store: IDocumentStore StreamId: string diff --git a/src/CosmoStore.TableStorage/EventStore.fs b/src/CosmoStore.TableStorage/EventStore.fs index 40a137c..d9c1996 100644 --- a/src/CosmoStore.TableStorage/EventStore.fs +++ b/src/CosmoStore.TableStorage/EventStore.fs @@ -4,6 +4,7 @@ open System open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Table open CosmoStore +open CosmoStore.Helper open FSharp.Control.Tasks.V2 open CosmoStore.TableStorage open System.Reactive.Linq @@ -20,14 +21,6 @@ let private tryGetStreamMetadata (table:CloudTable) (streamId:string) = return (entity, entity |> Conversion.entityToStream) |> Some } -let private validatePosition streamId (nextPos:int64) = function - | ExpectedPosition.Any -> () - | ExpectedPosition.NoStream -> - if nextPos > 1L then - failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) - | ExpectedPosition.Exact expectedPos -> - if nextPos <> expectedPos then - failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos let private appendEvents (table:CloudTable) (streamId:string) (expectedPosition:ExpectedPosition) (events:EventWrite list) = diff --git a/src/CosmoStore/CosmoStore.fs b/src/CosmoStore/CosmoStore.fs index bca3bba..82819f0 100644 --- a/src/CosmoStore/CosmoStore.fs +++ b/src/CosmoStore/CosmoStore.fs @@ -57,4 +57,29 @@ type EventStore = { GetStreams : StreamsReadFilter -> Task GetStream : string -> Task EventAppended : IObservable -} \ No newline at end of file +} + +module Helper = + let validatePosition streamId (nextPos: int64) = function + | ExpectedPosition.Any -> () + | ExpectedPosition.NoStream -> + if nextPos > 1L then + failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L) + | ExpectedPosition.Exact expectedPos -> + if nextPos <> expectedPos then + failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos + + + let checkNull a = obj.ReferenceEquals(a, null) + + let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = { + Id = x.Id + CorrelationId = x.CorrelationId + CausationId = x.CausationId + StreamId = streamId + Position = position + Name = x.Name + Data = x.Data + Metadata = x.Metadata + CreatedUtc = createdUtc + } \ No newline at end of file