Skip to content

Commit

Permalink
only one ping in-progress per stream
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely committed Oct 21, 2024
1 parent a86f6d3 commit c32dd00
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tests/functional/tconcurrentdata
tests/functional/tflowcontrol
tests/functional/tcancel
tests/functional/tcancelremote
tests/functional/tmisc
src/hyperx/client
src/hyperx/server
src/hyperx/clientserver
Expand Down
1 change: 1 addition & 0 deletions hyperx.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ task functest, "Func test":
exec "nim c -r -d:release tests/functional/tflowcontrol.nim"
exec "nim c -r -d:release tests/functional/tcancel.nim"
exec "nim c -r -d:release tests/functional/tcancelremote.nim"
exec "nim c -r -d:release tests/functional/tmisc.nim"

task funcserveinsec, "Func Serve Insecure":
exec "nim c -r -d:release tests/functional/tserverinsecure.nim"
Expand Down
17 changes: 11 additions & 6 deletions src/hyperx/clientserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,8 @@ proc connect(client: ClientContext) {.async.} =
raise newHyperxConnError(err.msg)

proc failSilently(f: Future[void]) {.async.} =
## Be careful when wrapping non {.async.} procs,
## as they may raise before the wrap
if f == nil:
return
try:
Expand Down Expand Up @@ -922,12 +924,13 @@ proc read(stream: Stream): Future[Frame] {.async.} =
break
return frm

proc writeRst(strm: ClientStream, code: ErrorCode): Future[void] =
# this needs to be {.async.} to fail-silently
proc writeRst(strm: ClientStream, code: ErrorCode) {.async.} =
template stream: untyped = strm.stream
check stream.state in strmStateRstSendAllowed,
newStrmError errStreamClosed
strm.stateSend = csStateEnded
result = strm.write newRstStreamFrame(
await strm.write newRstStreamFrame(
stream.id.FrmSid, code.int
)

Expand Down Expand Up @@ -1217,10 +1220,12 @@ template with*(strm: ClientStream, body: untyped): untyped =
proc ping(strm: ClientStream) {.async.} =
# this is done for rst pings; only one stream ping
# will ever be in progress
# XXX avoid sending the ping if there is one in progress
let sig = strm.stream.pingSig.waitFor()
await strm.client.send newPingFrame(strm.stream.id.uint32)
await sig
if strm.stream.pingSig.len > 0:
await strm.stream.pingSig.waitFor()
else:
let sig = strm.stream.pingSig.waitFor()
await strm.client.send newPingFrame(strm.stream.id.uint32)
await sig

proc cancel*(strm: ClientStream, code: ErrorCode) {.async.} =
## This may never return until the stream/conn is closed.
Expand Down
3 changes: 3 additions & 0 deletions src/hyperx/signal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ proc newSignal*(): SignalAsync {.raises: [].} =
isClosed: false
)

proc len*(sig: SignalAsync): int {.raises: [].} =
sig.waiters.len

proc waitFor*(sig: SignalAsync): Future[void] {.raises: [SignalClosedError].} =
if sig.isClosed:
raise newSignalClosedError()
Expand Down
95 changes: 95 additions & 0 deletions tests/functional/tmisc.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{.define: ssl.}
{.define: hyperxSanityCheck.}

import std/asyncdispatch
import ../../src/hyperx/client
import ../../src/hyperx/errors
import ./tutils.nim

template testAsync(name: string, body: untyped): untyped =
(proc () =
echo "test " & name
var checked = false
proc test() {.async.} =
body
checked = true
waitFor test()
doAssert not hasPendingOperations()
doAssert checked
)()

const defaultHeaders = @[
(":method", "POST"),
(":scheme", "https"),
(":path", "/foo"),
(":authority", "foo.bar"),
("user-agent", "HyperX/0.1"),
("content-type", "text/plain")
]

testAsync "cancel many times":
var checked = 0
var client = newClient(localHost, localPort)
with client:
let strm = client.newClientStream()
try:
with strm:
await strm.sendHeaders(defaultHeaders, finish = false)
var data = new string
await strm.recvHeaders(data)
doAssert data[] == ":status: 200\r\n"
await strm.cancel(errCancel)
await strm.cancel(errCancel)
inc checked
# XXX remove
raise newException(ValueError, "foo")
# XXX change with sendEnded/recvEnded to stream status check
except ValueError as err:
doAssert err.msg == "foo"
doAssert checked == 1

testAsync "cancel concurrently":
var checked = 0
var client = newClient(localHost, localPort)
with client:
let strm = client.newClientStream()
try:
with strm:
await strm.sendHeaders(defaultHeaders, finish = false)
var data = new string
await strm.recvHeaders(data)
doAssert data[] == ":status: 200\r\n"
let fut1 = strm.cancel(errCancel)
let fut2 = strm.cancel(errCancel)
await fut1
await fut2
inc checked
# XXX remove
raise newException(ValueError, "foo")
# XXX change with sendEnded/recvEnded to stream status check
except ValueError as err:
doAssert err.msg == "foo"
doAssert checked == 1

testAsync "cancel task":
var checked = 0
var client = newClient(localHost, localPort)
var cancelFut: Future[void]
with client:
let strm = client.newClientStream()
try:
with strm:
await strm.sendHeaders(defaultHeaders, finish = false)
var data = new string
await strm.recvHeaders(data)
doAssert data[] == ":status: 200\r\n"
cancelFut = strm.cancel(errCancel)
await strm.cancel(errCancel)
inc checked
# XXX remove
raise newException(ValueError, "foo")
# XXX change with sendEnded/recvEnded to stream status check
except ValueError as err:
doAssert err.msg == "foo"
await cancelFut
doAssert checked == 1

0 comments on commit c32dd00

Please sign in to comment.