Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely committed Feb 23, 2024
1 parent 9af4a3e commit f6a7016
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
81 changes: 59 additions & 22 deletions src/hyperx/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,62 @@ proc initStream(id: StreamId): Stream {.raises: [].} =
msgs: newQueue[MsgData](1)
)

type
StreamsClosedError* = object of HyperxError
Streams = object
t: Table[StreamId, Stream]
isClosed: bool

func initStreams(): Streams {.raises: [].} =
result = Streams(
t: initTable[StreamId, Stream](16),
isClosed: false
)

func get(s: var Streams, sid: StreamId): var Stream {.raises: [].} =
try:
result = s.t[sid]
except KeyError:
doAssert false, "sid is not a stream"

func del(s: var Streams, sid: StreamId) {.raises: [].} =
s.t.del sid

func contains(s: Streams, sid: StreamId): bool {.raises: [].} =
s.t.contains sid

func open(s: var Streams, sid: StreamId) {.raises: [StreamsClosedError].} =
doAssert sid notin s.t
if s.isClosed:
raise newException(StreamsClosedError, "Streams is closed")
s.t[sid] = initStream(sid)

iterator values(s: Streams): Stream {.inline.} =
for v in values s.t:
yield v

proc close(s: var Streams, sid: StreamId) {.raises: [].} =
if sid notin s:
return
let stream = s.get sid
stream.msgs.close()
s.del sid

proc close(s: var Streams) {.raises: [].} =
if s.isClosed:
return
s.isClosed = true
for stream in values s:
stream.msgs.close()

type
ClientContext* = ref object
sock: MyAsyncSocket
hostname: string
port: Port
isConnected: bool
headersEnc, headersDec: DynHeaders
streams: Table[StreamId, Stream]
streams: Streams
currStreamId: StreamId
sendMsgs, recvMsgs: QueueAsync[MsgData]
maxPeerStrmIdSeen: StreamId
Expand All @@ -168,10 +216,9 @@ proc newClient*(
sock: newMySocket(),
hostname: hostname,
port: port,
# XXX remove max headers limit
headersEnc: initDynHeaders(stgHeaderTableSize.int),
headersDec: initDynHeaders(stgHeaderTableSize.int),
streams: initTable[StreamId, Stream](16),
streams: initStreams(),
currStreamId: 1.StreamId,
recvMsgs: newQueue[MsgData](10),
sendMsgs: newQueue[MsgData](10),
Expand All @@ -196,16 +243,10 @@ proc close(client: ClientContext) {.raises: [InternalOsError].} =
finally:
client.sendMsgs.close()
client.recvMsgs.close()
# XXX race con may create stream but
# client is closed
for stream in values client.streams:
stream.msgs.close()
client.streams.close()

func stream(client: ClientContext, sid: StreamId): var Stream {.raises: [].} =
try:
result = client.streams[sid]
except KeyError:
doAssert false, "sid is not a stream"
client.streams.get sid

func stream(client: ClientContext, sid: FrmSid): var Stream {.raises: [].} =
client.stream sid.StreamId
Expand All @@ -214,12 +255,7 @@ proc close(client: ClientContext, sid: StreamId) {.raises: [].} =
# Close stream messages queue and delete stream from
# the client.
# This does nothing if the stream is already close
if sid notin client.streams:
return
let stream = client.stream(sid)
if not stream.msgs.isClosed:
stream.msgs.close()
client.streams.del stream.id
client.streams.close sid

func doTransitionSend(s: var Stream, frm: Frame) {.raises: [].} =
discard
Expand Down Expand Up @@ -371,20 +407,21 @@ proc read(client: ClientContext, frm: Frame) {.async.} =
debugInfo "Continuation"
await client.readUntilEnd(frm)

func openMainStream(client: ClientContext): StreamId {.raises: [].} =
func openMainStream(client: ClientContext): StreamId {.raises: [StreamsClosedError].} =
doAssert frmSidMain.StreamId notin client.streams
result = frmSidMain.StreamId
client.streams[result] = initStream result
client.streams.open result

func openStream(client: ClientContext): StreamId {.raises: [].} =
func openStream(client: ClientContext): StreamId {.raises: [StreamsClosedError].} =
# XXX some error if max sid is reached
# XXX error if maxStreams is reached
result = client.currStreamId
client.streams[result] = initStream result
client.streams.open result
# client uses odd numbers, and server even numbers
client.currStreamId += 2.StreamId

proc handshake(client: ClientContext) {.async.} =
doAssert client.isConnected
debugInfo "handshake"
# we need to do this before sending any other frame
# XXX: allow sending some params
Expand Down Expand Up @@ -534,7 +571,7 @@ proc responseDispatcherNaked(client: ClientContext) {.async.} =
# XXX need to reply as closed stream ?
debugInfo "stream not found " & $frm.sid.int
continue
let stream = client.streams[frm.sid.StreamId]
let stream = client.streams.get frm.sid.StreamId
try:
await stream.msgs.put msg
except QueueClosedError:
Expand Down
3 changes: 2 additions & 1 deletion src/hyperx/queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func isClosed*[T](q: QueueAsync[T]): bool {.raises: [].} =
q.isClosed

proc close*[T](q: QueueAsync[T]) {.raises: [].} =
doAssert not q.isClosed
if q.isClosed:
return
q.isClosed = true
#let closedError = newQueueClosedError()
for ev in items q.putEv:
Expand Down

0 comments on commit f6a7016

Please sign in to comment.