Skip to content

Commit

Permalink
Refactored code moved into separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzoukr committed May 14, 2019
1 parent 2e0ef27 commit aa69c77
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 53 deletions.
3 changes: 1 addition & 2 deletions src/CosmoStore.CosmosDb/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ open Microsoft.Azure.Documents
open Microsoft.Azure.Documents.Client
open FSharp.Control.Tasks.V2
open CosmoStore
open CosmoStore.Helper
open System.Reflection
open System.IO
open CosmoStore.CosmosDb
Expand Down Expand Up @@ -77,7 +76,7 @@ let private appendEvents getOpts (client:DocumentClient) (storedProcUri:Uri) (st
return resp.Response
|> List.map toPositionAndDate
|> List.zip events
|> List.map (fun (evn,(pos,created)) -> eventWriteToEventRead streamId pos created evn)
|> List.map (fun (evn,(pos,created)) -> Conversion.eventWriteToEventRead streamId pos created evn)
}

let private streamsReadToQuery = function
Expand Down
5 changes: 2 additions & 3 deletions src/CosmoStore.InMemory/EventStore.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace CosmoStore.InMemory
open System
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2
open System.Reactive.Linq
open System.Reactive.Concurrency
Expand Down Expand Up @@ -31,11 +30,11 @@ module EventStore =

let nextPos = lastPosition + 1L

do validatePosition message.StreamId nextPos message.ExpectedPosition
do Validation.validatePosition message.StreamId nextPos message.ExpectedPosition

let ops =
message.EventWrites
|> List.mapi (fun i evn -> evn |> eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow)
|> List.mapi (fun i evn -> evn |> Conversion.eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow)

let updatedStream =
match metadataEntity with
Expand Down
5 changes: 2 additions & 3 deletions src/CosmoStore.Marten/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ module EventStore =
open Marten
open System.Linq
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2.ContextInsensitive
open System.Reactive.Linq
open System.Reactive.Concurrency
Expand Down Expand Up @@ -38,11 +37,11 @@ module EventStore =

let nextPos = lastPosition + 1L

do validatePosition message.StreamId nextPos message.ExpectedPosition
do Validation.validatePosition message.StreamId nextPos message.ExpectedPosition

let ops =
message.EventWrites
|> List.mapi (fun i evn -> evn |> eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow)
|> List.mapi (fun i evn -> evn |> Conversion.eventWriteToEventRead message.StreamId (nextPos + (int64 i)) DateTime.UtcNow)

let _ =
match metadataEntity with
Expand Down
3 changes: 1 addition & 2 deletions src/CosmoStore.TableStorage/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ open System
open Microsoft.WindowsAzure.Storage
open Microsoft.WindowsAzure.Storage.Table
open CosmoStore
open CosmoStore.Helper
open FSharp.Control.Tasks.V2
open CosmoStore.TableStorage
open System.Reactive.Linq
Expand Down Expand Up @@ -36,7 +35,7 @@ let private appendEvents (table:CloudTable) (streamId:string) (expectedPosition:
}

let nextPos = lastPosition + 1L
do validatePosition streamId nextPos expectedPosition
do Validation.validatePosition streamId nextPos expectedPosition

let batchOperation = TableBatchOperation()

Expand Down
13 changes: 13 additions & 0 deletions src/CosmoStore/Conversion.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module CosmoStore.Conversion

let eventWriteToEventRead streamId position createdUtc (x:EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}
27 changes: 1 addition & 26 deletions src/CosmoStore/CosmoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,4 @@ type EventStore = {
GetStreams : StreamsReadFilter -> Task<Stream list>
GetStream : string -> Task<Stream>
EventAppended : IObservable<EventRead>
}

module Helper =
let validatePosition streamId (nextPos: int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos


let checkNull a = obj.ReferenceEquals(a, null)

let eventWriteToEventRead streamId position createdUtc (x: EventWrite) = {
Id = x.Id
CorrelationId = x.CorrelationId
CausationId = x.CausationId
StreamId = streamId
Position = position
Name = x.Name
Data = x.Data
Metadata = x.Metadata
CreatedUtc = createdUtc
}
}
2 changes: 2 additions & 0 deletions src/CosmoStore/CosmoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="CosmoStore.fs" />
<Compile Include="Conversion.fs" />
<Compile Include="Validation.fs" />
</ItemGroup>
<Import Project="..\..\.paket\Paket.Restore.targets" />
</Project>
10 changes: 10 additions & 0 deletions src/CosmoStore/Validation.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module CosmoStore.Validation

let validatePosition streamId (nextPos:int64) = function
| ExpectedPosition.Any -> ()
| ExpectedPosition.NoStream ->
if nextPos > 1L then
failwithf "ESERROR_POSITION_STREAMEXISTS: Stream '%s' was expected to be empty, but contains %i events" streamId (nextPos - 1L)
| ExpectedPosition.Exact expectedPos ->
if nextPos <> expectedPos then
failwithf "ESERROR_POSITION_POSITIONNOTMATCH: Stream '%s' was expected to have next position %i, but has %i" streamId expectedPos nextPos
2 changes: 1 addition & 1 deletion tests/CosmoStore.CosmosDb.Tests/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let private config =
CosmoStore.CosmosDb.Configuration.CreateDefault
(Uri "https://localhost:8081")
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
|> fun cfg -> { cfg with DatabaseName = "CosmosStoreTests"; Throughput = 10000 }
|> fun cfg -> { cfg with DatabaseName = "CosmoStoreTests"; Throughput = 10000 }

let private getCleanEventStore() =
let client = new DocumentClient(config.ServiceEndpoint, config.AuthKey)
Expand Down
2 changes: 1 addition & 1 deletion tests/CosmoStore.TableStorage.Tests/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ open CosmoStore.Tests
open CosmoStore.TableStorage
open Microsoft.WindowsAzure.Storage

let private tableName = "CosmosStoreTests"
let private tableName = "CosmoStoreTests"

let private conf = Configuration.CreateDefaultForLocalEmulator() |> fun cfg -> { cfg with TableName = tableName }

Expand Down
30 changes: 15 additions & 15 deletions tests/CosmoStore.Tests/BasicTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ let private withCorrelationId i (e:EventWrite) = { e with CorrelationId = Some i
let eventsTests (cfg:TestConfiguration) =
testList "Events" [

testTask "Append events parallel" {
let streamId = cfg.GetStreamId()
// testTask "Append events parallel" {
// let streamId = cfg.GetStreamId()

let storeEvent = async {
return!
[1..10]
|> List.map cfg.GetEvent
|> cfg.Store.AppendEvents streamId Any
|> Async.AwaitTask
}
// let storeEvent = async {
// return!
// [1..10]
// |> List.map cfg.GetEvent
// |> cfg.Store.AppendEvents streamId Any
// |> Async.AwaitTask
// }

[1..10]
|> List.map (fun _ -> storeEvent)
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
}
// [1..10]
// |> List.map (fun _ -> storeEvent)
// |> Async.Parallel
// |> Async.RunSynchronously
// |> ignore
// }

// testTask "Store same event twice" {
// let streamId = cfg.GetStreamId()
Expand Down

0 comments on commit aa69c77

Please sign in to comment.