Skip to content

Commit

Permalink
Make EndpointExecutor work with batched clients (zio#3187)
Browse files Browse the repository at this point in the history
  • Loading branch information
987Nabil committed Feb 18, 2025
1 parent f3334d6 commit 8b229d8
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object EndpointExamples extends ZIOAppDefault {
val locator =
EndpointLocator.fromURL(URL.decode("http://localhost:8080").toOption.get)

val executor: EndpointExecutor[Any, Unit] =
val executor: EndpointExecutor[Any, Unit, Scope] =
EndpointExecutor(client, locator)

val x1: Invocation[Int, Int, ZNothing, Int, None] = getUser(42)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ object TestCliClient extends zio.ZIOAppDefault with TestCliEndpoints {
Client.default,
)

def clientExample: URIO[EndpointExecutor[Any, Unit], Unit] =
def clientExample: URIO[EndpointExecutor[Any, Unit, Scope], Unit] =
for {
executor <- ZIO.service[EndpointExecutor[Any, Unit]]
executor <- ZIO.service[EndpointExecutor[Any, Unit, Scope]]
_ <- ZIO.scoped(executor(getUser(42, Location.parse("some-location").toOption.get))).debug("result1")
_ <- ZIO.scoped(executor(getUserPosts(42, 200, "adam")).debug("result2"))
_ <- ZIO.scoped(executor(createUser(User(2, "john", Some("[email protected]"))))).debug("result3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object UnionRoundtripSpec extends ZIOHttpSpec {
port <- Server.install(route)
executorLayer = ZLayer(ZIO.service[Client].map(makeExecutor(_, port)))
out <- ZIO
.service[EndpointExecutor[Any, Unit]]
.service[EndpointExecutor[Any, Unit, Scope]]
.flatMap { executor =>
executor.apply(endpoint.apply(in))
}
Expand Down
37 changes: 19 additions & 18 deletions zio-http/jvm/src/test/scala/zio/http/endpoint/RoundtripSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object RoundtripSpec extends ZIOHttpSpec {

implicit val outsSchema: Schema[Outs] = DeriveSchema.gen[Outs]

def makeExecutor(client: Client, port: Int) = {
def makeExecutor(client: ZClient[Any, Any, Body, Throwable, Response], port: Int) = {
val locator = EndpointLocator.fromURL(
URL.decode(s"http://localhost:$port").toOption.get,
)
Expand All @@ -91,18 +91,18 @@ object RoundtripSpec extends ZIOHttpSpec {
route: Routes[Any, Nothing],
in: In,
out: Out,
): ZIO[Client with Server with Scope, Err, TestResult] =
): ZIO[ZClient[Any, Any, Body, Throwable, Response] with Server with Scope, Err, TestResult] =
testEndpointZIO(endpoint, route, in, outF = { (value: Out) => assert(out)(equalTo(value)) })

def testEndpointZIO[P, In, Err, Out](
endpoint: Endpoint[P, In, Err, Out, AuthType.None],
route: Routes[Any, Nothing],
in: In,
outF: Out => ZIO[Any, Err, TestResult],
): zio.ZIO[Server with Client with Scope, Err, TestResult] =
): zio.ZIO[Server with ZClient[Any, Any, Body, Throwable, Response] with Scope, Err, TestResult] =
for {
port <- Server.install(route @@ Middleware.requestLogging())
client <- ZIO.service[Client]
client <- ZIO.service[ZClient[Any, Any, Body, Throwable, Response]]
executor = makeExecutor(client, port)
out <- executor(endpoint.apply(in))
result <- outF(out)
Expand All @@ -112,11 +112,11 @@ object RoundtripSpec extends ZIOHttpSpec {
route: Routes[Any, Nothing],
in: Request,
outF: Response => ZIO[Any, Err, TestResult],
): zio.ZIO[Server with Client with Scope, Err, TestResult] = {
): zio.ZIO[Server with ZClient[Any, Any, Body, Throwable, Response] with Scope, Err, TestResult] = {
for {
port <- Server.install(route @@ Middleware.requestLogging())
client <- ZIO.service[Client]
out <- client.batched(in.updateURL(_.host("localhost").port(port))).orDie
client <- ZIO.service[ZClient[Any, Any, Body, Throwable, Response]]
out <- client(in.updateURL(_.host("localhost").port(port))).orDie
result <- outF(out)
} yield result
}
Expand All @@ -126,24 +126,24 @@ object RoundtripSpec extends ZIOHttpSpec {
route: Routes[Any, Nothing],
in: In,
err: Err,
): ZIO[Client with Server with Scope, Out, TestResult] =
): ZIO[ZClient[Any, Any, Body, Throwable, Response] with Server with Scope, Out, TestResult] =
testEndpointErrorZIO(endpoint, route, in, errorF = { (value: Err) => assert(err)(equalTo(value)) })

def testEndpointErrorZIO[P, In, Err, Out](
endpoint: Endpoint[P, In, Err, Out, AuthType.None],
route: Routes[Any, Nothing],
in: In,
errorF: Err => ZIO[Any, Nothing, TestResult],
): ZIO[Client with Server with Scope, Out, TestResult] =
): ZIO[ZClient[Any, Any, Body, Throwable, Response] with Server with Scope, Out, TestResult] =
for {
port <- Server.install(route)
executorLayer = ZLayer(ZIO.service[Client].map(makeExecutor(_, port)))
executorLayer = ZLayer(ZIO.service[ZClient[Any, Any, Body, Throwable, Response]].map(makeExecutor(_, port)))
out <- ZIO
.service[EndpointExecutor[Any, Unit]]
.service[EndpointExecutor[Any, Unit, Any]]
.flatMap { executor =>
executor.apply(endpoint.apply(in))
}
.provideSome[Client with Scope](executorLayer)
.provideSome[ZClient[Any, Any, Body, Throwable, Response] with Scope](executorLayer)
.flip
result <- errorF(out)
} yield result
Expand All @@ -154,7 +154,7 @@ object RoundtripSpec extends ZIOHttpSpec {
string: String,
strings: Chunk[String] = Chunk("defaultString"),
)
implicit val paramsSchema: Schema[Params] = DeriveSchema.gen[Params]
implicit val paramsSchema: Schema[Params] = DeriveSchema.gen[Params]

def spec: Spec[Any, Any] =
suite("RoundtripSpec")(
Expand Down Expand Up @@ -315,7 +315,7 @@ object RoundtripSpec extends ZIOHttpSpec {

for {
port <- Server.install(handler.toRoutes)
client <- ZIO.service[Client]
client <- ZIO.service[ZClient[Any, Any, Body, Throwable, Response]]
response <- client(
Request.post(
url = URL.decode(s"http://localhost:$port/123/xyz/456/abc?details=789").toOption.get,
Expand Down Expand Up @@ -462,13 +462,13 @@ object RoundtripSpec extends ZIOHttpSpec {

for {
port <- Server.install(routes)
executorLayer = ZLayer(ZIO.serviceWith[Client](makeExecutor(_, port)))
executorLayer = ZLayer(ZIO.serviceWith[ZClient[Any, Any, Body, Throwable, Response]](makeExecutor(_, port)))

cause <- ZIO
.serviceWithZIO[EndpointExecutor[Any, Unit]] { executor =>
.serviceWithZIO[EndpointExecutor[Any, Unit, Any]] { executor =>
executor.apply(endpointWithAnotherSignature.apply(42))
}
.provideSome[Client with Scope](executorLayer)
.provideSome[ZClient[Any, Any, Body, Throwable, Response] with Scope](executorLayer)
.cause
} yield assertTrue(
cause.prettyPrint.contains(
Expand Down Expand Up @@ -564,7 +564,8 @@ object RoundtripSpec extends ZIOHttpSpec {
).provide(
Server.customized,
ZLayer.succeed(Server.Config.default.onAnyOpenPort.enableRequestStreaming),
Client.customized.map(env => ZEnvironment(env.get @@ clientDebugAspect)),
Client.customized.map(env => ZEnvironment(env.get @@ clientDebugAspect)) >>>
ZLayer(ZIO.serviceWith[Client](_.batched)),
ClientDriver.shared,
// NettyDriver.customized,
ZLayer.succeed(NettyConfig.defaultWithFastShutdown),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import zio.http.endpoint.internal.EndpointClient
* endpoint invocation, and executing the invocation, returning the final
* result, or failing with a pre-defined RPC error.
*/
final case class EndpointExecutor[R, Auth](
client: Client,
final case class EndpointExecutor[R, Auth, ReqEnv](
client: ZClient[Any, ReqEnv, Body, Throwable, Response],
locator: EndpointLocator,
authProvider: ZIO[R, Nothing, Auth],
) {
Expand Down Expand Up @@ -63,7 +63,7 @@ final case class EndpointExecutor[R, Auth](
combiner: Combiner[I, invocation.endpoint.authType.ClientRequirement],
ev: Auth <:< invocation.endpoint.authType.ClientRequirement,
trace: Trace,
): ZIO[R with Scope, E, B] = {
): ZIO[R & ReqEnv, E, B] = {
getClient(invocation.endpoint).orDie.flatMap { endpointClient =>
endpointClient.execute(
client,
Expand All @@ -80,7 +80,7 @@ final case class EndpointExecutor[R, Auth](
invocation: Invocation[P, I, E, B, AuthType.None],
)(implicit
trace: Trace,
): ZIO[Scope, E, B] = {
): ZIO[ReqEnv, E, B] = {
getClient(invocation.endpoint).orDie.flatMap { endpointClient =>
endpointClient.execute(client, invocation, ZIO.unit)(
Combiner.rightUnit[I].asInstanceOf[Combiner[I, endpointClient.endpoint.authType.ClientRequirement]],
Expand All @@ -90,12 +90,19 @@ final case class EndpointExecutor[R, Auth](
}
}
object EndpointExecutor {
def apply(client: Client, locator: EndpointLocator): EndpointExecutor[Any, Unit] =
def apply[ReqEnv](
client: ZClient[Any, ReqEnv, Body, Throwable, Response],
locator: EndpointLocator,
): EndpointExecutor[Any, Unit, ReqEnv] =
EndpointExecutor(client, locator, ZIO.unit)

def apply[Auth](client: Client, locator: EndpointLocator, auth: Auth)(implicit
def apply[Auth, ReqEnv](
client: ZClient[Any, ReqEnv, Body, Throwable, Response],
locator: EndpointLocator,
auth: Auth,
)(implicit
trace: Trace,
): EndpointExecutor[Any, Auth] =
): EndpointExecutor[Any, Auth, ReqEnv] =
EndpointExecutor(client, locator, ZIO.succeed(auth))

final case class Config(url: URL)
Expand All @@ -114,15 +121,17 @@ object EndpointExecutor {

def make[R: Tag, Auth: Tag](serviceName: String, authProvider: URIO[R, Auth])(implicit
trace: Trace,
): ZLayer[Client, zio.Config.Error, EndpointExecutor[R, Auth]] =
): ZLayer[Client, zio.Config.Error, EndpointExecutor[R, Auth, Scope]] =
ZLayer {
for {
client <- ZIO.service[Client]
config <- ZIO.config(Config.config.nested(serviceName))
} yield EndpointExecutor(client, EndpointLocator.fromURL(config.url), authProvider)
}

def make(serviceName: String)(implicit trace: Trace): ZLayer[Client, zio.Config.Error, EndpointExecutor[Any, Unit]] =
def make(
serviceName: String,
)(implicit trace: Trace): ZLayer[Client, zio.Config.Error, EndpointExecutor[Any, Unit, Scope]] =
ZLayer {
for {
client <- ZIO.service[Client]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ private[endpoint] final case class EndpointClient[P, I, E, O, A <: AuthType](
endpointRoot: URL,
endpoint: Endpoint[P, I, E, O, A],
) {
def execute[R](
client: Client,
def execute[R, ReqEnv](
client: ZClient[Any, ReqEnv, Body, Throwable, Response],
invocation: Invocation[P, I, E, O, A],
authProvider: URIO[R, endpoint.authType.ClientRequirement],
)(implicit
combiner: Combiner[I, endpoint.authType.ClientRequirement],
trace: Trace,
): ZIO[R with Scope, E, O] = {
): ZIO[R & ReqEnv, E, O] = {
def request0(config: CodecConfig, authInput: endpoint.authType.ClientRequirement) = {
val input = if (authInput.isInstanceOf[Unit]) invocation.input else combiner.combine(invocation.input, authInput)
endpoint
Expand Down

0 comments on commit 8b229d8

Please sign in to comment.