diff --git a/src/hyperx/clientserver.nim b/src/hyperx/clientserver.nim index 1c7ef43..64e07bc 100644 --- a/src/hyperx/clientserver.nim +++ b/src/hyperx/clientserver.nim @@ -26,8 +26,9 @@ const SSL_OP_NO_RENEGOTIATION = 1073741824 const SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION = 65536 const - preface* = "PRI * HTTP/2.0\r\L\r\LSM\r\L\r\L" - statusLineLen* = ":status: xxx\r\n".len + preface = "PRI * HTTP/2.0\r\L\r\LSM\r\L\r\L" + statusLineLen = ":status: xxx\r\n".len + maxStreamId = (1'u32 shl 31) - 1'u32 # https://httpwg.org/specs/rfc9113.html#SettingValues stgHeaderTableSize* = 4096'u32 stgInitialMaxConcurrentStreams* = uint32.high @@ -184,16 +185,6 @@ func openMainStream(client: ClientContext): Stream {.raises: [StreamsClosedError doAssert frmSidMain notin client.streams result = client.streams.open(frmSidMain, client.peerWindowSize.int32) -func openStream(client: ClientContext): Stream {.raises: [StreamsClosedError, GracefulShutdownError].} = - # XXX some error if max sid is reached - # XXX error if maxStreams is reached - doAssert client.typ == ctClient - check not client.isGracefulShutdown, newGracefulShutdownError() - var sid = client.currStreamId - sid += (if sid == StreamId 0: StreamId 1 else: StreamId 2) - result = client.streams.open(sid, client.peerWindowSize.int32) - client.currStreamId = sid - func maxPeerStreamIdSeen(client: ClientContext): StreamId {.raises: [].} = case client.typ of ctClient: StreamId 0 @@ -296,6 +287,7 @@ proc sendNaked(client: ClientContext, frm: Frame) {.async.} = debugInfo debugPayload(frm) doAssert frm.payloadLen.int == frm.payload.len doAssert frm.payload.len <= client.peerMaxFrameSize.int + doAssert frm.sid <= StreamId maxStreamId check not client.sock.isClosed, newConnClosedError() GC_ref frm try: @@ -794,10 +786,13 @@ func newClientStream*(client: ClientContext, stream: Stream): ClientStream = ) func newClientStream*(client: ClientContext): ClientStream = - let stream = client.openStream() + doAssert client.typ == ctClient + check not client.isGracefulShutdown, newGracefulShutdownError() + let stream = client.streams.dummy() newClientStream(client, stream) proc close(strm: ClientStream) {.raises: [].} = + strm.stream.close() strm.client.streams.close(strm.stream.id) strm.bodyRecvSig.close() strm.headersRecvSig.close() @@ -806,6 +801,17 @@ proc close(strm: ClientStream) {.raises: [].} = except SignalClosedError: discard +func openStream(strm: ClientStream) {.raises: [StreamsClosedError, GracefulShutdownError].} = + # XXX some error if max sid is reached + # XXX error if maxStreams is reached + template client: untyped = strm.client + doAssert client.typ == ctClient + check not client.isGracefulShutdown, newGracefulShutdownError() + var sid = client.currStreamId + sid += (if sid == StreamId 0: StreamId 1 else: StreamId 2) + client.streams.open(strm.stream, sid, client.peerWindowSize.int32) + client.currStreamId = sid + func recvEnded*(strm: ClientStream): bool {.raises: [].} = strm.stateRecv == csStateEnded and strm.headersRecv.len == 0 and @@ -1063,6 +1069,8 @@ proc sendHeadersImpl*( doAssert strm.stream.state in strmStateHeaderSendAllowed doAssert strm.stateSend == csStateOpened or (strm.stateSend in {csStateHeaders, csStateData} and finish) + if strm.stream.state == strmIdle: + strm.openStream() strm.stateSend = csStateHeaders var frm = newFrame() frm.add headers @@ -1168,6 +1176,7 @@ template with*(strm: ClientStream, body: untyped): untyped = proc ping(client: ClientContext, strm: Stream) {.async.} = # this is done for rst and go-away pings; only one stream ping # will ever be in progress + doAssert strm.id in client.streams if strm.pingSig.len > 0: await strm.pingSig.waitFor() else: diff --git a/src/hyperx/frame.nim b/src/hyperx/frame.nim index 847cc08..b8eb4e2 100644 --- a/src/hyperx/frame.nim +++ b/src/hyperx/frame.nim @@ -114,6 +114,7 @@ type proc `==`*(a, b: FrmSid): bool {.borrow.} proc `+=`*(a: var FrmSid, b: FrmSid) {.borrow.} proc `<`*(a, b: FrmSid): bool {.borrow.} +proc `<=`*(a, b: FrmSid): bool {.borrow.} #func `+=`*(a: var FrmSid, b: uint) {.raises: [].} = # a = (a.uint + b).FrmSid diff --git a/src/hyperx/stream.nim b/src/hyperx/stream.nim index b2ab27b..daaa637 100644 --- a/src/hyperx/stream.nim +++ b/src/hyperx/stream.nim @@ -225,15 +225,31 @@ func del*(s: var Streams, sid: StreamId) {.raises: [].} = func contains*(s: Streams, sid: StreamId): bool {.raises: [].} = s.t.contains sid +func dummy*(s: var Streams): Stream {.raises: [StreamsClosedError].} = + check not s.isClosed, newStreamsClosedError("Cannot open stream") + result = newStream(uint32.high.StreamId, 0) + func open*( s: var Streams, + stream: Stream, sid: StreamId, peerWindow: int32 -): Stream {.raises: [StreamsClosedError].} = +) {.raises: [StreamsClosedError].} = doAssert sid notin s.t, $sid.int + doAssert stream.id == uint32.high.StreamId + doAssert stream.state == strmIdle check not s.isClosed, newStreamsClosedError("Cannot open stream") - result = newStream(sid, peerWindow) - s.t[sid] = result + stream.id = sid + stream.peerWindow = peerWindow + s.t[sid] = stream + +func open*( + s: var Streams, + sid: StreamId, + peerWindow: int32 +): Stream {.raises: [StreamsClosedError].} = + result = s.dummy() + s.open(result, sid, peerWindow) iterator values*(s: Streams): Stream {.inline.} = for v in values s.t: diff --git a/tests/functional/tmisc.nim b/tests/functional/tmisc.nim index 01385d5..14a6be7 100644 --- a/tests/functional/tmisc.nim +++ b/tests/functional/tmisc.nim @@ -21,6 +21,12 @@ template testAsync(name: string, body: untyped): untyped = doAssert checked )() +proc sleepCycle: Future[void] = + let fut = newFuture[void]() + proc wakeup = fut.complete() + callSoon wakeup + return fut + const defaultHeaders = @[ (":method", "POST"), (":scheme", "https"), @@ -107,6 +113,8 @@ testAsync "server graceful close": inc checked doAssert checked == 2 +# XXX cannot longer check server is sending GoAway +# since cannot create a stream before sending headers testAsync "send after server graceful close": var checked = 0 var client = newClient(localHost, localPort) @@ -127,10 +135,8 @@ testAsync "send after server graceful close": try: with strm2: await strm2.sendHeaders(defaultHeaders, finish = true) - var data = new string - await strm2.recvHeaders(data) doAssert false - except HyperxError: + except GracefulShutdownError: inc checked doAssert checked == 2 @@ -162,20 +168,69 @@ testAsync "client graceful close": inc checked doAssert checked == 2 +# XXX cannot longer create a stream before shutdown +# and send on it after because of lazy stream id testAsync "send after client graceful close": var checked = 0 var client = newClient(localHost, localPort) with client: + let strm = client.newClientStream() + await client.gracefulClose() + try: + with strm: + await strm.sendHeaders(defaultHeaders, finish = true) + doAssert false + except GracefulShutdownError: + inc checked + doAssert checked == 1 + +testAsync "lazy client stream id": + var order = new seq[int] + order[] = newSeq[int]() + var checked = new int + checked[] = 0 + proc stream(client: ClientContext, sleep = 0) {.async.} = let strm = client.newClientStream() with strm: - # This is not correct usage - await client.gracefulClose() + for i in 0 .. sleep-1: + await sleepCycle() + #echo $sleep + order[].add sleep + var headers = defaultHeaders + headers.add ("x-no-echo-headers", "true") + await strm.sendHeaders(headers, finish = false) + var data = new string + await strm.recvHeaders(data) + doAssert data[] == ":status: 200\r\n" + data[] = "foobar" & $sleep + await strm.sendBody(data, finish = true) + data[] = "" + await strm.recvBody(data) + doAssert data[] == "foobar" & $sleep + checked[] += 1 + checked[] += 1 + var client = newClient(localHost, localPort) + with client: + let strm1 = client.stream(200) + let strm2 = client.stream(100) + await strm1 + await strm2 + doAssert order[] == @[100, 200] + checked[] += 1 + doAssert checked[] == 5 + +testAsync "cancel lazy stream": + var checked = 0 + var client = newClient(localHost, localPort) + with client: + let strm = client.newClientStream() + await strm.cancel(hyxCancel) + with strm: try: await strm.sendHeaders(defaultHeaders, finish = true) - var data = new string - await strm.recvHeaders(data) - except HyperxConnError: - #doAssert err.code == errNoError - inc checked + doAssert false + except HyperxStrmError as err: + doAssert err.code == hyxStreamClosed + inc checked inc checked doAssert checked == 2 diff --git a/tests/testclient.nim b/tests/testclient.nim index 1bf2451..99573ec 100644 --- a/tests/testclient.nim +++ b/tests/testclient.nim @@ -428,10 +428,10 @@ testAsync "stream error NO_ERROR handling": await tc.checkHandshake() let strm = tc.client.newClientStream() with strm: - await tc.replyNoError(strm.stream.id) let sendFut = strm.send(dataOut) let recvFut = strm.recv(dataIn) - await sendFut # this could raise + await sendFut + await tc.replyNoError(strm.stream.id) await recvFut # this should never raise doAssert dataIn[] == headers