Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely committed Jan 27, 2024
1 parent 1915003 commit e258351
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions src/hyperx/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ type
ConnectionClosedError = object of HyperxConnectionError
InternalOSError = object of HyperxConnectionError
StrmError = object of HyperxError
StrmProtocolError = object of StrmError
StrmStreamClosedError = object of StrmError
code: ErrorCode

func newConnError(errCode: ErrorCode): ref ConnError =
result = (ref ConnError)(code: errCode, msg: "Connection Error: " & $errCode)

func newConnClosedError(): ref ConnectionClosedError =
result = (ref ConnectionClosedError)(msg: "Connection Closed")

func newStrmError(errCode: ErrorCode): ref StrmError =
result = (ref StrmError)(code: errCode, msg: "Stream Error: " & $errCode)

const
preface = "PRI * HTTP/2.0\r\L\r\LSM\r\L\r\L"
# https://httpwg.org/specs/rfc9113.html#SettingValues
Expand Down Expand Up @@ -183,7 +185,20 @@ proc initStream(id: StreamId): Stream =
)

proc read(s: Stream): Future[MsgData] {.async.} =
template frm: untyped = result.frm
template payload: untyped = result.payload
result = await s.msgs.pop()
# XXX move somewhere else
try:
if frm.typ == frmtWindowUpdate:
check payload.s.len > 0, newStrmError(errProtocolError)
except StrmError as err:
# This needs to be done here, it cannot be
# sent by the dispatcher or receiver
# since it may block
#if client.isConnected:
# await client.sendRstStream(err.code)
raise err

type
ClientContext* = ref object
Expand Down Expand Up @@ -245,7 +260,7 @@ proc doTransitionRecv(s: var Stream, frm: Frame) =
check frm.typ in frmRecvAllowed, newConnError(errProtocolError)
if not s.state.isAllowedToRecv frm:
if s.state == strmHalfClosedRemote:
raiseError StrmStreamClosedError
raise newStrmError(errStreamClosed) # XXX this probably cannot be done here
else:
raise newConnError(errProtocolError)
let event = frm.toEventRecv()
Expand Down Expand Up @@ -435,9 +450,6 @@ proc responseDispatcherNaked(client: ClientContext) {.async.} =
template frm: untyped = msg.frm
while client.isConnected:
let msg = await client.recvMsgs.pop()
if frm.sid.StreamId notin client.streams:
debugInfo "stream not found " & $frm.sid.int
continue
debugInfo "recv data on stream " & $frm.sid.int
if frm.sid == frmsidMain:
await consumeMainStream(client, msg)
Expand All @@ -446,14 +458,19 @@ proc responseDispatcherNaked(client: ClientContext) {.async.} =
client.maxPeerStrmIdSeen = max(
client.maxPeerStrmIdSeen.int, frm.sid.int
).StreamId
let stream = client.streams[frm.sid.StreamId]
if frm.typ == frmtHeaders:
# XXX implement initDecodedBytes as seq[byte] in hpack
var headers = initDecodedStr()
# can raise a connError
decode(msg.payload.s, headers, client.dynDecHeaders)
msg.payload.s.setLen 0
msg.payload.s.add $headers
# Process headers even if the stream
# does not exist
if frm.sid.StreamId notin client.streams:
debugInfo "stream not found " & $frm.sid.int
continue
let stream = client.streams[frm.sid.StreamId]
await stream.msgs.put msg

proc responseDispatcher(client: ClientContext) {.async.} =
Expand Down

0 comments on commit e258351

Please sign in to comment.