Skip to content

Commit

Permalink
Lazy client stream id (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
nitely authored Dec 30, 2024
1 parent 6c47f27 commit 8860133
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 28 deletions.
35 changes: 22 additions & 13 deletions src/hyperx/clientserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ const SSL_OP_NO_RENEGOTIATION = 1073741824
const SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION = 65536

const
preface* = "PRI * HTTP/2.0\r\L\r\LSM\r\L\r\L"
statusLineLen* = ":status: xxx\r\n".len
preface = "PRI * HTTP/2.0\r\L\r\LSM\r\L\r\L"
statusLineLen = ":status: xxx\r\n".len
maxStreamId = (1'u32 shl 31) - 1'u32
# https://httpwg.org/specs/rfc9113.html#SettingValues
stgHeaderTableSize* = 4096'u32
stgInitialMaxConcurrentStreams* = uint32.high
Expand Down Expand Up @@ -184,16 +185,6 @@ func openMainStream(client: ClientContext): Stream {.raises: [StreamsClosedError
doAssert frmSidMain notin client.streams
result = client.streams.open(frmSidMain, client.peerWindowSize.int32)

func openStream(client: ClientContext): Stream {.raises: [StreamsClosedError, GracefulShutdownError].} =
# XXX some error if max sid is reached
# XXX error if maxStreams is reached
doAssert client.typ == ctClient
check not client.isGracefulShutdown, newGracefulShutdownError()
var sid = client.currStreamId
sid += (if sid == StreamId 0: StreamId 1 else: StreamId 2)
result = client.streams.open(sid, client.peerWindowSize.int32)
client.currStreamId = sid

func maxPeerStreamIdSeen(client: ClientContext): StreamId {.raises: [].} =
case client.typ
of ctClient: StreamId 0
Expand Down Expand Up @@ -296,6 +287,7 @@ proc sendNaked(client: ClientContext, frm: Frame) {.async.} =
debugInfo debugPayload(frm)
doAssert frm.payloadLen.int == frm.payload.len
doAssert frm.payload.len <= client.peerMaxFrameSize.int
doAssert frm.sid <= StreamId maxStreamId
check not client.sock.isClosed, newConnClosedError()
GC_ref frm
try:
Expand Down Expand Up @@ -794,10 +786,13 @@ func newClientStream*(client: ClientContext, stream: Stream): ClientStream =
)

func newClientStream*(client: ClientContext): ClientStream =
let stream = client.openStream()
doAssert client.typ == ctClient
check not client.isGracefulShutdown, newGracefulShutdownError()
let stream = client.streams.dummy()
newClientStream(client, stream)

proc close(strm: ClientStream) {.raises: [].} =
strm.stream.close()
strm.client.streams.close(strm.stream.id)
strm.bodyRecvSig.close()
strm.headersRecvSig.close()
Expand All @@ -806,6 +801,17 @@ proc close(strm: ClientStream) {.raises: [].} =
except SignalClosedError:
discard

func openStream(strm: ClientStream) {.raises: [StreamsClosedError, GracefulShutdownError].} =
# XXX some error if max sid is reached
# XXX error if maxStreams is reached
template client: untyped = strm.client
doAssert client.typ == ctClient
check not client.isGracefulShutdown, newGracefulShutdownError()
var sid = client.currStreamId
sid += (if sid == StreamId 0: StreamId 1 else: StreamId 2)
client.streams.open(strm.stream, sid, client.peerWindowSize.int32)
client.currStreamId = sid

func recvEnded*(strm: ClientStream): bool {.raises: [].} =
strm.stateRecv == csStateEnded and
strm.headersRecv.len == 0 and
Expand Down Expand Up @@ -1063,6 +1069,8 @@ proc sendHeadersImpl*(
doAssert strm.stream.state in strmStateHeaderSendAllowed
doAssert strm.stateSend == csStateOpened or
(strm.stateSend in {csStateHeaders, csStateData} and finish)
if strm.stream.state == strmIdle:
strm.openStream()
strm.stateSend = csStateHeaders
var frm = newFrame()
frm.add headers
Expand Down Expand Up @@ -1168,6 +1176,7 @@ template with*(strm: ClientStream, body: untyped): untyped =
proc ping(client: ClientContext, strm: Stream) {.async.} =
# this is done for rst and go-away pings; only one stream ping
# will ever be in progress
doAssert strm.id in client.streams
if strm.pingSig.len > 0:
await strm.pingSig.waitFor()
else:
Expand Down
1 change: 1 addition & 0 deletions src/hyperx/frame.nim
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type
proc `==`*(a, b: FrmSid): bool {.borrow.}
proc `+=`*(a: var FrmSid, b: FrmSid) {.borrow.}
proc `<`*(a, b: FrmSid): bool {.borrow.}
proc `<=`*(a, b: FrmSid): bool {.borrow.}

#func `+=`*(a: var FrmSid, b: uint) {.raises: [].} =
# a = (a.uint + b).FrmSid
Expand Down
22 changes: 19 additions & 3 deletions src/hyperx/stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,31 @@ func del*(s: var Streams, sid: StreamId) {.raises: [].} =
func contains*(s: Streams, sid: StreamId): bool {.raises: [].} =
s.t.contains sid

func dummy*(s: var Streams): Stream {.raises: [StreamsClosedError].} =
check not s.isClosed, newStreamsClosedError("Cannot open stream")
result = newStream(uint32.high.StreamId, 0)

func open*(
s: var Streams,
stream: Stream,
sid: StreamId,
peerWindow: int32
): Stream {.raises: [StreamsClosedError].} =
) {.raises: [StreamsClosedError].} =
doAssert sid notin s.t, $sid.int
doAssert stream.id == uint32.high.StreamId
doAssert stream.state == strmIdle
check not s.isClosed, newStreamsClosedError("Cannot open stream")
result = newStream(sid, peerWindow)
s.t[sid] = result
stream.id = sid
stream.peerWindow = peerWindow
s.t[sid] = stream

func open*(
s: var Streams,
sid: StreamId,
peerWindow: int32
): Stream {.raises: [StreamsClosedError].} =
result = s.dummy()
s.open(result, sid, peerWindow)

iterator values*(s: Streams): Stream {.inline.} =
for v in values s.t:
Expand Down
75 changes: 65 additions & 10 deletions tests/functional/tmisc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ template testAsync(name: string, body: untyped): untyped =
doAssert checked
)()

proc sleepCycle: Future[void] =
let fut = newFuture[void]()
proc wakeup = fut.complete()
callSoon wakeup
return fut

const defaultHeaders = @[
(":method", "POST"),
(":scheme", "https"),
Expand Down Expand Up @@ -107,6 +113,8 @@ testAsync "server graceful close":
inc checked
doAssert checked == 2

# XXX cannot longer check server is sending GoAway
# since cannot create a stream before sending headers
testAsync "send after server graceful close":
var checked = 0
var client = newClient(localHost, localPort)
Expand All @@ -127,10 +135,8 @@ testAsync "send after server graceful close":
try:
with strm2:
await strm2.sendHeaders(defaultHeaders, finish = true)
var data = new string
await strm2.recvHeaders(data)
doAssert false
except HyperxError:
except GracefulShutdownError:
inc checked
doAssert checked == 2

Expand Down Expand Up @@ -162,20 +168,69 @@ testAsync "client graceful close":
inc checked
doAssert checked == 2

# XXX cannot longer create a stream before shutdown
# and send on it after because of lazy stream id
testAsync "send after client graceful close":
var checked = 0
var client = newClient(localHost, localPort)
with client:
let strm = client.newClientStream()
await client.gracefulClose()
try:
with strm:
await strm.sendHeaders(defaultHeaders, finish = true)
doAssert false
except GracefulShutdownError:
inc checked
doAssert checked == 1

testAsync "lazy client stream id":
var order = new seq[int]
order[] = newSeq[int]()
var checked = new int
checked[] = 0
proc stream(client: ClientContext, sleep = 0) {.async.} =
let strm = client.newClientStream()
with strm:
# This is not correct usage
await client.gracefulClose()
for i in 0 .. sleep-1:
await sleepCycle()
#echo $sleep
order[].add sleep
var headers = defaultHeaders
headers.add ("x-no-echo-headers", "true")
await strm.sendHeaders(headers, finish = false)
var data = new string
await strm.recvHeaders(data)
doAssert data[] == ":status: 200\r\n"
data[] = "foobar" & $sleep
await strm.sendBody(data, finish = true)
data[] = ""
await strm.recvBody(data)
doAssert data[] == "foobar" & $sleep
checked[] += 1
checked[] += 1
var client = newClient(localHost, localPort)
with client:
let strm1 = client.stream(200)
let strm2 = client.stream(100)
await strm1
await strm2
doAssert order[] == @[100, 200]
checked[] += 1
doAssert checked[] == 5

testAsync "cancel lazy stream":
var checked = 0
var client = newClient(localHost, localPort)
with client:
let strm = client.newClientStream()
await strm.cancel(hyxCancel)
with strm:
try:
await strm.sendHeaders(defaultHeaders, finish = true)
var data = new string
await strm.recvHeaders(data)
except HyperxConnError:
#doAssert err.code == errNoError
inc checked
doAssert false
except HyperxStrmError as err:
doAssert err.code == hyxStreamClosed
inc checked
inc checked
doAssert checked == 2
4 changes: 2 additions & 2 deletions tests/testclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,10 @@ testAsync "stream error NO_ERROR handling":
await tc.checkHandshake()
let strm = tc.client.newClientStream()
with strm:
await tc.replyNoError(strm.stream.id)
let sendFut = strm.send(dataOut)
let recvFut = strm.recv(dataIn)
await sendFut # this could raise
await sendFut
await tc.replyNoError(strm.stream.id)
await recvFut # this should never raise
doAssert dataIn[] == headers

Expand Down

0 comments on commit 8860133

Please sign in to comment.