From 9674ac229c105b99b9bd60bd52c963bc45520c4c Mon Sep 17 00:00:00 2001 From: Khoa Vo Date: Tue, 10 Oct 2023 21:54:41 +0300 Subject: [PATCH] Throw exception when reading from AsyncBody after the channel closed (#2399) (#2454) * Throw exception when reading from AsyncBody after the channel closed (#2399) Currently, when we call `Response.body.asStream`, we don't check to see if the corresponding response channel is open or not. If the channel has been closed and the user attempts to read the response body, it's possible for the body stream to hang (i.e never finish or throw an error), leading to the code hanging as illustrated in #2399. This behavior is undesirable since the user isn't aware of the channel closing (because we take care of that behind the scene), leading to the issue being hard to debug. This commit adds a check to `AsyncBodyReader.connect` so that we will throw an exception when the user attempts to read an unfinished response body from a closed channel. * fix: AsyncBodyReader emit failure when channel closes This is a bug which happened to me in CI: when the connect to the server closes, we must fail the response body stream so that the client doesn't hang forever trying to read from the stream. --- .../zio/http/netty/AsyncBodyReader.scala | 32 ++++++++++++++++--- .../src/test/scala/zio/http/ClientSpec.scala | 12 +++++++ .../zio/http/internal/HttpRunnableSpec.scala | 6 ++-- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala index 55e752106f..5c481f7a31 100644 --- a/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala +++ b/zio-http/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -16,6 +16,8 @@ package zio.http.netty +import java.io.IOException + import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.{Chunk, ChunkBuilder, Trace, Unsafe} @@ -38,11 +40,22 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou this.synchronized { state match { case State.Buffering => - state = State.Direct(callback) - buffer.result().foreach { case (chunk, isLast) => - callback(chunk, isLast) + val result: Chunk[(Chunk[Byte], Boolean)] = buffer.result() + val readingDone: Boolean = result.lastOption match { + case None => false + case Some((_, isLast)) => isLast } - ctx.read() + + if (ctx.channel.isOpen || readingDone) { + state = State.Direct(callback) + result.foreach { case (chunk, isLast) => + callback(chunk, isLast) + } + ctx.read() + } else { + throw new IllegalStateException("Attempting to read from a closed channel, which will never finish") + } + case State.Direct(_) => throw new IllegalStateException("Cannot connect twice") } @@ -91,6 +104,17 @@ abstract class AsyncBodyReader(implicit trace: Trace) extends SimpleChannelInbou } super.exceptionCaught(ctx, cause) } + + override def channelInactive(ctx: ChannelHandlerContext): Unit = { + this.synchronized { + state match { + case State.Buffering => + case State.Direct(callback) => + callback.fail(new IOException("Channel closed unexpectedly")) + } + } + ctx.fireChannelInactive() + } } object AsyncBodyReader { diff --git a/zio-http/src/test/scala/zio/http/ClientSpec.scala b/zio-http/src/test/scala/zio/http/ClientSpec.scala index f1a9eaef8b..8463f74598 100644 --- a/zio-http/src/test/scala/zio/http/ClientSpec.scala +++ b/zio-http/src/test/scala/zio/http/ClientSpec.scala @@ -86,6 +86,18 @@ object ClientSpec extends HttpRunnableSpec { loggedUrl <- ZTestLogger.logOutput.map(_.collectFirst { case m => m.annotations("url") }.mkString) } yield assertTrue(loggedUrl == s"$baseURL/") }, + test("reading of unfinished body must fail") { + val app = Handler.fromStream(ZStream.never).sandbox.toHttpApp + val requestCode = (client: Client) => + (for { + response <- ZIO.scoped(client(Request())) + _ <- response.body.asStream.runForeach { _ => ZIO.succeed(0) } + .timeout(60.second) // timeout just in case it hangs + } yield ()).fold(success = _ => false, failure = _ => true) + + val effect = app.deployAndRequest(requestCode).runZIO(()) + assertZIO(effect)(isTrue) + }, ) override def spec = { diff --git a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala index f493e75137..817bc8e395 100644 --- a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala +++ b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala @@ -55,9 +55,9 @@ abstract class HttpRunnableSpec extends ZIOHttpSpec { self => } } yield response - def deployAndRequest( - call: Client => ZIO[Scope, Throwable, Response], - ): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Response] = + def deployAndRequest[Output]( + call: Client => ZIO[Scope, Throwable, Output], + ): Handler[Client with DynamicServer with R with Scope, Throwable, Any, Output] = for { port <- Handler.fromZIO(DynamicServer.port) id <- Handler.fromZIO(DynamicServer.deploy[R](app))