Skip to content

Commit

Permalink
deprecatre req and add streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
varshith257 committed Oct 31, 2024
1 parent df186d4 commit 3775973
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package zio.http.endpoint.cli

import java.nio.file.Path

import scala.annotation.nowarn

import zio._

import zio.http._
Expand All @@ -27,10 +29,13 @@ private[cli] object Retriever {
final case class URL(name: String, url: String, mediaType: MediaType) extends Retriever {

val request = Request.get(http.URL(http.Path.decode(url)))
override def retrieve(): ZIO[Client, Throwable, FormField] = for {
client <- ZIO.serviceWith[Client](_.batched)
chunk <- client.request(request).flatMap(_.body.asChunk)
} yield FormField.binaryField(name, chunk, mediaType)
@nowarn("cat=deprecation")
override def retrieve(): ZIO[Client, Throwable, FormField] = ZIO.scoped {
for {
client <- ZIO.serviceWith[Client](_.batched)
chunk <- client.request(request).flatMap(_.body.asChunk)
} yield FormField.binaryField(name, chunk, mediaType)
}
}

final case class File(name: String, path: Path, mediaType: MediaType) extends Retriever {
Expand Down
20 changes: 10 additions & 10 deletions zio-http/jvm/src/test/scala/zio/http/ClientStreamingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
for {
port <- server(streamingServer)
client <- ZIO.service[Client]
response <- client.request(
response <- client.batched(
Request.get(URL.decode(s"http://localhost:$port/simple-get").toOption.get),
)
body <- response.body.asString
Expand All @@ -72,7 +72,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
for {
port <- server(streamingServer)
client <- ZIO.service[Client]
response <- client.request(
response <- client.streaming(
Request.get(URL.decode(s"http://localhost:$port/streaming-get").toOption.get),
)
body <- response.body.asStream.chunks.map(chunk => new String(chunk.toArray)).runCollect
Expand All @@ -94,7 +94,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
port <- server(streamingServer)
client <- ZIO.service[Client]
response <- client
.request(
.batched(
Request.post(
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
Body.fromStreamChunked(
Expand All @@ -110,7 +110,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
port <- server(streamingServer)
client <- ZIO.service[Client]
response <- client
.request(
.streaming(
Request.post(
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
Body.fromStreamChunked(
Expand Down Expand Up @@ -146,7 +146,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
for {
boundary <- Boundary.randomUUID
response <- client
.request(
.batched(
Request
.post(
URL.decode(s"http://localhost:$port/form").toOption.get,
Expand Down Expand Up @@ -178,8 +178,8 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
boundary <- Boundary.randomUUID
stream = Form(fields.map(_._1): _*).multipartBytes(boundary)
bytes <- stream.runCollect
response <- client.batched
.request(
response <- client
.batched(
Request
.post(
URL.decode(s"http://localhost:$port/form").toOption.get,
Expand Down Expand Up @@ -219,7 +219,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
boundary <- Boundary.randomUUID
stream = form.multipartBytes(boundary).rechunk(chunkSize)
response <- client
.request(
.streaming(
Request
.post(
URL.decode(s"http://localhost:$port/form").toOption.get,
Expand All @@ -242,7 +242,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
port <- server(streamingServer)
client <- ZIO.service[Client]
response <- client
.request(
.streaming(
Request.post(
URL.decode(s"http://localhost:$port/simple-post").toOption.get,
Body.fromStreamChunked(ZStream.fail(new RuntimeException("Some error"))),
Expand All @@ -261,7 +261,7 @@ object ClientStreamingSpec extends RoutesRunnableSpec {
client <- ZIO.service[Client]
sync <- Promise.make[Nothing, Unit]
response <- client
.request(
.streaming(
Request.post(
URL.decode(s"http://localhost:$port/streaming-echo").toOption.get,
Body.fromStreamChunked(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object RequestStreamingServerSpec extends RoutesRunnableSpec {
val host = req.headers.get(Header.Host).get
val newRequest =
req.copy(url = req.url.path("/2").host(host.hostAddress).port(host.port.getOrElse(80)))
ZIO.serviceWithZIO[Client](_.request(newRequest))
ZIO.serviceWithZIO[Client](_.streaming(newRequest))
},
Method.POST / "2" -> handler { (req: Request) =>
req.body.asChunk.map { body =>
Expand Down
32 changes: 18 additions & 14 deletions zio-http/jvm/src/test/scala/zio/http/ZClientAspectSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,24 @@ object ZClientAspectSpec extends ZIOHttpSpec {
annotations.head.contains("duration_ms"),
),
),
test("followRedirects")(
for {
port <- Server.install(redir ++ routes)
baseClient <- ZIO.service[Client]
client = baseClient
.url(
URL(Path.empty, Location.Absolute(Scheme.HTTP, "localhost", Some(port))),
)
.batched @@ ZClientAspect.followRedirects(2)((resp, message) => ZIO.logInfo(message).as(resp))
response <- client.request(Request.get(URL.empty / "redirect"))
} yield assertTrue(
extractStatus(response) == Status.Ok,
),
),
test("followRedirects") {
@nowarn("cat=deprecation")
def followRedirectsTest = {
for {
port <- Server.install(redir ++ routes)
baseClient <- ZIO.service[Client]
client = baseClient
.url(
URL(Path.empty, Location.Absolute(Scheme.HTTP, "localhost", Some(port))),
)
.batched @@ ZClientAspect.followRedirects(2)((resp, message) => ZIO.logInfo(message).as(resp))
response <- client.request(Request.get(URL.empty / "redirect"))
} yield assertTrue(
extractStatus(response) == Status.Ok,
)
}
followRedirectsTest
},
).provide(
ZLayer.succeed(Server.Config.default.onAnyOpenPort),
Server.customized,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object NettyStreamBodySpec extends RoutesRunnableSpec {
}

def makeRequest(client: Client, port: Int) = client
.request(
.streaming(
Request.get(URL.decode(s"http://localhost:$port/with-content-length").toOption.get),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ object NettyConnectionPoolSpec extends RoutesRunnableSpec {
.toRoutes
.deployAndRequest { client =>
ZIO.foreachParDiscard(0 to 10) { _ =>
client.batched.request(Request()).flatMap(_.body.asArray).repeatN(200)
client.batched(Request()).flatMap(_.body.asArray).repeatN(200)
}
}(Request())
.as(assertCompletes)
Expand Down
32 changes: 29 additions & 3 deletions zio-http/shared/src/main/scala/zio/http/ZClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package zio.http

import java.net.{InetSocketAddress, URI}

import scala.annotation.nowarn

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

Expand All @@ -37,6 +39,7 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
bodyDecoder: ZClient.BodyDecoder[Env, Err, Out],
driver: ZClient.Driver[Env, ReqEnv, Err],
) extends HeaderOps[ZClient[Env, ReqEnv, In, Err, Out]] { self =>
@nowarn("cat=deprecation")
def apply(request: Request)(implicit ev: Body <:< In, trace: Trace): ZIO[Env & ReqEnv, Err, Out] =
self.request(request)

Expand Down Expand Up @@ -175,6 +178,7 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
): ZClient[Env, ReqEnv, In, Err2, Out] =
transform(bodyEncoder.refineOrDie(pf), bodyDecoder.refineOrDie(pf), driver.refineOrDie(pf))

@deprecated("Use `batched` or `streaming` instead", since = "3.0.0")
def request(request: Request)(implicit ev: Body <:< In, trace: Trace): ZIO[Env & ReqEnv, Err, Out] = {
def makeRequest(body: Body) = {
driver.request(
Expand Down Expand Up @@ -233,11 +237,33 @@ final case class ZClient[-Env, ReqEnv, -In, +Err, +Out](
ev2: ReqEnv =:= Scope,
): ZStream[R & Env, E0, A] = ZStream.unwrapScoped[R & Env] {
self
.request(request)
.streaming(request)
.asInstanceOf[ZIO[R & Env & Scope, Err, Out]]
.fold(ZStream.fail(_), f)
}

def streaming(
request: Request,
)(implicit ev: Body <:< In, trace: Trace, ev1: ReqEnv =:= Scope): ZIO[Env & ReqEnv, Err, Out] = {
def makeRequest(body: Body) = {
driver.request(
self.version ++ request.version,
request.method,
self.url ++ request.url,
self.headers ++ request.headers,
body,
sslConfig,
proxy,
)
}
if (bodyEncoder == ZClient.BodyEncoder.identity)
bodyDecoder.decodeZIO(makeRequest(request.body))
else
bodyEncoder
.encode(ev(request.body))
.flatMap(body => bodyDecoder.decodeZIO(makeRequest(body)))
}

def ssl(ssl: ClientSSLConfig): ZClient[Env, ReqEnv, In, Err, Out] =
copy(sslConfig = Some(ssl))

Expand Down Expand Up @@ -279,7 +305,7 @@ object ZClient extends ZClientPlatformSpecific {
* memory, allowing to stream response bodies
*/
def batched(request: Request)(implicit trace: Trace): ZIO[Client, Throwable, Response] =
ZIO.serviceWithZIO[Client](_.batched.request(request))
ZIO.serviceWithZIO[Client](_.batched(request))

def fromDriver[Env, ReqEnv, Err](driver: Driver[Env, ReqEnv, Err]): ZClient[Env, ReqEnv, Body, Err, Response] =
ZClient(
Expand Down Expand Up @@ -312,7 +338,7 @@ object ZClient extends ZClientPlatformSpecific {
* request's resources (i.e., `Scope`)
*/
def streaming(request: Request)(implicit trace: Trace): ZIO[Client & Scope, Throwable, Response] =
ZIO.serviceWithZIO[Client](_.request(request))
ZIO.serviceWithZIO[Client](_.batched(request))

/**
* Executes an HTTP request, and transforms the response to a `ZStream` using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[endpoint] final case class EndpointClient[P, I, E, O, A <: AuthType](
for {
authInput <- authProvider
config <- CodecConfig.codecRef.get
response <- client.request(withDefaultAcceptHeader(config, authInput)).orDie
response <- client.batched(withDefaultAcceptHeader(config, authInput)).orDie
} yield response

requested.flatMap { response =>
Expand Down

0 comments on commit 3775973

Please sign in to comment.