diff --git a/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala index 55e752106f..5c481f7a31 100644 --- a/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala +++ b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -16,6 +16,8 @@ package zio.http.netty +import java.io.IOException + import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.{Chunk, ChunkBuilder, Trace, Unsafe} @@ -38,11 +40,22 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou this.synchronized { state match { case State.Buffering => - state = State.Direct(callback) - buffer.result().foreach { case (chunk, isLast) => - callback(chunk, isLast) + val result: Chunk[(Chunk[Byte], Boolean)] = buffer.result() + val readingDone: Boolean = result.lastOption match { + case None => false + case Some((_, isLast)) => isLast } - ctx.read() + + if (ctx.channel.isOpen || readingDone) { + state = State.Direct(callback) + result.foreach { case (chunk, isLast) => + callback(chunk, isLast) + } + ctx.read() + } else { + throw new IllegalStateException("Attempting to read from a closed channel, which will never finish") + } + case State.Direct(_) => throw new IllegalStateException("Cannot connect twice") } @@ -91,6 +104,17 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou } super.exceptionCaught(ctx, cause) } + + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + this.synchronized { + state match { + case State.Buffering => + case State.Direct(callback) => + callback.fail(new IOException("Channel closed unexpectedly")) + } + } + ctx.fireChannelInactive() + } } object AsyncBodyReader { diff --git a/zio-http/src/test/scala/zio/http/ClientSpec.scala b/zio-http/src/test/scala/zio/http/ClientSpec.scala index f1a9eaef8b..8463f74598 100644 --- a/zio-http/src/test/scala/zio/http/ClientSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientSpec.scala @@ -86,6 +86,18 @@ object ClientSpec extends HttpRunnableSpec { loggedUrl <- ZTestLogger.logOutput.map(_.collectFirst { case m => m.annotations("url") }.mkString) } yield assertTrue(loggedUrl == s"$baseURL/") }, + test("reading of unfinished body must fail") { + val app = Handler.fromStream(ZStream.never).sandbox.toHttpApp + val requestCode = (client: Client) => + (for { + response <- ZIO.scoped(client(Request())) + _ <- response.body.asStream.runForeach { _ => ZIO.succeed(0) } + .timeout(60.second) // timeout just in case it hangs + } yield ()).fold(success = _ => false, failure = _ => true) + + val effect = app.deployAndRequest(requestCode).runZIO(()) + assertZIO(effect)(isTrue) + }, ) override def spec = { diff --git a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala index f493e75137..817bc8e395 100644 --- a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala +++ b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala @@ -55,9 +55,9 @@ abstract class HttpRunnableSpec extends ZIOHttpSpec { self => } } yield response - def deployAndRequest( - call: Client => ZIO[Scope, Throwable, Response], - ): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Response] = + def deployAndRequest[Output]( + call: Client => ZIO[Scope, Throwable, Output], + ): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Output] = for { port <- Handler.fromZIO(DynamicServer.port) id <- Handler.fromZIO(DynamicServer.deploy[R](app))