Skip to content

Commit

Permalink
Content-Length, chunked transfer encoding and Host header improvements (
Browse files Browse the repository at this point in the history
#2563)

* Content-Length, chunked transfer encoding and Host header improvements

* Fix docs

* Fixes
  • Loading branch information
vigoo authored Jan 6, 2024
1 parent f2ce5cf commit 4d30f47
Show file tree
Hide file tree
Showing 29 changed files with 241 additions and 134 deletions.
6 changes: 3 additions & 3 deletions docs/dsl/body.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ To create an `Body` that encodes a Stream you can use `Body.fromStream`.
- Using a Stream of Bytes

```scala mdoc:silent
val streamHttpData1: Body = Body.fromStream(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
val streamHttpData1: Body = Body.fromStreamChunked(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
```

- Using a Stream of String

```scala mdoc:silent
val streamHttpData2: Body = Body.fromCharSequenceStream(ZStream("a", "b", "c"), Charsets.Http)
val streamHttpData2: Body = Body.fromCharSequenceStreamChunked(ZStream("a", "b", "c"), Charsets.Http)
```

### Creating a Body from a `File`

To create an `Body` that encodes a File you can use `Body.fromFile`:

```scala mdoc:silent:crash
val fileHttpData: Body = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
val fileHttpData: ZIO[Any, Nothing, Body] = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
```
4 changes: 1 addition & 3 deletions docs/dsl/headers.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ object SimpleResponseDispatcher extends ZIOAppDefault {
if (acceptsStreaming)
Response(
status = Status.Ok,
// Setting response header
headers = Headers(Header.ContentLength(message.length.toLong)), // adding CONTENT-LENGTH header
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
)
else {
// Adding a custom header to Response
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/advanced/streaming-file.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object FileStreaming extends ZIOAppDefault {

// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),

// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
Expand Down
3 changes: 1 addition & 2 deletions docs/examples/advanced/streaming-response.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
headers = Headers(Header.ContentLength(message.length.toLong)),
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[cli] object Retriever {

override def retrieve(): Task[FormField] =
for {
chunk <- Body.fromFile(new java.io.File(path.toUri())).asChunk
chunk <- Body.fromFile(new java.io.File(path.toUri())).flatMap(_.asChunk)
} yield FormField.binaryField(name, chunk, media)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object FileStreaming extends ZIOAppDefault {

// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),

// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object RequestStreaming extends ZIOAppDefault {

// Creating HttpData from the stream
// This works for file of any size
val data = Body.fromStream(stream)
val data = Body.fromStreamChunked(stream)

Response(body = data)
}).toHttpApp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
headers = Headers(Header.ContentLength(message.length.toLong)),
body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
Expand Down
63 changes: 51 additions & 12 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.http

import java.io.FileInputStream
import java.io.{FileInputStream, IOException}
import java.nio.charset._
import java.nio.file._

Expand Down Expand Up @@ -120,6 +120,11 @@ trait Body { self =>
*/
def isComplete: Boolean

/**
* Returns whether or not the content length is known
*/
def knownContentLength: Option[Long]

/**
* Returns whether or not the body is known to be empty. Note that some bodies
* may not be known to be empty until an attempt is made to consume them.
Expand Down Expand Up @@ -174,8 +179,10 @@ object Body {
/**
* Constructs a [[zio.http.Body]] from the contents of a file.
*/
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4): Body =
FileBody(file, chunkSize)
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] =
ZIO.succeed(file.length()).map { fileSize =>
FileBody(file, chunkSize, fileSize)
}

/**
* Constructs a [[zio.http.Body]] from from form data, using multipart
Expand All @@ -187,7 +194,7 @@ object Body {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)

StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
}

/**
Expand All @@ -199,26 +206,48 @@ object Body {
form: Form,
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(boundary))
}

/**
* Constructs a [[zio.http.Body]] from a stream of bytes.
* Constructs a [[zio.http.Body]] from a stream of bytes with a known length.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte], contentLength: Long): Body =
StreamBody(stream, knownContentLength = Some(contentLength))

/**
* Constructs a [[zio.http.Body]] from a stream of bytes of unknown length,
* using chunked transfer encoding.
*/
def fromStream(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream)
def fromStreamChunked(stream: ZStream[Any, Throwable, Byte]): Body =
StreamBody(stream, knownContentLength = None)

/**
* Constructs a [[zio.http.Body]] from a stream of text, using the specified
* character set, which defaults to the HTTP character set.
* Constructs a [[zio.http.Body]] from a stream of text with known length,
* using the specified character set, which defaults to the HTTP character
* set.
*/
def fromCharSequenceStream(
stream: ZStream[Any, Throwable, CharSequence],
contentLength: Long,
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)
fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks, contentLength)

/**
* Constructs a [[zio.http.Body]] from a stream of text with unknown length
* using chunked transfer encoding, using the specified character set, which
* defaults to the HTTP character set.
*/
def fromCharSequenceStreamChunked(
stream: ZStream[Any, Throwable, CharSequence],
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
fromStreamChunked(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)

/**
* Helper to create Body from String
Expand Down Expand Up @@ -269,6 +298,8 @@ object Body {
override def contentType(newMediaType: MediaType): Body = EmptyBody

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = EmptyBody

override def knownContentLength: Option[Long] = Some(0L)
}

private[zio] final case class ChunkBody(
Expand Down Expand Up @@ -298,6 +329,8 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(data.length.toLong)
}

private[zio] final case class ArrayBody(
Expand Down Expand Up @@ -330,8 +363,9 @@ object Body {
}

private[zio] final case class FileBody(
val file: java.io.File,
file: java.io.File,
chunkSize: Int = 1024 * 4,
fileSize: Long,
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
Expand Down Expand Up @@ -375,10 +409,13 @@ object Body {

override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))

override def knownContentLength: Option[Long] = Some(fileSize)
}

private[zio] final case class StreamBody(
stream: ZStream[Any, Throwable, Byte],
knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body {
Expand Down Expand Up @@ -421,6 +458,8 @@ object Body {

def contentType(newMediaType: zio.http.MediaType, newBoundary: zio.http.Boundary): zio.http.Body = this

override def knownContentLength: Option[Long] = Some(0L)

}

private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)
Expand Down
64 changes: 45 additions & 19 deletions zio-http/src/main/scala/zio/http/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -832,17 +832,18 @@ object Handler {
ZIO.fail(new AccessDeniedException(file.getAbsolutePath))
} else {
if (file.isFile) {
val length = Headers(Header.ContentLength(file.length()))
val response = http.Response(headers = length, body = Body.fromFile(file))
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
Body.fromFile(file).flatMap { body =>
val response = http.Response(body = body)
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
case None => ZIO.succeed(response)
}
}
} else {
ZIO.fail(new NotDirectoryException(s"Found directory instead of a file."))
Expand Down Expand Up @@ -897,11 +898,10 @@ object Handler {
.acquireReleaseWith(openZip)(closeZip)
.mapZIO(jar => ZIO.attemptBlocking(jar.getEntry(resourcePath) -> jar))
.flatMap { case (entry, jar) => ZStream.fromInputStream(jar.getInputStream(entry)) }
response = Response(body = Body.fromStream(inZStream))
response = Response(body = Body.fromStream(inZStream, contentLength))
} yield mediaType.fold(response) { t =>
response
.addHeader(Header.ContentType(t))
.addHeader(Header.ContentLength(contentLength))
}
}
}
Expand All @@ -913,27 +913,53 @@ object Handler {

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
def fromStream[R](stream: ZStream[R, Throwable, String], contentLength: Long, charset: Charset = Charsets.Http)(
implicit trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), contentLength, charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream with a known content length as the body
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte], contentLength: Long)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env), contentLength))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body using chunked transfer encoding
*/
def fromStreamChunked[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), charset))
fromBody(Body.fromCharSequenceStreamChunked(stream.provideEnvironment(env), charset))
}
}.flatten

