Skip to content

Commit

Permalink
Configurable collection name #25
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzoukr committed May 14, 2019
1 parent aa69c77 commit e1b689b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 26 deletions.
2 changes: 2 additions & 0 deletions src/CosmoStore.CosmosDb/CosmosDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ module internal Throughput =

type Configuration = {
DatabaseName : string
CollectionName : string
ServiceEndpoint : Uri
AuthKey : string
Throughput: int
}
with
static member CreateDefault serviceEndpoint authKey = {
DatabaseName = "EventStore"
CollectionName = "Events"
ServiceEndpoint = serviceEndpoint
AuthKey = authKey
Throughput = Throughput.min
Expand Down
46 changes: 23 additions & 23 deletions src/CosmoStore.CosmosDb/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ open CosmoStore.CosmosDb
open System.Reactive.Linq
open System.Reactive.Concurrency

let private collectionName = "Events"
let private partitionKey = "streamId"
let private appendEventProcName = "AppendEvents"

Expand All @@ -22,8 +21,8 @@ let private createDatabase dbName (client:DocumentClient) =
return ()
}

let private createCollection (dbUri:Uri) (throughput:int) (client:DocumentClient) =
let collection = DocumentCollection( Id = collectionName)
let private createCollection (dbUri:Uri) (conf:Configuration) (client:DocumentClient) =
let collection = DocumentCollection( Id = conf.CollectionName)

