Skip to content

Commit

Permalink
Code refactored to reuse the functions (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunjee17 authored and Dzoukr committed Apr 12, 2019
1 parent cc4dd75 commit 2e0ef27
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 70 deletions.
12 changes: 0 additions & 12 deletions src/CosmoStore.CosmosDb/Conversion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,6 @@ open CosmoStore
open Microsoft.Azure.Documents
open Newtonsoft.Json.Linq

let eventWriteToEventRead streamId position createdUtc (x:EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}

let documentToStream (x:Document) = {
Id = x.GetPropertyValue<string>("streamId")
LastUpdatedUtc = x.GetPropertyValue<DateTime>("lastUpdatedUtc")
Expand Down
3 changes: 2 additions & 1 deletion src/CosmoStore.CosmosDb/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ open Microsoft.Azure.Documents
open Microsoft.Azure.Documents.Client
open FSharp.Control.Tasks.V2
open CosmoStore
open CosmoStore.Helper
open System.Reflection
open System.IO
open CosmoStore.CosmosDb
Expand Down Expand Up @@ -76,7 +77,7 @@ let private appendEvents getOpts (client:DocumentClient) (storedProcUri:Uri) (st
return resp.Response
|> List.map toPositionAndDate
|> List.zip events
|> List.map (fun (evn,(pos,created)) -> Conversion.eventWriteToEventRead streamId pos created evn)
|> List.map (fun (evn,(pos,created)) -> eventWriteToEventRead streamId pos created evn)
}

let private streamsReadToQuery = function
Expand Down
24 changes: 1 addition & 23 deletions src/CosmoStore.InMemory/EventStore.fs
Original file line number Diff line number Diff line change
@@ -1,35 +1,13 @@
namespace CosmoStore.InMemory
open System
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2
open System.Reactive.Linq
open System.Reactive.Concurrency


module EventStore =
let private validatePosition streamId (nextPos: int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos


let checkNull a = obj.ReferenceEquals(a, null)

let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}

type StreamData = {
StreamStore: StreamStoreType
Expand Down
26 changes: 1 addition & 25 deletions src/CosmoStore.Marten/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,11 @@ module EventStore =
open Marten
open System.Linq
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2.ContextInsensitive
open System.Reactive.Linq
open System.Reactive.Concurrency


let private validatePosition streamId (nextPos: int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos


let checkNull a = obj.ReferenceEquals(a, null)

let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}

type StreamData = {
Store: IDocumentStore
StreamId: string
Expand Down
9 changes: 1 addition & 8 deletions src/CosmoStore.TableStorage/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open System
open Microsoft.WindowsAzure.Storage
open Microsoft.WindowsAzure.Storage.Table
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2
open CosmoStore.TableStorage
open System.Reactive.Linq
Expand All @@ -20,14 +21,6 @@ let private tryGetStreamMetadata (table:CloudTable) (streamId:string) =
return (entity, entity |> Conversion.entityToStream) |> Some
}

let private validatePosition streamId (nextPos:int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos

let private appendEvents (table:CloudTable) (streamId:string) (expectedPosition:ExpectedPosition) (events:EventWrite list) =

Expand Down
27 changes: 26 additions & 1 deletion src/CosmoStore/CosmoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,29 @@ type EventStore = {
GetStreams : StreamsReadFilter -> Task<Stream list>
GetStream : string -> Task<Stream>
EventAppended : IObservable<EventRead>
}
}

module Helper =
let validatePosition streamId (nextPos: int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos


let checkNull a = obj.ReferenceEquals(a, null)

let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}

0 comments on commit 2e0ef27

Please sign in to comment.