/**
* Creates a Handler that always succeeds with a 200 status code and the
* provided ZStream as the body
* provided ZStream as the body using chunked transfer encoding
*/
def fromStream[R](stream: ZStream[R, Throwable, Byte])(implicit
def fromStreamChunked[R](stream: ZStream[R, Throwable, Byte])(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
fromBody(Body.fromStream(stream.provideEnvironment(env)))
fromBody(Body.fromStreamChunked(stream.provideEnvironment(env)))
}
}.flatten

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object Response {
* \- stream of data to be sent as Server Sent Events
*/
def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response =
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStream(data.map(_.encode)))
Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode)))

/**
* Creates a new response for the provided socket app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ private[http] object BodyCodec {
ZIO.succeed((body.asStream >>> ZPipeline.decodeCharsWith(Charset.defaultCharset()) >>> codec.streamDecoder).orDie)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: BinaryCodec[E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder)
Body.fromStreamChunked(value >>> codec.streamEncoder)

def encodeToBody(value: ZStream[Any, Nothing, E], codec: Codec[String, Char, E])(implicit trace: Trace): Body =
Body.fromStream(value >>> codec.streamEncoder.map(_.toByte))
Body.fromStreamChunked(value >>> codec.streamEncoder.map(_.toByte))

type Element = E
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,15 @@ private[codec] object EncoderDecoder {
} else None
private def encodeBody(inputs: Array[Any], contentType: => Header.ContentType): Body = {
if (isByteStream) {
Body.fromStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
Body.fromStreamChunked(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
} else {
if (inputs.length > 1) {
Body.fromMultipartForm(encodeMultipartFormData(inputs), formBoundary)
} else {
if (isEventStream) {
Body.fromCharSequenceStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode))
Body.fromCharSequenceStreamChunked(
inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode),
)
} else if (inputs.length < 1) {
Body.empty
} else {
Expand Down
Loading

0 comments on commit 4d30f47

Please sign in to comment.