Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Latest commit

 

History

History
132 lines (78 loc) · 12.1 KB

QUICStream.md

File metadata and controls

132 lines (78 loc) · 12.1 KB

module QUICStream

streams within a QUIC connection

QUIC Stream - the high-level stream interface for applications

let ((findStream (csm:connection_mutable) (stream:U32.t)):(Stack (pointer_or_null quic_stream) ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):findStreamRec csm.streams.lhead stream

Find a stream within the connection, by its ID

let ((quic_OpenStream (cs:pointer connection) (stream:U32.t)):(Stack (pointer quic_stream) ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); monitorEnter (!*(cs)).monitor; let  csm = conn_get_mutable cs in let  strm = findStream csm stream in let  strm = if is_null strm then (if U32.gt stream csm.maxStreamID then C.String.print (C.String.of_literal "BUGBUG: send STREAM_ID_NEEDED
") else (); let  (strmm:quic_stream_mutable) = {state=StreamIdle appstate=AppOpen nextWriteOffset=0 maxStreamData=csm.defaultMaxStreamData nextReadOffset=0 segments=empty_list readysegments=empty_list partialsegments=empty_list} in let  strmr = FStar.Buffer.rcreate FStar.HyperHeap.root strmm 1 in let  (strmf:quic_stream_fixed) = {streamID=stream hasDataReady=createEvent 0 0 qsm_state=strmr} in let  strm = empty_entry strmf in let  pstrm = FStar.Buffer.rcreate FStar.HyperHeap.root strm 1 in let  list = insertTailList csm.streams pstrm in upd_streams (!*(cs)).csm_state list; pstrm) else strm in monitorExit (!*(cs)).monitor; pop_frame (); strm

Public API: Open a QUIC stream. Returns NULL on failure.

let ((quic_CloseStream (cs:pointer connection) (strm:pointer quic_stream)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); monitorEnter (!*(cs)).monitor; let  strmm = strm_get_mutable strm in if =(strmm.appstate, AppOpen) then C.String.print (C.String.of_literal "bugbug: send RST_STREAM with NO_ERROR to indicate that the stream is now closed
") else (); upd_appstate (!*(strm)).p.qsm_state AppClosed; monitorExit (!*(cs)).monitor; pop_frame ()

Public API: Close a stream

let ((quic_RecvStream (cs:pointer connection) (strm:pointer quic_stream) (buffer:buffer_t) (bufferlen:U32.t)):(Stack U32.t ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); waitForSingleObject (!*(strm)).p.hasDataReady 0xffffffff; monitorEnter (!*(cs)).monitor; let  strmm = strm_get_mutable strm in let  seg = strmm.readysegments.lhead in let  list = removeEntryList strmm.readysegments seg in upd_readysegments (!*(strm)).p.qsm_state list; if is_null strmm.readysegments.lhead then resetEvent (!*(strm)).p.hasDataReady else (); if (!*(seg)).p.fin then upd_appstate (!*(strm)).p.qsm_state AppFin else (); monitorExit (!*(cs)).monitor; let  blitlen = (!*(seg)).p.datalength in let  blitlen = if (U32.lt bufferlen blitlen) then blitlen else bufferlen in let  data = (!*(seg)).p.data in FStar.Buffer.blit data 0 buffer 0 blitlen; pop_frame (); blitlen

Public API: Receive data on a stream. This will block until data arrives. It returns the number of bytes written into the buffer.

let ((quic_StreamIsFin (cs:pointer connection) (strm:pointer quic_stream)):(Stack bool ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strmm = strm_get_mutable strm in let  result = =(strmm.appstate, AppFin) in pop_frame (); result

Public API: Query if the 'fin' marker has been received, for end of the stream

let ((getNextSegment (strm:pointer quic_stream)):(Stack (pointer qstream_segment) ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strmm = strm_get_mutable strm in let  seg = strmm.segments.lhead in let  list = removeEntryList strmm.segments seg in upd_segments (!*(strm)).p.qsm_state list; pop_frame (); seg

Retrieve the next stream segment from .segments, ready to send

let ((hasMoreToSend (strm:pointer quic_stream)):(Stack bool ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strmm = strm_get_mutable strm in let  hasmore = if (is_null strmm.segments.lhead) then false else true in pop_frame (); hasmore

Determine if a stream has segments ready to send

let ((splitSegment (seg:pointer qstream_segment) (firstlength:U32.t)):(Stack (*((pointer qstream_segment), (pointer qstream_segment))) ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  data = (!*(seg)).p.data in let  firstdata = FStar.Buffer.rcreate FStar.HyperHeap.root 0 firstlength in Buffer.blit data 0 firstdata 0 firstlength; let  seconddatalength = _ in let  seconddata = FStar.Buffer.rcreate FStar.HyperHeap.root 0 seconddatalength in Buffer.blit data firstlength seconddata 0 seconddatalength; let  (first:qstream_segment_fixed) = {offset=(!*(seg)).p.offset data=firstdata datalength=firstlength fin=false available=createEvent 0 0} in let  first = empty_entry first in let  firstlength64 = Cast.uint32_to_uint64 firstlength in let  secondoffset = _ in let  (second:qstream_segment_fixed) = {offset=secondoffset data=seconddata datalength=seconddatalength fin=(!*(seg)).p.fin available=createEvent 0 0} in let  second = empty_entry second in let  pfirst = FStar.Buffer.rcreate FStar.HyperHeap.root first 1 in let  psecond = FStar.Buffer.rcreate FStar.HyperHeap.root second 1 in pop_frame (); ((FStar.Pervasives.Native.Mktuple2 pfirst psecond))

Merge a newly-arrive segment into the current segment list. This must handle overlapping and duplicated segments arriving out of order.

let ((segmentEnd (seg:pointer qstream_segment)):(Stack U64.t ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  offset = (!*(seg)).p.offset in let  length = (!*(seg)).p.datalength in let  segend = _ in pop_frame (); segend

Compute the end of a segment (offset+datalength)

let ((mergeSegmentsBody (partialsegments:qstream_segment_list) (c:pointer qstream_segment) (seg:pointer qstream_segment)):(Stack (*(qstream_segment_list, bool)) ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  c_offset = (!*(c)).p.offset in let  seg_offset = (!*(seg)).p.offset in let  c_segmentend = segmentEnd c in let  seg_segmentend = segmentEnd seg in let  ret = if &&((U64.lt c_segmentend c_offset), (=(c, partialsegments.lhead))) then (let  partialsegments = insertEntryBefore partialsegments c seg in ((FStar.Pervasives.Native.Mktuple2 partialsegments false))) else if U64.gte seg_offset c_segmentend then (if =(c, partialsegments.ltail) then (let  partialsegments = insertTailList partialsegments seg in ((FStar.Pervasives.Native.Mktuple2 partialsegments false))) else (((FStar.Pervasives.Native.Mktuple2 partialsegments true)))) else if &&((U64.gte seg_offset c_offset), (U64.lte seg_segmentend c_segmentend)) then (((FStar.Pervasives.Native.Mktuple2 partialsegments false))) else if &&((U64.lt seg_offset c_offset), (=(seg_segmentend, c_offset))) then (let  partialsegments = insertEntryBefore partialsegments c seg in ((FStar.Pervasives.Native.Mktuple2 partialsegments false))) else if &&((U64.lt seg_offset c_offset), (U64.gte seg_segmentend c_offset)) then (let  l = _ in let  l32 = Cast.uint64_to_uint32 l in let  (f, _) = splitSegment seg l32 in let  partialsegments = insertEntryBefore partialsegments c f in ((FStar.Pervasives.Native.Mktuple2 partialsegments false))) else (((FStar.Pervasives.Native.Mktuple2 partialsegments true))) in pop_frame (); ret

Body of the do/while loop. Returns the new list, and true to keep iterating, or false to stop

let ((mergeSegments (strm:pointer quic_stream) (seg:pointer qstream_segment)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strmm = strm_get_mutable strm in let  list = strmm.partialsegments in let  list = if is_null list.lhead then (insertHeadList list seg) else (let  lv = list.ltail in if U64.gte (!*(seg)).p.offset (segmentEnd lv) then (insertTailList list seg) else if U64.gte (!*(seg)).p.offset (!*(lv)).p.offset then (let  l64 = _ in let  l = Cast.uint64_to_uint32 l64 in if U32.lt l (!*(seg)).p.datalength then (let  (_, s) = splitSegment seg l in insertTailList list s) else (list)) else (let  (c:(pointer (pointer qstream_segment))) = Buffer.create list.lhead 1 in let  listmutable = Buffer.create list 1 in C.Loops.do_while ((fun h break -> /\(live h c, (==>(break, False))))) ((fun _ -> let  (list, keepgoing) = mergeSegmentsBody list (!*(c)) seg in if not keepgoing then (.()<-(listmutable, 0, list)) else (); .()<-(c, 0, (!*((!*(c)))).flink); not keepgoing)); .()(listmutable, 0))) in upd_partialsegments (!*(strm)).p.qsm_state list; pop_frame ()

Search inside the partialsegments list... if seg is entirely in a gap, insert it if seg overlaps with another... if it is exactly overlapping, drop it if it overlaps from the front, insert before, after truncting seg if it overlaps from the rear, insert after, after pruning off the start

let ((makeStreamDataAvailable (strm:pointer quic_stream) (seg:pointer qstream_segment)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strmm = strm_get_mutable strm in let  datalength = (!*(seg)).p.datalength in let  datalength64 = Cast.uint32_to_uint64 datalength in upd_nextReadOffset (!*(strm)).p.qsm_state _; let  list = insertTailList strmm.readysegments seg in upd_readysegments (!*(strm)).p.qsm_state list; setEvent (!*(strm)).p.hasDataReady; pop_frame ()

Indicate new data is ready for the application to receive on a stream

let ((streamRecv (cs:pointer connection) (id:U32.t) (seg:pointer qstream_segment)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strm = quic_OpenStream cs id in let  strmm = strm_get_mutable strm in if =((!*(seg)).p.offset, strmm.nextReadOffset) then (makeStreamDataAvailable strm seg) else if U64.gt (segmentEnd seg) strmm.nextReadOffset then (mergeSegments strm seg; C.Loops.do_while ((fun h _ -> true)) ((fun _ -> let  strmm = strm_get_mutable strm in let  seg = strmm.partialsegments.lhead in let  seg_offset = (!*(seg)).p.offset in let  seg_end = segmentEnd seg in if &&((U64.lte seg_offset strmm.nextReadOffset), (U64.gt seg_end strmm.nextReadOffset)) then (let  list = removeEntryList strmm.partialsegments seg in upd_partialsegments (!*(strm)).p.qsm_state list; let  seg = if U64.lt seg_offset strmm.nextReadOffset then (let  l = _ in let  (_, s) = splitSegment seg (Cast.uint64_to_uint32 l) in s) else seg in makeStreamDataAvailable strm seg; let  strmm = strm_get_mutable strm in is_null strmm.partialsegments.lhead) else (true))); C.String.print (C.String.of_literal "Merge of out-of-order data is NYI")) else (); pop_frame ()

Stream data has arrived from the peer. Merge it in, taking care of out-of-order and partial/overlapping/disjoint segments

let ((setMaxStreamData (cs:pointer connection) (stream:U32.t) (maxdata:U64.t)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  strm = quic_OpenStream cs stream in let  strmm = strm_get_mutable strm in let  maxdata = if U64.gt maxdata strmm.maxStreamData then maxdata else strmm.maxStreamData in upd_maxStreamData (!*(strm)).p.qsm_state maxdata; pop_frame ()

Set the maximum allowable offset for a given stream

let ((ackStream (cs:pointer connection) (t:streamRecoveryTracker)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); setEvent (!*((t.segment))).p.available; pop_frame ()

Called when a previously sent stream segment has been ACK'd by the peer. This releases the data

let ((lostStream (cs:pointer connection) (t:streamRecoveryTracker)):(Stack unit ((requires ((fun _ -> true)))) ((ensures ((fun _ _ _ -> true)))))):push_frame (); let  csm = conn_get_mutable cs in let  strm = findStream csm t.recoverystreamID in let  strmm = strm_get_mutable strm in let  seg = t.segment in let  list = insertHeadList strmm.segments seg in upd_segments (!*(strm)).p.qsm_state list; setEvent (!*(cs)).dataReadyToSend; pop_frame ()

Called by LossandCongestion when it determines that a packet has been lost, and the stream data must be retransmitted.