Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concat helper method to chain together multiple service implementations, #59 #256

Merged
merged 1 commit into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@ import akka.grpc.javadsl.GrpcExceptionHandler;
import static @{service.packageName}.@{service.name}.Serializers.*;

public class @{service.name}HandlerFactory {
private static final CompletionStage<HttpResponse> notFound = CompletableFuture.completedFuture(
HttpResponse.create().withStatus(StatusCodes.NOT_FOUND));

/**
* Creates a `HttpRequest` to `HttpResponse` handler that can be used in for example
* `Http.get(system).bindAndHandleAsync`. It ends with `StatusCodes.NotFound` if the request is not matching.
*
* Use `akka.grpc.javadsl.ServiceHandler.concatOrNotFound` when combining several services.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@service.name implementation, Materializer mat) {
return (req -> {
Iterator<String> segments = req.getUri().pathSegments().iterator();
if (segments.hasNext() && segments.next().equals(@{service.name}.name) && segments.hasNext()) {
String method = segments.next();
return handle(req, method, implementation, mat).exceptionally(e -> GrpcExceptionHandler.standard(e));
}
else
{
CompletableFuture<HttpResponse> result = new CompletableFuture<>();
result.completeExceptionally(new UnsupportedOperationException("Unexpected path"));
return result;
} else {
return notFound;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this to NOT_FOUND because I guess that's a more valid response if a single handler is used, and felt better to use that as the signal to continue with next handler in concat rather than using exception

}
});
}
Expand Down
21 changes: 20 additions & 1 deletion codegen/src/main/twirl/templates/ScalaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,26 @@ import akka.http.scaladsl.model.Uri.Path.Segment
import akka.stream.Materializer

object @{service.name}Handler {
def apply(implementation: @service.name)(implicit mat: Materializer): PartialFunction[HttpRequest, Future[HttpResponse]] = {
private val notFound = Future.successful(HttpResponse(StatusCodes.NotFound))

/**
* Creates a `HttpRequest` to `HttpResponse` handler that can be used in for example `Http().bindAndHandleAsync`
* for the generated partial function handler and ends with `StatusCodes.NotFound` if the request is not matching.
*
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining
* several services.
*/
def apply(implementation: @service.name)(implicit mat: Materializer): HttpRequest => Future[HttpResponse] =
partial(implementation).orElse { case _ => notFound }

/**
* Creates a partial `HttpRequest` to `HttpResponse` handler that can be combined with handlers of other
* services with `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` and then used in for example
* `Http().bindAndHandleAsync`.
*
* Use `@{service.name}Handler.apply` if the server is only handling one service.
*/
def partial(implementation: @service.name)(implicit mat: Materializer): PartialFunction[HttpRequest, Future[HttpResponse]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

implicit val ec: ExecutionContext = mat.executionContext
import @{service.name}.Serializers._

Expand Down
24 changes: 22 additions & 2 deletions docs/src/main/paradox/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ For a complete overview of the configuration options see the chapter for your bu

## Generate and implement

### Proto

Define the interfaces you want to implement in your project's
@sbt[`src/main/protobuf`]@gradle[`src/main/proto`]@maven[`src/main/proto`] file(s).

Expand Down Expand Up @@ -129,6 +131,8 @@ Maven
mvn akka-grpc:generate
```

### Service implementation

Implement the methods of the service interface in a new class:

Scala
Expand All @@ -137,9 +141,11 @@ Scala
Java
: @@snip [GreeterServiceImpl.java]($root$/../plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServiceImpl.java) { #full-service-impl }

### Server

That service can then be handled by an Akka HTTP server via the generated `GreeterServiceHandler`,
which is a @scala[partial ]function from `HttpRequest` to @scala[`Future[HttpResponse]`]@java[`CompletionStage<HttpResponse>`].
@scala[The partial function should be made total before giving it to Akka HTTP by for example providing 404 as as default response].
which is a function from `HttpRequest` to @scala[`Future[HttpResponse]`]@java[`CompletionStage<HttpResponse>`].
It returns 404 as response for requests that don't match the path of the service.

The server will run one instance of the implementation and that is then shared between requests,
this mean that it must be thread safe. In the sample above there is no mutable state, for more about safely implementing
Expand All @@ -159,6 +165,20 @@ Note that it's important to enable HTTP/2 in the configuration of the `ActorSyst
akka.http.server.preview.enable-http2 = on
```

# Multiple services

When a server handles several services the handlers must be combined with
@scala[`akka.grpc.scaladsl.ServiceHandler.concatOrNotFound`]@java[`akka.grpc.javadsl.ServiceHandler.concatOrNotFound`]

Scala
: @@snip [GreeterServiceImpl.scala]($root$/../plugin-tester-scala/src/main/scala/example/myapp/CombinedServer.scala) { #concatOrNotFound }

Java
: @@snip [GreeterServiceImpl.java]($root$/../plugin-tester-java/src/main/java/example/myapp/CombinedServer.java) { #import #concatOrNotFound }

@scala[Note that the `GreeterServiceHandler.partial` and `EchoServiceHandler.partial` are used instead of `apply`
methods to create partial functions that are combined by concatOrNotFound.]

## Running

To run the server with HTTP/2 enabled correctly, you will likely have to configure the Jetty ALPN
Expand Down
112 changes: 112 additions & 0 deletions plugin-tester-java/src/main/java/example/myapp/CombinedServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package example.myapp;

import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectWithHttps;
import akka.http.javadsl.ConnectionContext;
import akka.http.javadsl.HttpsConnectionContext;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;
import java.util.concurrent.CompletionStage;

//#import
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Function;

//#import

import example.myapp.helloworld.*;
import example.myapp.helloworld.grpc.*;
import example.myapp.echo.*;
import example.myapp.echo.grpc.*;

class CombinedServer {
public static void main(String[] args) throws Exception {
// important to enable HTTP/2 in ActorSystem's config
Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication());
ActorSystem sys = ActorSystem.create("HelloWorld", conf);
Materializer mat = ActorMaterializer.create(sys);

//#concatOrNotFound
Function<HttpRequest, CompletionStage<HttpResponse>> greeterService =
GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), mat);
Function<HttpRequest, CompletionStage<HttpResponse>> echoService =
EchoServiceHandlerFactory.create(new EchoServiceImpl(), mat);
Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

ServiceHandler.concatOrNotFound(greeterService, echoService);

Http.get(sys).bindAndHandleAsync(
serviceHandlers,
ConnectWithHttps.toHostHttps("127.0.0.1", 8080).withCustomHttpsContext(serverHttpContext()),
mat)
//#concatOrNotFound
.thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
});
}

private static HttpsConnectionContext serverHttpContext() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a few of these :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, finding a nicer alternative for that is part of #89

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it should hurt your eyes so that we make it happen :)

// FIXME how would end users do this? TestUtils.loadCert? issue #89
String keyEncoded = read(CombinedServer.class.getResourceAsStream("/certs/server1.key"))
.replace("-----BEGIN PRIVATE KEY-----\n", "")
.replace("-----END PRIVATE KEY-----\n", "")
.replace("\n", "");

byte[] decodedKey = Base64.getDecoder().decode(keyEncoded);

PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(decodedKey);

KeyFactory kf = KeyFactory.getInstance("RSA");
PrivateKey privateKey = kf.generatePrivate(spec);

CertificateFactory fact = CertificateFactory.getInstance("X.509");
Certificate cer = fact.generateCertificate(CombinedServer.class.getResourceAsStream("/certs/server1.pem"));

KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(null);
ks.setKeyEntry("private", privateKey, new char[0], new Certificate[]{cer});

KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(ks, null);

SSLContext context = SSLContext.getInstance("TLS");
context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());

return ConnectionContext.https(context);
}

private static String read(InputStream in) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.max(64, in.available()));
byte[] buffer = new byte[32 * 1024];
int bytesRead = in.read(buffer);
while (bytesRead >= 0) {
baos.write(buffer, 0, bytesRead);
bytesRead = in.read(buffer);
}

byte[] bytes = baos.toByteArray();
return new String(bytes, "UTF-8");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package example.myapp.echo;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import example.myapp.echo.grpc.*;

public class EchoServiceImpl implements EchoService {

@Override
public CompletionStage<EchoMessage> echo(EchoMessage in) {
return CompletableFuture.completedFuture(in);
}
}

15 changes: 15 additions & 0 deletions plugin-tester-java/src/main/proto/echo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.echo.grpc";

package echo;

// The greeting service definition.
service EchoService {
rpc Echo (EchoMessage) returns (EchoMessage) {}
}

message EchoMessage {
string payload = 1;
}
15 changes: 15 additions & 0 deletions plugin-tester-scala/src/main/protobuf/echo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "example.myapp.echo.grpc";

package echo;

// The greeting service definition.
service EchoService {
rpc Echo (EchoMessage) returns (EchoMessage) {}
}

message EchoMessage {
string payload = 1;
}
117 changes: 117 additions & 0 deletions plugin-tester-scala/src/main/scala/example/myapp/CombinedServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package example.myapp

import java.io.{ByteArrayOutputStream, FileInputStream, InputStream}
import java.nio.file.Files
import java.nio.file.Paths
import java.security.KeyFactory
import java.security.KeyStore
import java.security.SecureRandom
import java.security.cert.CertificateFactory
import java.security.spec.PKCS8EncodedKeySpec
import java.util.Base64

import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.HttpsConnectionContext
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import com.typesafe.config.ConfigFactory

import example.myapp.helloworld._
import example.myapp.helloworld.grpc._

import example.myapp.echo._
import example.myapp.echo.grpc._

//#concatOrNotFound
import akka.grpc.scaladsl.ServiceHandler

//#concatOrNotFound

object CombinedServer {

def main(args: Array[String]): Unit = {
// important to enable HTTP/2 in ActorSystem's config
val conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication())
implicit val sys: ActorSystem = ActorSystem("HelloWorld", conf)
implicit val mat: Materializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher

//#concatOrNotFound
// explicit types not needed but included in example for clarity
val greeterService: PartialFunction[HttpRequest,Future[HttpResponse]] =
GreeterServiceHandler.partial(new GreeterServiceImpl(mat))
val echoService: PartialFunction[HttpRequest,Future[HttpResponse]] =
EchoServiceHandler.partial(new EchoServiceImpl)
val serviceHandlers: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(greeterService, echoService)

Http().bindAndHandleAsync(
serviceHandlers,
interface = "127.0.0.1",
port = 8080,
connectionContext = serverHttpContext()
)
//#concatOrNotFound
.foreach { binding =>
println(s"gRPC server bound to: ${binding.localAddress}")
}
}

private def serverHttpContext(): HttpsConnectionContext = {
// FIXME how would end users do this? TestUtils.loadCert? issue #89
val keyEncoded = read(CombinedServer.getClass.getResourceAsStream("/certs/server1.key"))
.replace("-----BEGIN PRIVATE KEY-----\n", "")
.replace("-----END PRIVATE KEY-----\n", "")
.replace("\n", "")

val decodedKey = Base64.getDecoder.decode(keyEncoded)

val spec = new PKCS8EncodedKeySpec(decodedKey)

val kf = KeyFactory.getInstance("RSA")
val privateKey = kf.generatePrivate(spec)

val fact = CertificateFactory.getInstance("X.509")
val cer = fact.generateCertificate(CombinedServer.getClass.getResourceAsStream("/certs/server1.pem"))

val ks = KeyStore.getInstance("PKCS12")
ks.load(null)
ks.setKeyEntry("private", privateKey, Array.empty, Array(cer))

val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(ks, null)

val context = SSLContext.getInstance("TLS")
context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom)

new HttpsConnectionContext(context)
}

private def read(in: InputStream): String = {
val bytes: Array[Byte] = {
val baos = new ByteArrayOutputStream(math.max(64, in.available()))
val buffer = Array.ofDim[Byte](32 * 1024)

var bytesRead = in.read(buffer)
while (bytesRead >= 0) {
baos.write(buffer, 0, bytesRead)
bytesRead = in.read(buffer)
}
baos.toByteArray
}
new String(bytes, "UTF-8")
}

}

Loading