Skip to content

Commit

Permalink
Expose SessionToken
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 12, 2020
1 parent de12988 commit 1a6c3e3
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 6 deletions.
5 changes: 3 additions & 2 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,9 @@ module internal Tip =
type [<NoComparison>] Token = { container: Container; stream: string; pos: Position }
module Token =
let create (container,stream) pos : StreamToken =
{ value = box { container = container; stream = stream; pos = pos }
version = pos.index }
{ value = box { container = container; stream = stream; pos = pos }
version = pos.index
sessionToken = null }
let (|Unpack|) (token: StreamToken) : Container*string*Position = let t = unbox<Token> token.value in t.container,t.stream,t.pos
let supersedes (Unpack (_,_,currentPos)) (Unpack (_,_,xPos)) =
let currentVersion, newVersion = currentPos.index, xPos.index
Expand Down
3 changes: 2 additions & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
version = streamVersion
sessionToken = null }

/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
Expand Down
3 changes: 2 additions & 1 deletion src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ module private Token =

let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
{ value = box { streamName = streamName; streamVersion = streamVersion }
version = int64 streamVersion }
version = int64 streamVersion
sessionToken = null }
let (|Unpack|) (token: StreamToken) : Token = unbox<Token> token.value
/// Represent a stream known to be empty
let ofEmpty streamName initial = streamTokenOfIndex streamName -1, initial
Expand Down
3 changes: 2 additions & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ module Token =
{ value = box {
stream = { name = streamName}
pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } }
version = streamVersion }
version = streamVersion
sessionToken = null }
/// No batching / compaction; we only need to retain the StreamVersion
let ofNonCompacting streamName streamVersion : StreamToken =
create None None streamName streamVersion
Expand Down
7 changes: 6 additions & 1 deletion src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ open Serilog

/// Store-specific opaque token to be used for synchronization purposes
[<NoComparison>]
type StreamToken = { value : obj; version: int64 }
type StreamToken = { value : obj; version: int64; sessionToken : string }

/// Internal type used to represent the outcome of a TrySync operation
[<NoEquality; NoComparison; RequireQualifiedAccess>]
Expand Down Expand Up @@ -36,6 +36,10 @@ type ISyncContext<'state> =
/// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented)
abstract member Version : int64

/// Exposes the underlying Store's internal SessionToken
// (relevant to CosmosDB)
abstract member SessionToken : string

/// The present State of the stream within the context of this Flow
abstract member State : 'state

Expand All @@ -61,6 +65,7 @@ module internal Flow =
member __.CreateMemento() = tokenAndState
member __.State = snd tokenAndState
member __.Version = (fst tokenAndState).version
member __.SessionToken = (fst tokenAndState).sessionToken

member __.TryWithoutResync(log : ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
Expand Down

0 comments on commit 1a6c3e3

Please sign in to comment.