Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely committed Jan 21, 2024
1 parent ffdf949 commit 1e2926a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 11 deletions.
46 changes: 41 additions & 5 deletions src/hyperx/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,28 @@ const
errInadequateSecurity = 0x0c.ErrorCode
errHttp11Required = 0x0d.ErrorCode

func `$`(errCode: ErrorCode): string =
case errCode
of errNoError: "NO_ERROR"
of errProtocolError: "PROTOCOL_ERROR"
of errInternalError: "INTERNAL_ERROR"
of errFlowControlError: "FLOW_CONTROL_ERROR"
of errSettingsTimeout: "SETTINGS_TIMEOUT"
of errStreamClosed: "STREAM_CLOSED"
of errFrameSizeError: "FRAME_SIZE_ERROR"
of errRefusedStream: "REFUSED_STREAM"
of errCancel: "CANCEL"
of errCompressionError: "COMPRESSION_ERROR"
of errConnectError: "CONNECT_ERROR"
of errEnhanceYourCalm: "ENHANCE_YOUR_CALM"
of errInadequateSecurity: "INADEQUATE_SECURITY"
of errHttp11Required: "HTTP_1_1_REQUIRED"
else: "UNKNOWN ERROR CODE"

type
HyperxError* = object of CatchableError
ConnError = object of HyperxError
HyperxConnectionError* = object of HyperxError
ConnError = object of HyperxConnectionError
code: ErrorCode
FrameSizeError = object of HyperxError
StrmError = object of HyperxError
Expand All @@ -42,7 +61,7 @@ type
ConnectionClosedError = object of OSError

func newConnError(errCode: ErrorCode): ref ConnError =
result = (ref ConnError)(code: errCode)
result = (ref ConnError)(code: errCode, msg: "Connection Error: " & $errCode)

func newConnClosedError(): ref ConnectionClosedError =
result = (ref ConnectionClosedError)()
Expand Down Expand Up @@ -371,7 +390,15 @@ proc connect(client: ClientContext) {.async.} =
await client.startHandshake()

proc close(client: ClientContext) =
if not client.isConnected:
return
client.isConnected = false
client.sock.close()
client.dptMsgs.close()
# XXX race con may create stream but
# client is closed
for stream in values client.streams:
stream.strmMsgs.close()

proc consumeMainStream(client: ClientContext, dptMsg: DptMsgData) {.async.} =
# XXX process settings, window updates, etc
Expand Down Expand Up @@ -431,6 +458,7 @@ proc responseDispatcher(client: ClientContext) {.async.} =
raise err
finally:
debugInfo "responseDispatcher exited"
client.close()

proc recvTaskNaked(client: ClientContext) {.async.} =
## Receive frames and dispatch to opened streams
Expand Down Expand Up @@ -465,6 +493,7 @@ proc recvTask(client: ClientContext) {.async.} =
raise err
finally:
debugInfo "recvTask exited"
client.close()

