diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index a5d15414d..0ac318abd 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -752,8 +752,9 @@ module internal Tip = type [] Token = { container: Container; stream: string; pos: Position } module Token = let create (container,stream) pos : StreamToken = - { value = box { container = container; stream = stream; pos = pos } - version = pos.index } + { value = box { container = container; stream = stream; pos = pos } + version = pos.index + sessionToken = null } let (|Unpack|) (token: StreamToken) : Container*string*Position = let t = unbox token.value in t.container,t.stream,t.pos let supersedes (Unpack (_,_,currentPos)) (Unpack (_,_,xPos)) = let currentVersion, newVersion = currentPos.index, xPos.index diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index feef7ff7a..24f9a8297 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -317,7 +317,8 @@ module Token = { value = box { stream = { name = streamName} pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } - version = streamVersion } + version = streamVersion + sessionToken = null } /// No batching / compaction; we only need to retain the StreamVersion let ofNonCompacting streamName streamVersion : StreamToken = diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 74e6fc994..73247449f 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -45,7 +45,8 @@ module private Token = let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken = { value = box { streamName = streamName; streamVersion = streamVersion } - version = int64 streamVersion } + version = int64 streamVersion + sessionToken = null } let (|Unpack|) (token: StreamToken) : Token = unbox token.value /// Represent a stream known to be empty let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index d1cc56d69..86f26bbaa 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -298,7 +298,8 @@ module Token = { value = box { stream = { name = streamName} pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } - version = streamVersion } + version = streamVersion + sessionToken = null } /// No batching / compaction; we only need to retain the StreamVersion let ofNonCompacting streamName streamVersion : StreamToken = create None None streamName streamVersion diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs index 93bc1af0c..36f6aad71 100755 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -6,7 +6,7 @@ open Serilog /// Store-specific opaque token to be used for synchronization purposes [] -type StreamToken = { value : obj; version: int64 } +type StreamToken = { value : obj; version: int64; sessionToken : string } /// Internal type used to represent the outcome of a TrySync operation [] @@ -36,6 +36,10 @@ type ISyncContext<'state> = /// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented) abstract member Version : int64 + /// Exposes the underlying Store's internal SessionToken + // (relevant to CosmosDB) + abstract member SessionToken : string + /// The present State of the stream within the context of this Flow abstract member State : 'state @@ -61,6 +65,7 @@ module internal Flow = member __.CreateMemento() = tokenAndState member __.State = snd tokenAndState member __.Version = (fst tokenAndState).version + member __.SessionToken = (fst tokenAndState).sessionToken member __.TryWithoutResync(log : ILogger, events) : Async = trySyncOr log events (fun _resync -> async { return false })