diff --git a/docs/dsl/body.md b/docs/dsl/body.md
index 7cedab6ce..3727c5c53 100644
--- a/docs/dsl/body.md
+++ b/docs/dsl/body.md
@@ -59,13 +59,13 @@ To create an `Body` that encodes a Stream you can use `Body.fromStream`.
- Using a Stream of Bytes
```scala mdoc:silent
- val streamHttpData1: Body = Body.fromStream(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
+ val streamHttpData1: Body = Body.fromStreamChunked(ZStream.fromChunk(Chunk.fromArray("Some String".getBytes(Charsets.Http))))
```
- Using a Stream of String
```scala mdoc:silent
- val streamHttpData2: Body = Body.fromCharSequenceStream(ZStream("a", "b", "c"), Charsets.Http)
+ val streamHttpData2: Body = Body.fromCharSequenceStreamChunked(ZStream("a", "b", "c"), Charsets.Http)
```
### Creating a Body from a `File`
@@ -73,5 +73,5 @@ To create an `Body` that encodes a Stream you can use `Body.fromStream`.
To create an `Body` that encodes a File you can use `Body.fromFile`:
```scala mdoc:silent:crash
- val fileHttpData: Body = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
+ val fileHttpData: ZIO[Any, Nothing, Body] = Body.fromFile(new java.io.File(getClass.getResource("/fileName.txt").getPath))
```
diff --git a/docs/dsl/headers.md b/docs/dsl/headers.md
index 520e78377..2cdcc8fa3 100644
--- a/docs/dsl/headers.md
+++ b/docs/dsl/headers.md
@@ -88,9 +88,7 @@ object SimpleResponseDispatcher extends ZIOAppDefault {
if (acceptsStreaming)
Response(
status = Status.Ok,
- // Setting response header
- headers = Headers(Header.ContentLength(message.length.toLong)), // adding CONTENT-LENGTH header
- body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
+ body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
)
else {
// Adding a custom header to Response
diff --git a/docs/examples/advanced/streaming-file.md b/docs/examples/advanced/streaming-file.md
index fb0e1b455..2e02449e1 100644
--- a/docs/examples/advanced/streaming-file.md
+++ b/docs/examples/advanced/streaming-file.md
@@ -22,7 +22,7 @@ object FileStreaming extends ZIOAppDefault {
// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
- Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
+ Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),
// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
diff --git a/docs/examples/advanced/streaming-response.md b/docs/examples/advanced/streaming-response.md
index 1389e67d4..c70f987b4 100644
--- a/docs/examples/advanced/streaming-response.md
+++ b/docs/examples/advanced/streaming-response.md
@@ -30,8 +30,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
- headers = Headers(Header.ContentLength(message.length.toLong)),
- body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
+ body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
diff --git a/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala b/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala
index f50ed0964..d3d30b9e9 100644
--- a/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala
+++ b/zio-http-cli/src/main/scala/zio/http/endpoint/cli/Retriever.scala
@@ -50,7 +50,7 @@ private[cli] object Retriever {
override def retrieve(): Task[FormField] =
for {
- chunk <- Body.fromFile(new java.io.File(path.toUri())).asChunk
+ chunk <- Body.fromFile(new java.io.File(path.toUri())).flatMap(_.asChunk)
} yield FormField.binaryField(name, chunk, media)
}
diff --git a/zio-http-example/src/main/scala/example/FileStreaming.scala b/zio-http-example/src/main/scala/example/FileStreaming.scala
index 5bfc67626..dbfd6b9f9 100644
--- a/zio-http-example/src/main/scala/example/FileStreaming.scala
+++ b/zio-http-example/src/main/scala/example/FileStreaming.scala
@@ -17,7 +17,7 @@ object FileStreaming extends ZIOAppDefault {
// Read the file as ZStream
// Uses the blocking version of ZStream.fromFile
- Method.GET / "blocking" -> Handler.fromStream(ZStream.fromPath(Paths.get("README.md"))),
+ Method.GET / "blocking" -> Handler.fromStreamChunked(ZStream.fromPath(Paths.get("README.md"))),
// Uses netty's capability to write file content to the Channel
// Content-type response headers are automatically identified and added
diff --git a/zio-http-example/src/main/scala/example/RequestStreaming.scala b/zio-http-example/src/main/scala/example/RequestStreaming.scala
index 5535bcb4a..bd8166a29 100644
--- a/zio-http-example/src/main/scala/example/RequestStreaming.scala
+++ b/zio-http-example/src/main/scala/example/RequestStreaming.scala
@@ -14,7 +14,7 @@ object RequestStreaming extends ZIOAppDefault {
// Creating HttpData from the stream
// This works for file of any size
- val data = Body.fromStream(stream)
+ val data = Body.fromStreamChunked(stream)
Response(body = data)
}).toHttpApp
diff --git a/zio-http-example/src/main/scala/example/StreamingResponse.scala b/zio-http-example/src/main/scala/example/StreamingResponse.scala
index 2af65b16b..e884b06a6 100644
--- a/zio-http-example/src/main/scala/example/StreamingResponse.scala
+++ b/zio-http-example/src/main/scala/example/StreamingResponse.scala
@@ -25,8 +25,7 @@ object StreamingResponse extends ZIOAppDefault {
handler(
http.Response(
status = Status.Ok,
- headers = Headers(Header.ContentLength(message.length.toLong)),
- body = Body.fromStream(ZStream.fromChunk(message)), // Encoding content using a ZStream
+ body = Body.fromStream(ZStream.fromChunk(message), message.length.toLong), // Encoding content using a ZStream
),
),
).toHttpApp
diff --git a/zio-http/src/main/scala/zio/http/Body.scala b/zio-http/src/main/scala/zio/http/Body.scala
index 084784cbc..d3a9de1cb 100644
--- a/zio-http/src/main/scala/zio/http/Body.scala
+++ b/zio-http/src/main/scala/zio/http/Body.scala
@@ -16,7 +16,7 @@
package zio.http
-import java.io.FileInputStream
+import java.io.{FileInputStream, IOException}
import java.nio.charset._
import java.nio.file._
@@ -120,6 +120,11 @@ trait Body { self =>
*/
def isComplete: Boolean
+ /**
+ * Returns whether or not the content length is known
+ */
+ def knownContentLength: Option[Long]
+
/**
* Returns whether or not the body is known to be empty. Note that some bodies
* may not be known to be empty until an attempt is made to consume them.
@@ -174,8 +179,10 @@ object Body {
/**
* Constructs a [[zio.http.Body]] from the contents of a file.
*/
- def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4): Body =
- FileBody(file, chunkSize)
+ def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] =
+ ZIO.succeed(file.length()).map { fileSize =>
+ FileBody(file, chunkSize, fileSize)
+ }
/**
* Constructs a [[zio.http.Body]] from from form data, using multipart
@@ -187,7 +194,7 @@ object Body {
)(implicit trace: Trace): Body = {
val bytes = form.multipartBytes(specificBoundary)
- StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
+ StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(specificBoundary))
}
/**
@@ -199,26 +206,48 @@ object Body {
form: Form,
)(implicit trace: Trace): UIO[Body] =
form.multipartBytesUUID.map { case (boundary, bytes) =>
- StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
+ StreamBody(bytes, knownContentLength = None, Some(MediaType.multipart.`form-data`), Some(boundary))
}
/**
- * Constructs a [[zio.http.Body]] from a stream of bytes.
+ * Constructs a [[zio.http.Body]] from a stream of bytes with a known length.
+ */
+ def fromStream(stream: ZStream[Any, Throwable, Byte], contentLength: Long): Body =
+ StreamBody(stream, knownContentLength = Some(contentLength))
+
+ /**
+ * Constructs a [[zio.http.Body]] from a stream of bytes of unknown length,
+ * using chunked transfer encoding.
*/
- def fromStream(stream: ZStream[Any, Throwable, Byte]): Body =
- StreamBody(stream)
+ def fromStreamChunked(stream: ZStream[Any, Throwable, Byte]): Body =
+ StreamBody(stream, knownContentLength = None)
/**
- * Constructs a [[zio.http.Body]] from a stream of text, using the specified
- * character set, which defaults to the HTTP character set.
+ * Constructs a [[zio.http.Body]] from a stream of text with known length,
+ * using the specified character set, which defaults to the HTTP character
+ * set.
*/
def fromCharSequenceStream(
stream: ZStream[Any, Throwable, CharSequence],
+ contentLength: Long,
charset: Charset = Charsets.Http,
)(implicit
trace: Trace,
): Body =
- fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)
+ fromStream(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks, contentLength)
+
+ /**
+ * Constructs a [[zio.http.Body]] from a stream of text with unknown length
+ * using chunked transfer encoding, using the specified character set, which
+ * defaults to the HTTP character set.
+ */
+ def fromCharSequenceStreamChunked(
+ stream: ZStream[Any, Throwable, CharSequence],
+ charset: Charset = Charsets.Http,
+ )(implicit
+ trace: Trace,
+ ): Body =
+ fromStreamChunked(stream.map(seq => Chunk.fromArray(seq.toString.getBytes(charset))).flattenChunks)
/**
* Helper to create Body from String
@@ -269,6 +298,8 @@ object Body {
override def contentType(newMediaType: MediaType): Body = EmptyBody
override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body = EmptyBody
+
+ override def knownContentLength: Option[Long] = Some(0L)
}
private[zio] final case class ChunkBody(
@@ -298,6 +329,8 @@ object Body {
override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))
+
+ override def knownContentLength: Option[Long] = Some(data.length.toLong)
}
private[zio] final case class ArrayBody(
@@ -330,8 +363,9 @@ object Body {
}
private[zio] final case class FileBody(
- val file: java.io.File,
+ file: java.io.File,
chunkSize: Int = 1024 * 4,
+ fileSize: Long,
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
@@ -375,10 +409,13 @@ object Body {
override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))
+
+ override def knownContentLength: Option[Long] = Some(fileSize)
}
private[zio] final case class StreamBody(
stream: ZStream[Any, Throwable, Byte],
+ knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body {
@@ -421,6 +458,8 @@ object Body {
def contentType(newMediaType: zio.http.MediaType, newBoundary: zio.http.Boundary): zio.http.Body = this
+ override def knownContentLength: Option[Long] = Some(0L)
+
}
private val zioEmptyArray = ZIO.succeed(Array.empty[Byte])(Trace.empty)
diff --git a/zio-http/src/main/scala/zio/http/Handler.scala b/zio-http/src/main/scala/zio/http/Handler.scala
index 851a8be2a..8556e5d1f 100644
--- a/zio-http/src/main/scala/zio/http/Handler.scala
+++ b/zio-http/src/main/scala/zio/http/Handler.scala
@@ -832,17 +832,18 @@ object Handler {
ZIO.fail(new AccessDeniedException(file.getAbsolutePath))
} else {
if (file.isFile) {
- val length = Headers(Header.ContentLength(file.length()))
- val response = http.Response(headers = length, body = Body.fromFile(file))
- val pathName = file.toPath.toString
-
- // Set MIME type in the response headers. This is only relevant in
- // case of RandomAccessFile transfers as browsers use the MIME type,
- // not the file extension, to determine how to process a URL.
- // {{{https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type}}}
- determineMediaType(pathName) match {
- case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
- case None => ZIO.succeed(response)
+ Body.fromFile(file).flatMap { body =>
+ val response = http.Response(body = body)
+ val pathName = file.toPath.toString
+
+ // Set MIME type in the response headers. This is only relevant in
+ // case of RandomAccessFile transfers as browsers use the MIME type,
+ // not the file extension, to determine how to process a URL.
+ // {{{https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type}}}
+ determineMediaType(pathName) match {
+ case Some(mediaType) => ZIO.succeed(response.addHeader(Header.ContentType(mediaType)))
+ case None => ZIO.succeed(response)
+ }
}
} else {
ZIO.fail(new NotDirectoryException(s"Found directory instead of a file."))
@@ -897,11 +898,10 @@ object Handler {
.acquireReleaseWith(openZip)(closeZip)
.mapZIO(jar => ZIO.attemptBlocking(jar.getEntry(resourcePath) -> jar))
.flatMap { case (entry, jar) => ZStream.fromInputStream(jar.getInputStream(entry)) }
- response = Response(body = Body.fromStream(inZStream))
+ response = Response(body = Body.fromStream(inZStream, contentLength))
} yield mediaType.fold(response) { t =>
response
.addHeader(Header.ContentType(t))
- .addHeader(Header.ContentLength(contentLength))
}
}
}
@@ -913,27 +913,53 @@ object Handler {
/**
* Creates a Handler that always succeeds with a 200 status code and the
- * provided ZStream as the body
+ * provided ZStream with a known content length as the body
*/
- def fromStream[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
+ def fromStream[R](stream: ZStream[R, Throwable, String], contentLength: Long, charset: Charset = Charsets.Http)(
+ implicit trace: Trace,
+ ): Handler[R, Throwable, Any, Response] =
+ Handler.fromZIO {
+ ZIO.environment[R].map { env =>
+ fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), contentLength, charset))
+ }
+ }.flatten
+
+ /**
+ * Creates a Handler that always succeeds with a 200 status code and the
+ * provided ZStream with a known content length as the body
+ */
+ def fromStream[R](stream: ZStream[R, Throwable, Byte], contentLength: Long)(implicit
+ trace: Trace,
+ ): Handler[R, Throwable, Any, Response] =
+ Handler.fromZIO {
+ ZIO.environment[R].map { env =>
+ fromBody(Body.fromStream(stream.provideEnvironment(env), contentLength))
+ }
+ }.flatten
+
+ /**
+ * Creates a Handler that always succeeds with a 200 status code and the
+ * provided ZStream as the body using chunked transfer encoding
+ */
+ def fromStreamChunked[R](stream: ZStream[R, Throwable, String], charset: Charset = Charsets.Http)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
- fromBody(Body.fromCharSequenceStream(stream.provideEnvironment(env), charset))
+ fromBody(Body.fromCharSequenceStreamChunked(stream.provideEnvironment(env), charset))
}
}.flatten
/**
* Creates a Handler that always succeeds with a 200 status code and the
- * provided ZStream as the body
+ * provided ZStream as the body using chunked transfer encoding
*/
- def fromStream[R](stream: ZStream[R, Throwable, Byte])(implicit
+ def fromStreamChunked[R](stream: ZStream[R, Throwable, Byte])(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
Handler.fromZIO {
ZIO.environment[R].map { env =>
- fromBody(Body.fromStream(stream.provideEnvironment(env)))
+ fromBody(Body.fromStreamChunked(stream.provideEnvironment(env)))
}
}.flatten
diff --git a/zio-http/src/main/scala/zio/http/Response.scala b/zio-http/src/main/scala/zio/http/Response.scala
index bba283a2f..0cb0b52e6 100644
--- a/zio-http/src/main/scala/zio/http/Response.scala
+++ b/zio-http/src/main/scala/zio/http/Response.scala
@@ -181,7 +181,7 @@ object Response {
* \- stream of data to be sent as Server Sent Events
*/
def fromServerSentEvents(data: ZStream[Any, Nothing, ServerSentEvent])(implicit trace: Trace): Response =
- Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStream(data.map(_.encode)))
+ Response(Status.Ok, contentTypeEventStream, Body.fromCharSequenceStreamChunked(data.map(_.encode)))
/**
* Creates a new response for the provided socket app
diff --git a/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala b/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala
index a03465f2f..4d09af86f 100644
--- a/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala
+++ b/zio-http/src/main/scala/zio/http/codec/internal/BodyCodec.scala
@@ -143,10 +143,10 @@ private[http] object BodyCodec {
ZIO.succeed((body.asStream >>> ZPipeline.decodeCharsWith(Charset.defaultCharset()) >>> codec.streamDecoder).orDie)
def encodeToBody(value: ZStream[Any, Nothing, E], codec: BinaryCodec[E])(implicit trace: Trace): Body =
- Body.fromStream(value >>> codec.streamEncoder)
+ Body.fromStreamChunked(value >>> codec.streamEncoder)
def encodeToBody(value: ZStream[Any, Nothing, E], codec: Codec[String, Char, E])(implicit trace: Trace): Body =
- Body.fromStream(value >>> codec.streamEncoder.map(_.toByte))
+ Body.fromStreamChunked(value >>> codec.streamEncoder.map(_.toByte))
type Element = E
}
diff --git a/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala b/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala
index 6fee2f52d..bb7afab79 100644
--- a/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala
+++ b/zio-http/src/main/scala/zio/http/codec/internal/EncoderDecoder.scala
@@ -530,13 +530,15 @@ private[codec] object EncoderDecoder {
} else None
private def encodeBody(inputs: Array[Any], contentType: => Header.ContentType): Body = {
if (isByteStream) {
- Body.fromStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
+ Body.fromStreamChunked(inputs(0).asInstanceOf[ZStream[Any, Nothing, Byte]])
} else {
if (inputs.length > 1) {
Body.fromMultipartForm(encodeMultipartFormData(inputs), formBoundary)
} else {
if (isEventStream) {
- Body.fromCharSequenceStream(inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode))
+ Body.fromCharSequenceStreamChunked(
+ inputs(0).asInstanceOf[ZStream[Any, Nothing, ServerSentEvent]].map(_.encode),
+ )
} else if (inputs.length < 1) {
Body.empty
} else {
diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala
index cac6af0a5..5cd83ac9d 100644
--- a/zio-http/src/main/scala/zio/http/netty/NettyBody.scala
+++ b/zio-http/src/main/scala/zio/http/netty/NettyBody.scala
@@ -39,8 +39,14 @@ object NettyBody extends BodyEncoding {
private[zio] def fromAsync(
unsafeAsync: UnsafeAsync => Unit,
+ knownContentLength: Option[Long],
contentTypeHeader: Option[Header.ContentType] = None,
- ): Body = AsyncBody(unsafeAsync, contentTypeHeader.map(_.mediaType), contentTypeHeader.flatMap(_.boundary))
+ ): Body = AsyncBody(
+ unsafeAsync,
+ knownContentLength,
+ contentTypeHeader.map(_.mediaType),
+ contentTypeHeader.flatMap(_.boundary),
+ )
/**
* Helper to create Body from ByteBuf
@@ -79,6 +85,8 @@ object NettyBody extends BodyEncoding {
override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))
+
+ override def knownContentLength: Option[Long] = Some(asciiString.length().toLong)
}
private[zio] final case class ByteBufBody(
@@ -109,10 +117,13 @@ object NettyBody extends BodyEncoding {
override def contentType(newMediaType: MediaType, newBoundary: Boundary): Body =
copy(mediaType = Some(newMediaType), boundary = boundary.orElse(Some(newBoundary)))
+
+ override def knownContentLength: Option[Long] = Some(byteBuf.readableBytes().toLong)
}
private[zio] final case class AsyncBody(
unsafeAsync: UnsafeAsync => Unit,
+ knownContentLength: Option[Long],
override val mediaType: Option[MediaType] = None,
override val boundary: Option[Boundary] = None,
) extends Body
diff --git a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
index 55bef0ef4..f077ad668 100644
--- a/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
+++ b/zio-http/src/main/scala/zio/http/netty/NettyBodyWriter.scala
@@ -35,7 +35,7 @@ object NettyBodyWriter {
body match {
case body: ByteBufBody =>
ctx.write(body.byteBuf)
- ctx.flush()
+ ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
None
case body: FileBody =>
val file = body.file
@@ -45,30 +45,50 @@ object NettyBodyWriter {
// Write the end marker.
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
None
- case AsyncBody(async, _, _) =>
- async(
- new UnsafeAsync {
- override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
- val nettyMsg = message match {
- case b: ByteArray => Unpooled.wrappedBuffer(b.array)
- case other => Unpooled.wrappedBuffer(other.toArray)
- }
- ctx.writeAndFlush(nettyMsg)
- if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
- }
+ case AsyncBody(async, _, _, _) =>
+ contentLength.orElse(body.knownContentLength) match {
+ case Some(_) =>
+ async(
+ new UnsafeAsync {
+ override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
+ val nettyMsg = message match {
+ case b: ByteArray => Unpooled.wrappedBuffer(b.array)
+ case other => Unpooled.wrappedBuffer(other.toArray)
+ }
+ ctx.writeAndFlush(nettyMsg)
+ if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
+ }
- override def fail(cause: Throwable): Unit =
- ctx.fireExceptionCaught(cause)
- },
- )
- None
+ override def fail(cause: Throwable): Unit =
+ ctx.fireExceptionCaught(cause)
+ },
+ )
+ None
+ case None =>
+ async(
+ new UnsafeAsync {
+ override def apply(message: Chunk[Byte], isLast: Boolean): Unit = {
+ val nettyMsg = message match {
+ case b: ByteArray => Unpooled.wrappedBuffer(b.array)
+ case other => Unpooled.wrappedBuffer(other.toArray)
+ }
+ ctx.writeAndFlush(new DefaultHttpContent(nettyMsg))
+ if (isLast) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
+ }
+
+ override def fail(cause: Throwable): Unit =
+ ctx.fireExceptionCaught(cause)
+ },
+ )
+ None
+ }
case AsciiStringBody(asciiString, _, _) =>
ctx.write(Unpooled.wrappedBuffer(asciiString.array()))
- ctx.flush()
+ ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
None
- case StreamBody(stream, _, _) =>
+ case StreamBody(stream, _, _, _) =>
Some(
- contentLength match {
+ contentLength.orElse(body.knownContentLength) match {
case Some(length) =>
stream.chunks
.runFoldZIO(length) { (remaining, bytes) =>
@@ -76,18 +96,19 @@ object NettyBodyWriter {
case 0L =>
NettyFutureExecutor.executed {
// Flushes the last body content and LastHttpContent together to avoid race conditions.
- ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
+ ctx.write(Unpooled.wrappedBuffer(bytes.toArray))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}.as(0L)
case n =>
NettyFutureExecutor.executed {
- ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(bytes.toArray)))
+ ctx.writeAndFlush(Unpooled.wrappedBuffer(bytes.toArray))
}.as(n)
}
}
.flatMap {
- case 0L => ZIO.unit
+ case 0L =>
+ ZIO.unit
case remaining =>
val actualLength = length - remaining
ZIO.logWarning(s"Expected Content-Length of $length, but sent $actualLength bytes") *>
@@ -113,10 +134,10 @@ object NettyBodyWriter {
None
case ChunkBody(data, _, _) =>
ctx.write(Unpooled.wrappedBuffer(data.toArray))
- ctx.flush()
+ ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
None
case EmptyBody =>
- ctx.flush()
+ ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
None
}
}
diff --git a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala
index 948382d49..c5d70aa1f 100644
--- a/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala
+++ b/zio-http/src/main/scala/zio/http/netty/NettyResponse.scala
@@ -48,10 +48,11 @@ object NettyResponse {
unsafe: Unsafe,
trace: Trace,
): ZIO[Any, Nothing, Response] = {
- val status = Conversions.statusFromNetty(jRes.status())
- val headers = Conversions.headersFromNetty(jRes.headers())
+ val status = Conversions.statusFromNetty(jRes.status())
+ val headers = Conversions.headersFromNetty(jRes.headers())
+ val knownContentLength = headers.get(Header.ContentLength).map(_.length)
- if (headers.get(Header.ContentLength).map(_.length).contains(0L)) {
+ if (knownContentLength.contains(0L)) {
onComplete
.succeed(ChannelState.forStatus(status))
.as(
@@ -67,9 +68,7 @@ object NettyResponse {
responseHandler,
): Unit
- val data = NettyBody.fromAsync { callback =>
- responseHandler.connect(callback)
- }
+ val data = NettyBody.fromAsync(callback => responseHandler.connect(callback), knownContentLength)
ZIO.succeed(Response(status, headers, data))
}
}
diff --git a/zio-http/src/main/scala/zio/http/netty/NettyResponseEncoder.scala b/zio-http/src/main/scala/zio/http/netty/NettyResponseEncoder.scala
index 0e6fcc828..6da366db1 100644
--- a/zio-http/src/main/scala/zio/http/netty/NettyResponseEncoder.scala
+++ b/zio-http/src/main/scala/zio/http/netty/NettyResponseEncoder.scala
@@ -38,8 +38,15 @@ private[zio] object NettyResponseEncoder {
val bytes = runtime.runtime(ctx).unsafe.run(body.asArray).getOrThrow()
fastEncode(response, bytes)
} else {
- val jHeaders = Conversions.headersToNetty(response.headers)
- val jStatus = Conversions.statusToNetty(response.status)
+ val jHeaders = Conversions.headersToNetty(response.headers)
+ val jStatus = Conversions.statusToNetty(response.status)
+
+ response.body.knownContentLength match {
+ case Some(contentLength) if !jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH) =>
+ jHeaders.set(HttpHeaderNames.CONTENT_LENGTH, contentLength)
+ case _ =>
+ }
+
val hasContentLength = jHeaders.contains(HttpHeaderNames.CONTENT_LENGTH)
if (!hasContentLength) jHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
new DefaultHttpResponse(HttpVersion.HTTP_1_1, jStatus, jHeaders)
@@ -62,10 +69,6 @@ private[zio] object NettyResponseEncoder {
val jContent = Unpooled.wrappedBuffer(bytes)
val jResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, jStatus, jContent, false)
- // TODO: Unit test for this
- // Client can't handle chunked responses and currently treats them as a FullHttpResponse.
- // Due to this client limitation it is not possible to write a unit-test for this.
- // Alternative would be to use sttp client for this use-case.
if (!hasContentLength) jHeaders.set(HttpHeaderNames.CONTENT_LENGTH, jContent.readableBytes())
jResponse.headers().add(jHeaders)
jResponse
diff --git a/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala
index 72d14d89e..fc1466fe7 100644
--- a/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala
+++ b/zio-http/src/main/scala/zio/http/netty/client/NettyRequestEncoder.scala
@@ -40,16 +40,17 @@ private[zio] object NettyRequestEncoder {
// Host and port information should be in the headers.
val path = replaceEmptyPathWithSlash(req.url).relative.addLeadingSlash.encode
- val encodedReqHeaders = Conversions.headersToNetty(req.allHeaders)
+ val headers = Conversions.headersToNetty(req.allHeaders)
- val headers = req.url.hostPort match {
- case Some(host) => encodedReqHeaders.set(HttpHeaderNames.HOST, host)
- case _ => encodedReqHeaders
+ req.url.hostPort match {
+ case Some(host) if !headers.contains(HttpHeaderNames.HOST) =>
+ headers.set(HttpHeaderNames.HOST, host)
+ case _ =>
}
if (req.body.isComplete) {
- req.body.asChunk.map { chunk =>
- val content = Unpooled.wrappedBuffer(chunk.toArray)
+ req.body.asArray.map { array =>
+ val content = Unpooled.wrappedBuffer(array)
val writerIndex = content.writerIndex()
headers.set(HttpHeaderNames.CONTENT_LENGTH, writerIndex.toString)
@@ -60,7 +61,12 @@ private[zio] object NettyRequestEncoder {
}
} else {
ZIO.attempt {
- headers.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked")
+ req.body.knownContentLength match {
+ case Some(length) =>
+ headers.set(HttpHeaderNames.CONTENT_LENGTH, length.toString)
+ case None =>
+ headers.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked")
+ }
new DefaultHttpRequest(jVersion, method, path, headers)
}
}
diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala
index f44c549d6..10e33420c 100644
--- a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala
+++ b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala
@@ -224,13 +224,9 @@ private[zio] final case class ServerInboundHandler(
remoteAddress = remoteAddress,
)
case nettyReq: HttpRequest =>
- val handler = addAsyncBodyHandler(ctx)
- val body = NettyBody.fromAsync(
- { async =>
- handler.connect(async)
- },
- contentType,
- )
+ val knownContentLength = headers.get(Header.ContentLength).map(_.length)
+ val handler = addAsyncBodyHandler(ctx)
+ val body = NettyBody.fromAsync(async => handler.connect(async), knownContentLength, contentType)
Request(
body = body,
diff --git a/zio-http/src/test/scala/zio/http/BodySpec.scala b/zio-http/src/test/scala/zio/http/BodySpec.scala
index 88af33918..8313dd132 100644
--- a/zio-http/src/test/scala/zio/http/BodySpec.scala
+++ b/zio-http/src/test/scala/zio/http/BodySpec.scala
@@ -37,7 +37,7 @@ object BodySpec extends ZIOHttpSpec {
check(Gen.string) { payload =>
val stringBuffer = payload.getBytes(Charsets.Http)
val responseContent = ZStream.fromIterable(stringBuffer, chunkSize = 2)
- val res = Body.fromStream(responseContent).asString(Charsets.Http)
+ val res = Body.fromStreamChunked(responseContent).asString(Charsets.Http)
assertZIO(res)(equalTo(payload))
}
},
@@ -45,12 +45,12 @@ object BodySpec extends ZIOHttpSpec {
suite("fromFile")(
test("success") {
lazy val file = testFile
- val res = Body.fromFile(file).asString(Charsets.Http)
+ val res = Body.fromFile(file).flatMap(_.asString(Charsets.Http))
assertZIO(res)(equalTo("foo\nbar"))
},
test("success small chunk") {
lazy val file = testFile
- val res = Body.fromFile(file, 3).asString(Charsets.Http)
+ val res = Body.fromFile(file, 3).flatMap(_.asString(Charsets.Http))
assertZIO(res)(equalTo("foo\nbar"))
},
),
diff --git a/zio-http/src/test/scala/zio/http/ClientSpec.scala b/zio-http/src/test/scala/zio/http/ClientSpec.scala
index 8463f7459..230baf8c4 100644
--- a/zio-http/src/test/scala/zio/http/ClientSpec.scala
+++ b/zio-http/src/test/scala/zio/http/ClientSpec.scala
@@ -64,7 +64,7 @@ object ClientSpec extends HttpRunnableSpec {
val app = Handler.fromFunctionZIO[Request] { req => req.body.asString.map(Response.text(_)) }.sandbox.toHttpApp
val stream = ZStream.fromIterable(List("a", "b", "c"), chunkSize = 1)
val res = app
- .deploy(Request(method = Method.POST, body = Body.fromCharSequenceStream(stream)))
+ .deploy(Request(method = Method.POST, body = Body.fromCharSequenceStreamChunked(stream)))
.flatMap(_.body.asString)
assertZIO(res)(equalTo("abc"))
},
@@ -87,7 +87,7 @@ object ClientSpec extends HttpRunnableSpec {
} yield assertTrue(loggedUrl == s"$baseURL/")
},
test("reading of unfinished body must fail") {
- val app = Handler.fromStream(ZStream.never).sandbox.toHttpApp
+ val app = Handler.fromStreamChunked(ZStream.never).sandbox.toHttpApp
val requestCode = (client: Client) =>
(for {
response <- ZIO.scoped(client(Request()))
diff --git a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala
index a8b3b7800..7d76fc71d 100644
--- a/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala
+++ b/zio-http/src/test/scala/zio/http/ClientStreamingSpec.scala
@@ -37,10 +37,12 @@ object ClientStreamingSpec extends HttpRunnableSpec {
handler(Response.text("simple response")),
Method.GET / "streaming-get" ->
handler(
- Response(body = Body.fromStream(ZStream.fromIterable("streaming response".getBytes).rechunk(3))),
+ Response(body = Body.fromStreamChunked(ZStream.fromIterable("streaming response".getBytes).rechunk(3))),
),
Method.POST / "simple-post" -> handler((req: Request) => req.ignoreBody.as(Response.ok)),
- Method.POST / "streaming-echo" -> handler((req: Request) => Response(body = Body.fromStream(req.body.asStream))),
+ Method.POST / "streaming-echo" -> handler((req: Request) =>
+ Response(body = Body.fromStreamChunked(req.body.asStream)),
+ ),
Method.POST / "form" -> handler((req: Request) =>
req.body.asMultipartFormStream.flatMap { form =>
form.collectAll.flatMap { inMemoryForm =>
@@ -95,7 +97,7 @@ object ClientStreamingSpec extends HttpRunnableSpec {
.request(
Request.post(
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
- Body.fromStream(
+ Body.fromStreamChunked(
(ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3))
.schedule(Schedule.fixed(10.millis)),
),
@@ -111,7 +113,7 @@ object ClientStreamingSpec extends HttpRunnableSpec {
.request(
Request.post(
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
- Body.fromStream(
+ Body.fromStreamChunked(
ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3),
),
),
@@ -221,7 +223,7 @@ object ClientStreamingSpec extends HttpRunnableSpec {
Request
.post(
URL.decode(s"http://localhost:$port/form").toOption.get,
- Body.fromStream(stream),
+ Body.fromStreamChunked(stream),
)
.addHeaders(Headers(Header.ContentType(MediaType.multipart.`form-data`, Some(boundary)))),
)
@@ -243,7 +245,7 @@ object ClientStreamingSpec extends HttpRunnableSpec {
.request(
Request.post(
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
- Body.fromStream(ZStream.fail(new RuntimeException("Some error"))),
+ Body.fromStreamChunked(ZStream.fail(new RuntimeException("Some error"))),
),
)
.exit
@@ -262,7 +264,7 @@ object ClientStreamingSpec extends HttpRunnableSpec {
.request(
Request.post(
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
- Body.fromStream(
+ Body.fromStreamChunked(
(ZStream.fromIterable("streaming request".getBytes) @@ ZStreamAspect.rechunk(3)).chunks.tap { chunk =>
if (chunk == Chunk.fromArray("que".getBytes))
sync.await
diff --git a/zio-http/src/test/scala/zio/http/HandlerSpec.scala b/zio-http/src/test/scala/zio/http/HandlerSpec.scala
index 6a45ae0cf..e5630f7ab 100644
--- a/zio-http/src/test/scala/zio/http/HandlerSpec.scala
+++ b/zio-http/src/test/scala/zio/http/HandlerSpec.scala
@@ -387,11 +387,12 @@ object HandlerSpec extends ZIOHttpSpec with ExitAssertion {
val tempFile = tempPath.toFile
val http = Handler.fromFileZIO(ZIO.succeed(tempFile))
for {
- r <- http.apply {}
+ r <- http.apply {}
+ tempFile <- Body.fromFile(tempFile)
} yield {
assert(r.status)(equalTo(Status.Ok)) &&
assert(r.headers)(contains(Header.ContentType(MediaType.image.`jpeg`))) &&
- assert(r.body)(equalTo(Body.fromFile(tempFile)))
+ assert(r.body)(equalTo(tempFile))
}
}
},
diff --git a/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala b/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala
index 5868d7a38..43858644a 100644
--- a/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala
+++ b/zio-http/src/test/scala/zio/http/ResponseCompressionSpec.scala
@@ -41,7 +41,7 @@ object ResponseCompressionSpec extends ZIOHttpSpec {
Headers(
Header.ContentType(MediaType.text.plain),
),
- Body.fromCharSequenceStream(
+ Body.fromCharSequenceStreamChunked(
ZStream
.unfold[Long, String](0L) { s =>
if (s < 1000) Some((s"$s\n", s + 1)) else None
diff --git a/zio-http/src/test/scala/zio/http/ServerSpec.scala b/zio-http/src/test/scala/zio/http/ServerSpec.scala
index c9dcefcd5..d46c52fe5 100644
--- a/zio-http/src/test/scala/zio/http/ServerSpec.scala
+++ b/zio-http/src/test/scala/zio/http/ServerSpec.scala
@@ -129,7 +129,7 @@ object ServerSpec extends HttpRunnableSpec {
val app =
Routes(RoutePattern.any -> handler((_: Path, req: Request) => Response(body = req.body))).toHttpApp
val res =
- app.deploy.body.mapZIO(_.asChunk.map(_.length)).run(body = Body.fromCharSequenceStream(dataStream))
+ app.deploy.body.mapZIO(_.asChunk.map(_.length)).run(body = Body.fromCharSequenceStreamChunked(dataStream))
assertZIO(res)(equalTo(MaxSize))
}
} +
@@ -337,13 +337,13 @@ object ServerSpec extends HttpRunnableSpec {
}
},
test("text streaming") {
- val res = Handler.fromStream(ZStream("a", "b", "c")).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run()
+ val res = Handler.fromStreamChunked(ZStream("a", "b", "c")).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run()
assertZIO(res)(equalTo("abc"))
},
test("echo streaming") {
val res = Routes
.singleton(handler { (_: Path, req: Request) =>
- Handler.fromStream(ZStream.fromZIO(req.body.asChunk).flattenChunks): Handler[
+ Handler.fromStreamChunked(ZStream.fromZIO(req.body.asChunk).flattenChunks): Handler[
Any,
Throwable,
(Path, Request),
@@ -361,7 +361,14 @@ object ServerSpec extends HttpRunnableSpec {
test("file-streaming") {
val path = getClass.getResource("/TestFile.txt").getPath
val res =
- Handler.fromStream(ZStream.fromPath(Paths.get(path))).sandbox.toHttpApp.deploy.body.mapZIO(_.asString).run()
+ Handler
+ .fromStreamChunked(ZStream.fromPath(Paths.get(path)))
+ .sandbox
+ .toHttpApp
+ .deploy
+ .body
+ .mapZIO(_.asString)
+ .run()
assertZIO(res)(equalTo("foo\nbar"))
} @@ TestAspect.os(os => !os.isWindows),
suite("html")(
@@ -426,7 +433,7 @@ object ServerSpec extends HttpRunnableSpec {
test("POST Request stream") {
val app: HttpApp[Any] = Routes.singleton {
handler { (_: Path, req: Request) =>
- Response(body = Body.fromStream(req.body.asStream))
+ Response(body = Body.fromStreamChunked(req.body.asStream))
}
}.toHttpApp
diff --git a/zio-http/src/test/scala/zio/http/internal/HttpGen.scala b/zio-http/src/test/scala/zio/http/internal/HttpGen.scala
index 70ca1ac3e..e8c56f22c 100644
--- a/zio-http/src/test/scala/zio/http/internal/HttpGen.scala
+++ b/zio-http/src/test/scala/zio/http/internal/HttpGen.scala
@@ -63,7 +63,8 @@ object HttpGen {
url <- HttpGen.url
headers <- Gen.listOf(HttpGen.header).map(Headers(_))
version <- httpVersion
- } yield Request(version, method, url, headers, Body.fromFile(file), None)
+ body <- Gen.fromZIO(Body.fromFile(file))
+ } yield Request(version, method, url, headers, body, None)
}
def genAbsoluteLocation: Gen[Any, Location.Absolute] = for {
@@ -97,7 +98,7 @@ object HttpGen {
cnt <- Gen
.fromIterable(
List(
- Body.fromStream(
+ Body.fromStreamChunked(
ZStream.fromIterable(list, chunkSize = 2).map(b => Chunk.fromArray(b.getBytes())).flattenChunks,
),
Body.fromString(list.mkString("")),
@@ -135,7 +136,7 @@ object HttpGen {
cnt <- Gen
.fromIterable(
List(
- Body.fromStream(
+ Body.fromStreamChunked(
ZStream.fromIterable(list, chunkSize = 2).map(b => Chunk.fromArray(b.getBytes())).flattenChunks,
),
Body.fromString(list.mkString("")),
diff --git a/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala b/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala
index 478d51cc2..6ce5760ca 100644
--- a/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala
+++ b/zio-http/src/test/scala/zio/http/netty/NettyBodySpec.scala
@@ -31,12 +31,12 @@ object NettyBodySpec extends ZIOHttpSpec {
suite("fromAsync")(
test("success") {
val message = Chunk.fromArray("Hello World".getBytes(Charsets.Http))
- val chunk = NettyBody.fromAsync(async => async(message, isLast = true)).asChunk
+ val chunk = NettyBody.fromAsync(async => async(message, isLast = true), knownContentLength = None).asChunk
assertZIO(chunk)(equalTo(message))
},
test("fail") {
val exception = new RuntimeException("Some Error")
- val error = NettyBody.fromAsync(_ => throw exception).asChunk.flip
+ val error = NettyBody.fromAsync(_ => throw exception, knownContentLength = None).asChunk.flip
assertZIO(error)(equalTo(exception))
},
),
diff --git a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
index 11a93802d..eea6001ab 100644
--- a/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
+++ b/zio-http/src/test/scala/zio/http/netty/NettyStreamBodySpec.scala
@@ -19,10 +19,7 @@ object NettyStreamBodySpec extends HttpRunnableSpec {
handler(
http.Response(
status = Status.Ok,
- // content length header is important,
- // in this case the server will not use chunked transfer encoding even if response is a stream
- headers = Headers(Header.ContentLength(len)),
- body = Body.fromStream(streams.next()),
+ body = Body.fromStream(streams.next(), len),
),
),
).sandbox.toHttpApp
diff --git a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala
index 340d7674b..0f003c927 100644
--- a/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala
+++ b/zio-http/src/test/scala/zio/http/netty/client/NettyConnectionPoolSpec.scala
@@ -31,7 +31,7 @@ import zio.http.netty.NettyConfig
object NettyConnectionPoolSpec extends HttpRunnableSpec {
private val app = Routes(
- Method.POST / "streaming" -> handler((req: Request) => Response(body = Body.fromStream(req.body.asStream))),
+ Method.POST / "streaming" -> handler((req: Request) => Response(body = Body.fromStreamChunked(req.body.asStream))),
Method.GET / "slow" -> handler(ZIO.sleep(1.hour).as(Response.text("done"))),
Method.ANY / trailing -> handler((_: Path, req: Request) => req.body.asString.map(Response.text(_))),
).sandbox.toHttpApp
@@ -76,7 +76,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec {
.deploy(
Request(
method = Method.POST,
- body = Body.fromCharSequenceStream(stream),
+ body = Body.fromCharSequenceStreamChunked(stream),
headers = extraHeaders,
),
)
@@ -113,7 +113,7 @@ object NettyConnectionPoolSpec extends HttpRunnableSpec {
Request(
method = Method.POST,
url = URL.root / "streaming",
- body = Body.fromCharSequenceStream(stream),
+ body = Body.fromCharSequenceStreamChunked(stream),
headers = extraHeaders,
),
)