template withConnection*(
client: ClientContext,
Expand All @@ -473,13 +502,16 @@ template withConnection*(
block:
var recvFut: Future[void]
var waitForRecvFut = false
var respFut: Future[void]
var waitForRespFut = false
try:
debugInfo "connecting"
await client.connect()
debugInfo "connected"
recvFut = client.recvTask()
waitForRecvFut = true
asyncCheck client.responseDispatcher()
respFut = client.responseDispatcher()
waitForRespFut = true
block:
body
except Exception as err:
Expand All @@ -488,11 +520,15 @@ template withConnection*(
finally:
debugInfo "exit"
client.close()
client.isConnected = false
try:
if waitForRecvFut:
await recvFut
except ConnectionClosedError:
except ConnectionClosedError, QueueClosedError:
discard
try:
if waitForRespFut:
await respFut
except ConnectionClosedError, QueueClosedError:
discard

type
Expand Down
24 changes: 23 additions & 1 deletion src/hyperx/queue.nim
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import std/asyncdispatch
import std/deques

type
QueueError* = object of CatchableError
QueueClosedError* = object of QueueError

func newQueueClosedError(): ref QueueClosedError =
result = (ref QueueClosedError)(msg: "Queue is closed")

type
QueueAsync*[T] = ref object
s: Deque[T]
size, used: int
# XXX use/reuse FutureVars
putEv, popEv: Deque[Future[void]]
isClosed: bool

proc newQueue*[T](size: int): QueueAsync[T] =
doAssert size > 0
Expand All @@ -16,7 +24,8 @@ proc newQueue*[T](size: int): QueueAsync[T] =
size: size,
used: 0,
putEv: initDeque[Future[void]](2),
popEv: initDeque[Future[void]](2)
popEv: initDeque[Future[void]](2),
isClosed: false
)

proc popEvent[T](q: QueueAsync[T]): Future[void] =
Expand All @@ -37,6 +46,8 @@ proc putDone[T](q: QueueAsync[T]) =

proc put*[T](q: QueueAsync[T], v: T) {.async.} =
doAssert q.used <= q.size
if q.isClosed:
raise newQueueClosedError()
if q.used == q.size:
await q.popEvent()
q.s.addFirst v
Expand All @@ -46,13 +57,24 @@ proc put*[T](q: QueueAsync[T], v: T) {.async.} =

proc pop*[T](q: QueueAsync[T]): Future[T] {.async.} =
doAssert q.used >= 0
if q.isClosed:
raise newQueueClosedError()
if q.used == 0:
await q.putEvent()
result = q.s.popLast()
dec q.used
doAssert q.used >= 0
q.popDone()

proc close*[T](q: QueueAsync[T]) =
doAssert not q.isClosed
q.isClosed = true
let closedError = newQueueClosedError()
for ev in items q.putEv:
ev.fail(closedError)
for ev in items q.popEv:
ev.fail(closedError)

when isMainModule:
block:
proc test() {.async.} =
Expand Down
10 changes: 7 additions & 3 deletions src/hyperx/testutils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func toString(bytes: openArray[byte]): string =
result = newString(L)
copyMem(result.cstring, bytes[0].unsafeAddr, L)

proc frame(
proc frame*(
typ: FrmTyp,
sid: FrmSid,
pl: FrmPayloadLen,
Expand All @@ -44,8 +44,12 @@ proc frameStr(
): string =
result = frame(typ, sid, pl, flags).rawStr()

proc headerFrame*(sid: FrmSid, pl: FrmPayloadLen): Frame =
result = frame(frmtHeaders, sid, pl, @[frmfEndHeaders])
proc headerFrame*(
sid: FrmSid,
pl: FrmPayloadLen,
flags = @[frmfEndHeaders]
): Frame =
result = frame(frmtHeaders, sid, pl, flags)

proc dataFrame*(sid: FrmSid, pl: FrmPayloadLen): Frame =
result = frame(frmtData, sid, pl, @[frmfEndStream])
Expand Down
28 changes: 26 additions & 2 deletions tests/testclient.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{.define: ssl.}

import std/strutils
import std/asyncdispatch
import ../src/hyperx/client
import ../src/hyperx/testutils
Expand Down Expand Up @@ -107,9 +108,11 @@ testAsync "response with headers prio":
let encHeaders = hencode(tc, headers)
var frm1 = headerFrame(
tc.sid.FrmSid,
encHeaders.len.FrmPayloadLen
(encHeaders.len + frmPrioritySize).FrmPayloadLen,
@[frmfPriority, frmfEndHeaders]
)
await tc.reply(frm1, encHeaders)
let prio = "12345"
await tc.reply(frm1, prio & encHeaders)
var frm2 = dataFrame(
tc.sid.FrmSid,
text.len.FrmPayloadLen
Expand All @@ -133,3 +136,24 @@ testAsync "response with headers prio":
doAssert tc.resps[0].text == text
doAssert tc.resps[1].headers == headers2
doAssert tc.resps[1].text == text2

testAsync "response with bad prio length":
var tc = newTestClient("foo.bar")
proc replyPrio(tc: TestClientContext) {.async.} =
let prio = "1"
var frm1 = headerFrame(
tc.sid.FrmSid,
prio.len.FrmPayloadLen,
@[frmfPriority, frmfEndHeaders]
)
await tc.reply(frm1, prio)
var errorMsg = ""
try:
withConnection tc:
await (
tc.get("/") and
tc.replyPrio()
)
except HyperxConnectionError as err:
errorMsg = err.msg
doAssert "PROTOCOL_ERROR" in errorMsg

0 comments on commit 1e2926a

Please sign in to comment.