Skip to content

Commit

Permalink
Merge branch 'main' into oom
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlar authored Dec 10, 2024
2 parents 0e47279 + 1f8ef1f commit 746a267
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 26 deletions.
10 changes: 10 additions & 0 deletions zio-http/jvm/src/test/scala/zio/http/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ object ClientSpec extends RoutesRunnableSpec {
app.deploy(Request(headers = Headers(Header.Authorization.Unparsed("", "my-token")))).flatMap(_.body.asString)
assertZIO(responseContent)(equalTo("my-token"))
} @@ timeout(5.seconds),
test("URL and path manipulation on client level") {
for {
baseURL <- DynamicServer.httpURL
_ <-
Handler.ok.toRoutes.deployAndRequest { c =>
(c.updatePath(_ / "my-service") @@ ZClientAspect.requestLogging()).batched.get("/hello")
}.runZIO(())
loggedUrl <- ZTestLogger.logOutput.map(_.collectFirst { case m => m.annotations("url") }.mkString)
} yield assertTrue(loggedUrl == baseURL + "/my-service/hello")
},
)

override def spec = {
Expand Down
36 changes: 21 additions & 15 deletions zio-http/shared/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -567,21 +567,27 @@ object Body {

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream.unwrap {
for {
file <- ZIO.attempt(file)
fs <- ZIO.attemptBlocking(new FileInputStream(file))
size <- ZIO.attemptBlocking(Math.min(chunkSize.toLong, file.length()).toInt)
} yield ZStream
.repeatZIOOption[Any, Throwable, Chunk[Byte]] {
for {
buffer <- ZIO.succeed(new Array[Byte](size))
len <- ZIO.attemptBlocking(fs.read(buffer)).mapError(Some(_))
bytes <-
if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len)))
else ZIO.fail(None)
} yield bytes
}
.ensuring(ZIO.attemptBlocking(fs.close()).ignoreLogged)
ZIO.blocking {
for {
r <- ZIO.attempt {
val fs = new FileInputStream(file)
val size = Math.min(chunkSize.toLong, file.length()).toInt

(fs, size)
}
(fs, size) = r
} yield ZStream
.repeatZIOOption[Any, Throwable, Chunk[Byte]] {
for {
buffer <- ZIO.succeed(new Array[Byte](size))
len <- ZIO.attempt(fs.read(buffer)).mapError(Some(_))
bytes <-
if (len > 0) ZIO.succeed(Chunk.fromArray(buffer.slice(0, len)))
else ZIO.fail(None)
} yield bytes
}
.ensuring(ZIO.attempt(fs.close()).ignoreLogged)
}
}.flattenChunks

override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType))
Expand Down
10 changes: 7 additions & 3 deletions zio-http/shared/src/main/scala/zio/http/ZClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](

def path(path: String): ZClient[Env, ReqEnv, In, Err, Out] = self.path(Path(path))

def path(path: Path): ZClient[Env, ReqEnv, In, Err, Out] =
copy(url = url.copy(path = path))
def path(path: Path): ZClient[Env, ReqEnv, In, Err, Out] = updatePath(_ => path)

def updatePath(f: Path => Path): ZClient[Env, ReqEnv, In, Err, Out] =
copy(url = url.copy(path = f(url.path)))

def patch(suffix: String)(implicit ev: Body <:< In, trace: Trace): ZIO[Env & ReqEnv, Err, Out] =
request(Method.PATCH, suffix)(ev(Body.empty))
Expand Down Expand Up @@ -263,6 +265,8 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
def uri(uri: URI): ZClient[Env, ReqEnv, In, Err, Out] = url(URL.fromURI(uri).getOrElse(URL.empty))

def url(url: URL): ZClient[Env, ReqEnv, In, Err, Out] = copy(url = url)

def updateURL(f: URL => URL): ZClient[Env, ReqEnv, In, Err, Out] = copy(url = f(url))
}

object ZClient extends ZClientPlatformSpecific {
Expand Down Expand Up @@ -670,7 +674,7 @@ object ZClient extends ZClientPlatformSpecific {
webSocketUrl <- url.scheme match {
case Some(Scheme.HTTP) | Some(Scheme.WS) | None => ZIO.succeed(url.scheme(Scheme.WS))
case Some(Scheme.WSS) | Some(Scheme.HTTPS) => ZIO.succeed(url.scheme(Scheme.WSS))
case _ => ZIO.fail(throw new IllegalArgumentException("URL's scheme MUST be WS(S) or HTTP(S)"))
case _ => ZIO.fail(new IllegalArgumentException("URL's scheme MUST be WS(S) or HTTP(S)"))
}
scope <- ZIO.scope
res <- requestAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ sealed trait HttpContentCodec[A] { self =>
ZIO.fromEither(codec.codec(config).decode(bytes))
}
case None =>
ZIO.fail(throw new IllegalArgumentException(s"No codec found for content type $contentType"))
ZIO.fail(new IllegalArgumentException(s"No codec found for content type $contentType"))
}
}

Expand All @@ -50,7 +50,7 @@ sealed trait HttpContentCodec[A] { self =>
ZIO.fromEither(codec.codec(config).decode(bytes))
}
case None =>
ZIO.fail(throw new IllegalArgumentException(s"No codec found for content type $contentType"))
ZIO.fail(new IllegalArgumentException(s"No codec found for content type $contentType"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,21 @@ private[codec] object EncoderDecoder {
private def decodeBody(config: CodecConfig, body: Body, inputs: Array[Any])(implicit
trace: Trace,
): Task[Unit] = {
val codecs = flattened.content
val isNonMultiPart = inputs.length < 2
if (isNonMultiPart) {
val codecs = flattened.content

if (inputs.length < 2) {
// non multi-part
codecs.headOption.map { codec =>
// noinspection SimplifyUnlessInspection
if (codecs.isEmpty) ZIO.unit
else {
val codec = codecs.head
codec
.decodeFromBody(body, config)
.mapBoth(
{ err => HttpCodecError.MalformedBody(err.getMessage(), Some(err)) },
{ err => HttpCodecError.MalformedBody(err.getMessage, Some(err)) },
result => inputs(0) = result,
)
}.getOrElse(ZIO.unit)
}
} else {
// multi-part
decodeForm(body.asMultipartFormStream, inputs, config) *> check(inputs)
Expand Down

0 comments on commit 746a267

Please sign in to comment.