// always use partition key
collection.PartitionKey.Paths.Add(sprintf "/%s" partitionKey)
Expand All @@ -37,8 +36,8 @@ let private createCollection (dbUri:Uri) (throughput:int) (client:DocumentClient
collection.UniqueKeyPolicy <- UniqueKeyPolicy(UniqueKeys = keys)

// throughput
let throughput = throughput |> Throughput.correct
let ro = new RequestOptions()
let throughput = conf.Throughput |> Throughput.correct
let ro = RequestOptions()
ro.OfferThroughput <- Nullable<int>(throughput)

task {
Expand All @@ -54,13 +53,14 @@ let private getStoredProcedure name =
return! reader.ReadToEndAsync()
}

let private createStoreProcedures (collUri:Uri) (procUri:Uri) (client:DocumentClient) =
let private createStoreProcedures (collUri:Uri) (procUri:Uri) collectionName (client:DocumentClient) =
task {
try
let! _ = client.DeleteStoredProcedureAsync(procUri)
()
with _ -> ()
let! storedProc = getStoredProcedure "AppendEvents.js"
let! storedProcTemplate = getStoredProcedure "AppendEvents.js"
let storedProc = storedProcTemplate.Replace("%%COLLECTION_NAME%%", collectionName)
let! _ = client.CreateStoredProcedureAsync(collUri, StoredProcedure(Id = appendEventProcName, Body = storedProc))
return ()
}
Expand Down Expand Up @@ -90,12 +90,12 @@ let private createQuery q (pars:(string * obj) list) =
SqlQuerySpec(q, ps)

let private runQuery<'a> (client:DocumentClient) (collectionUri:Uri) (q:SqlQuerySpec) =
let opts = new FeedOptions()
let opts = FeedOptions()
opts.EnableCrossPartitionQuery <- true
opts.EnableScanInQuery <- Nullable<bool>(true)
client.CreateDocumentQuery<'a>(collectionUri, q, opts)

let private getStreams (client:DocumentClient) (collectionUri:Uri) (streamsRead:StreamsReadFilter) =
let private getStreams (client:DocumentClient) (collectionUri:Uri) collectionName (streamsRead:StreamsReadFilter) =
task {
let queryAdd,param = streamsRead |> streamsReadToQuery
return createQuery
Expand All @@ -112,7 +112,7 @@ let private streamEventsReadToQuery = function
| ToPosition pos -> sprintf "AND e.position <= %i" pos
| PositionRange(st,en) -> sprintf "AND e.position >= %i AND e.position <= %i" st en

let private getEvents (client:DocumentClient) (collectionUri:Uri) streamId (eventsRead:EventsReadRange) =
let private getEvents (client:DocumentClient) (collectionUri:Uri) collectionName streamId (eventsRead:EventsReadRange) =
task {
return createQuery
(sprintf "SELECT * FROM %s e WHERE e.streamId = @streamId AND e.type = 'Event' %s ORDER BY e.position ASC" collectionName (streamEventsReadToQuery eventsRead))
Expand All @@ -122,14 +122,14 @@ let private getEvents (client:DocumentClient) (collectionUri:Uri) streamId (even
|> List.map Conversion.documentToEventRead
}

let private getEvent (client:DocumentClient) (collectionUri:Uri) streamId position =
let private getEvent (client:DocumentClient) (collectionUri:Uri) collectionName streamId position =
task {
let filter = EventsReadRange.PositionRange(position, position)
let! events = getEvents client collectionUri streamId filter
let! events = getEvents client collectionUri collectionName streamId filter
return events.Head
}

let private getEventsByCorrelationId (client:DocumentClient) (collectionUri:Uri) (corrId:Guid) =
let private getEventsByCorrelationId (client:DocumentClient) (collectionUri:Uri) collectionName (corrId:Guid) =
task {
return createQuery
(sprintf "SELECT * FROM %s e WHERE e.correlationId = @corrId AND e.type = 'Event' ORDER BY e.createdUtc ASC" collectionName)
Expand All @@ -139,7 +139,7 @@ let private getEventsByCorrelationId (client:DocumentClient) (collectionUri:Uri)
|> List.map Conversion.documentToEventRead
}

let private getStream (client:DocumentClient) (collectionUri:Uri) streamId =
let private getStream (client:DocumentClient) (collectionUri:Uri) collectionName streamId =
task {
return createQuery
(sprintf "SELECT * FROM %s e WHERE e.type = 'Stream' AND e.streamId = @streamId" collectionName) [("@streamId", streamId :> obj)]
Expand All @@ -154,14 +154,14 @@ let private getRequestOptions streamId = RequestOptions(PartitionKey = Partition
let getEventStore (configuration:Configuration) =
let client = new DocumentClient(configuration.ServiceEndpoint, configuration.AuthKey)
let dbUri = UriFactory.CreateDatabaseUri(configuration.DatabaseName)
let eventsCollectionUri = UriFactory.CreateDocumentCollectionUri(configuration.DatabaseName, collectionName)
let appendEventProcUri = UriFactory.CreateStoredProcedureUri(configuration.DatabaseName, collectionName, appendEventProcName)
let eventsCollectionUri = UriFactory.CreateDocumentCollectionUri(configuration.DatabaseName, configuration.CollectionName)
let appendEventProcUri = UriFactory.CreateStoredProcedureUri(configuration.DatabaseName, configuration.CollectionName, appendEventProcName)
let eventAppended = Event<EventRead>()

task {
do! createDatabase configuration.DatabaseName client
do! createCollection dbUri configuration.Throughput client
do! createStoreProcedures eventsCollectionUri appendEventProcUri client
do! createCollection dbUri configuration client
do! createStoreProcedures eventsCollectionUri appendEventProcUri configuration.CollectionName client
} |> Async.AwaitTask |> Async.RunSynchronously

{
Expand All @@ -179,10 +179,10 @@ let getEventStore (configuration:Configuration) =
return events
}

GetEvent = getEvent client eventsCollectionUri
GetEvents = getEvents client eventsCollectionUri
GetEventsByCorrelationId = getEventsByCorrelationId client eventsCollectionUri
GetStreams = getStreams client eventsCollectionUri
GetStream = getStream client eventsCollectionUri
GetEvent = getEvent client eventsCollectionUri configuration.CollectionName
GetEvents = getEvents client eventsCollectionUri configuration.CollectionName
GetEventsByCorrelationId = getEventsByCorrelationId client eventsCollectionUri configuration.CollectionName
GetStreams = getStreams client eventsCollectionUri configuration.CollectionName
GetStream = getStream client eventsCollectionUri configuration.CollectionName
EventAppended = Observable.ObserveOn(eventAppended.Publish :> IObservable<_>, ThreadPoolScheduler.Instance)
}
2 changes: 1 addition & 1 deletion src/CosmoStore.CosmosDb/StoredProcedures/AppendEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
}

// metadata query
var metadataQuery = 'SELECT * FROM Events e WHERE e.streamId = "' + streamId + '" AND e.type = "' + streamType + '"';
var metadataQuery = 'SELECT * FROM %%COLLECTION_NAME%% e WHERE e.streamId = "' + streamId + '" AND e.type = "' + streamType + '"';
var transactionAccepted = collection.queryDocuments(collection.getSelfLink(), metadataQuery, run);
if (!transactionAccepted) throw "Transaction not accepted, rollback";
}
6 changes: 4 additions & 2 deletions tests/CosmoStore.CosmosDb.Tests/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ open CosmoStore.Tests
open CosmoStore.CosmosDb
open Microsoft.Azure.Documents.Client

let collectionName = "MyEvents"

let private config =
CosmoStore.CosmosDb.Configuration.CreateDefault
(Uri "https://localhost:8081")
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
|> fun cfg -> { cfg with DatabaseName = "CosmoStoreTests"; Throughput = 10000 }
|> fun cfg -> { cfg with DatabaseName = "CosmoStoreTests"; Throughput = 10000; CollectionName = collectionName }

let private getCleanEventStore() =
let client = new DocumentClient(config.ServiceEndpoint, config.AuthKey)
try
do client.DeleteDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(config.DatabaseName, "Events"))
do client.DeleteDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(config.DatabaseName, collectionName))
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
Expand Down

0 comments on commit e1b689b

Please sign in to comment.