-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
concat helper method to chain together multiple service implementations, #59 #256
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,26 @@ import akka.http.scaladsl.model.Uri.Path.Segment | |
import akka.stream.Materializer | ||
|
||
object @{service.name}Handler { | ||
def apply(implementation: @service.name)(implicit mat: Materializer): PartialFunction[HttpRequest, Future[HttpResponse]] = { | ||
private val notFound = Future.successful(HttpResponse(StatusCodes.NotFound)) | ||
|
||
/** | ||
* Creates a `HttpRequest` to `HttpResponse` handler that can be used in for example `Http().bindAndHandleAsync` | ||
* for the generated partial function handler and ends with `StatusCodes.NotFound` if the request is not matching. | ||
* | ||
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining | ||
* several services. | ||
*/ | ||
def apply(implementation: @service.name)(implicit mat: Materializer): HttpRequest => Future[HttpResponse] = | ||
partial(implementation).orElse { case _ => notFound } | ||
|
||
/** | ||
* Creates a partial `HttpRequest` to `HttpResponse` handler that can be combined with handlers of other | ||
* services with `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` and then used in for example | ||
* `Http().bindAndHandleAsync`. | ||
* | ||
* Use `@{service.name}Handler.apply` if the server is only handling one service. | ||
*/ | ||
def partial(implementation: @service.name)(implicit mat: Materializer): PartialFunction[HttpRequest, Future[HttpResponse]] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
implicit val ec: ExecutionContext = mat.executionContext | ||
import @{service.name}.Serializers._ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package example.myapp; | ||
|
||
import akka.actor.ActorSystem; | ||
import akka.http.javadsl.ConnectWithHttps; | ||
import akka.http.javadsl.ConnectionContext; | ||
import akka.http.javadsl.HttpsConnectionContext; | ||
import akka.stream.ActorMaterializer; | ||
import akka.stream.Materializer; | ||
import com.typesafe.config.Config; | ||
import com.typesafe.config.ConfigFactory; | ||
|
||
import javax.net.ssl.KeyManagerFactory; | ||
import javax.net.ssl.SSLContext; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.FileInputStream; | ||
import java.nio.file.Files; | ||
import java.nio.file.Paths; | ||
import java.security.KeyFactory; | ||
import java.security.KeyStore; | ||
import java.security.PrivateKey; | ||
import java.security.SecureRandom; | ||
import java.security.cert.Certificate; | ||
import java.security.cert.CertificateFactory; | ||
import java.security.spec.PKCS8EncodedKeySpec; | ||
import java.util.Base64; | ||
import java.util.concurrent.CompletionStage; | ||
|
||
//#import | ||
import akka.grpc.javadsl.ServiceHandler; | ||
import akka.http.javadsl.Http; | ||
import akka.http.javadsl.model.HttpRequest; | ||
import akka.http.javadsl.model.HttpResponse; | ||
import akka.japi.Function; | ||
|
||
//#import | ||
|
||
import example.myapp.helloworld.*; | ||
import example.myapp.helloworld.grpc.*; | ||
import example.myapp.echo.*; | ||
import example.myapp.echo.grpc.*; | ||
|
||
class CombinedServer { | ||
public static void main(String[] args) throws Exception { | ||
// important to enable HTTP/2 in ActorSystem's config | ||
Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on") | ||
.withFallback(ConfigFactory.defaultApplication()); | ||
ActorSystem sys = ActorSystem.create("HelloWorld", conf); | ||
Materializer mat = ActorMaterializer.create(sys); | ||
|
||
//#concatOrNotFound | ||
Function<HttpRequest, CompletionStage<HttpResponse>> greeterService = | ||
GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), mat); | ||
Function<HttpRequest, CompletionStage<HttpResponse>> echoService = | ||
EchoServiceHandlerFactory.create(new EchoServiceImpl(), mat); | ||
Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
ServiceHandler.concatOrNotFound(greeterService, echoService); | ||
|
||
Http.get(sys).bindAndHandleAsync( | ||
serviceHandlers, | ||
ConnectWithHttps.toHostHttps("127.0.0.1", 8080).withCustomHttpsContext(serverHttpContext()), | ||
mat) | ||
//#concatOrNotFound | ||
.thenAccept(binding -> { | ||
System.out.println("gRPC server bound to: " + binding.localAddress()); | ||
}); | ||
} | ||
|
||
private static HttpsConnectionContext serverHttpContext() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a few of these :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, finding a nicer alternative for that is part of #89 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, it should hurt your eyes so that we make it happen :) |
||
// FIXME how would end users do this? TestUtils.loadCert? issue #89 | ||
String keyEncoded = read(CombinedServer.class.getResourceAsStream("/certs/server1.key")) | ||
.replace("-----BEGIN PRIVATE KEY-----\n", "") | ||
.replace("-----END PRIVATE KEY-----\n", "") | ||
.replace("\n", ""); | ||
|
||
byte[] decodedKey = Base64.getDecoder().decode(keyEncoded); | ||
|
||
PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(decodedKey); | ||
|
||
KeyFactory kf = KeyFactory.getInstance("RSA"); | ||
PrivateKey privateKey = kf.generatePrivate(spec); | ||
|
||
CertificateFactory fact = CertificateFactory.getInstance("X.509"); | ||
Certificate cer = fact.generateCertificate(CombinedServer.class.getResourceAsStream("/certs/server1.pem")); | ||
|
||
KeyStore ks = KeyStore.getInstance("PKCS12"); | ||
ks.load(null); | ||
ks.setKeyEntry("private", privateKey, new char[0], new Certificate[]{cer}); | ||
|
||
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); | ||
keyManagerFactory.init(ks, null); | ||
|
||
SSLContext context = SSLContext.getInstance("TLS"); | ||
context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom()); | ||
|
||
return ConnectionContext.https(context); | ||
} | ||
|
||
private static String read(InputStream in) throws IOException { | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.max(64, in.available())); | ||
byte[] buffer = new byte[32 * 1024]; | ||
int bytesRead = in.read(buffer); | ||
while (bytesRead >= 0) { | ||
baos.write(buffer, 0, bytesRead); | ||
bytesRead = in.read(buffer); | ||
} | ||
|
||
byte[] bytes = baos.toByteArray(); | ||
return new String(bytes, "UTF-8"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package example.myapp.echo; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.stream.Collectors; | ||
|
||
import akka.NotUsed; | ||
import akka.stream.Materializer; | ||
import akka.stream.javadsl.Sink; | ||
import akka.stream.javadsl.Source; | ||
|
||
import example.myapp.echo.grpc.*; | ||
|
||
public class EchoServiceImpl implements EchoService { | ||
|
||
@Override | ||
public CompletionStage<EchoMessage> echo(EchoMessage in) { | ||
return CompletableFuture.completedFuture(in); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
syntax = "proto3"; | ||
|
||
option java_multiple_files = true; | ||
option java_package = "example.myapp.echo.grpc"; | ||
|
||
package echo; | ||
|
||
// The greeting service definition. | ||
service EchoService { | ||
rpc Echo (EchoMessage) returns (EchoMessage) {} | ||
} | ||
|
||
message EchoMessage { | ||
string payload = 1; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
syntax = "proto3"; | ||
|
||
option java_multiple_files = true; | ||
option java_package = "example.myapp.echo.grpc"; | ||
|
||
package echo; | ||
|
||
// The greeting service definition. | ||
service EchoService { | ||
rpc Echo (EchoMessage) returns (EchoMessage) {} | ||
} | ||
|
||
message EchoMessage { | ||
string payload = 1; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package example.myapp | ||
|
||
import java.io.{ByteArrayOutputStream, FileInputStream, InputStream} | ||
import java.nio.file.Files | ||
import java.nio.file.Paths | ||
import java.security.KeyFactory | ||
import java.security.KeyStore | ||
import java.security.SecureRandom | ||
import java.security.cert.CertificateFactory | ||
import java.security.spec.PKCS8EncodedKeySpec | ||
import java.util.Base64 | ||
|
||
import javax.net.ssl.KeyManagerFactory | ||
import javax.net.ssl.SSLContext | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.Future | ||
import akka.actor.ActorSystem | ||
import akka.grpc.scaladsl.ServiceHandler | ||
import akka.http.scaladsl.Http | ||
import akka.http.scaladsl.HttpsConnectionContext | ||
import akka.http.scaladsl.model.StatusCodes | ||
import akka.http.scaladsl.model.HttpRequest | ||
import akka.http.scaladsl.model.HttpResponse | ||
import akka.stream.ActorMaterializer | ||
import akka.stream.Materializer | ||
import com.typesafe.config.ConfigFactory | ||
|
||
import example.myapp.helloworld._ | ||
import example.myapp.helloworld.grpc._ | ||
|
||
import example.myapp.echo._ | ||
import example.myapp.echo.grpc._ | ||
|
||
//#concatOrNotFound | ||
import akka.grpc.scaladsl.ServiceHandler | ||
|
||
//#concatOrNotFound | ||
|
||
object CombinedServer { | ||
|
||
def main(args: Array[String]): Unit = { | ||
// important to enable HTTP/2 in ActorSystem's config | ||
val conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on") | ||
.withFallback(ConfigFactory.defaultApplication()) | ||
implicit val sys: ActorSystem = ActorSystem("HelloWorld", conf) | ||
implicit val mat: Materializer = ActorMaterializer() | ||
implicit val ec: ExecutionContext = sys.dispatcher | ||
|
||
//#concatOrNotFound | ||
// explicit types not needed but included in example for clarity | ||
val greeterService: PartialFunction[HttpRequest,Future[HttpResponse]] = | ||
GreeterServiceHandler.partial(new GreeterServiceImpl(mat)) | ||
val echoService: PartialFunction[HttpRequest,Future[HttpResponse]] = | ||
EchoServiceHandler.partial(new EchoServiceImpl) | ||
val serviceHandlers: HttpRequest => Future[HttpResponse] = | ||
ServiceHandler.concatOrNotFound(greeterService, echoService) | ||
|
||
Http().bindAndHandleAsync( | ||
serviceHandlers, | ||
interface = "127.0.0.1", | ||
port = 8080, | ||
connectionContext = serverHttpContext() | ||
) | ||
//#concatOrNotFound | ||
.foreach { binding => | ||
println(s"gRPC server bound to: ${binding.localAddress}") | ||
} | ||
} | ||
|
||
private def serverHttpContext(): HttpsConnectionContext = { | ||
// FIXME how would end users do this? TestUtils.loadCert? issue #89 | ||
val keyEncoded = read(CombinedServer.getClass.getResourceAsStream("/certs/server1.key")) | ||
.replace("-----BEGIN PRIVATE KEY-----\n", "") | ||
.replace("-----END PRIVATE KEY-----\n", "") | ||
.replace("\n", "") | ||
|
||
val decodedKey = Base64.getDecoder.decode(keyEncoded) | ||
|
||
val spec = new PKCS8EncodedKeySpec(decodedKey) | ||
|
||
val kf = KeyFactory.getInstance("RSA") | ||
val privateKey = kf.generatePrivate(spec) | ||
|
||
val fact = CertificateFactory.getInstance("X.509") | ||
val cer = fact.generateCertificate(CombinedServer.getClass.getResourceAsStream("/certs/server1.pem")) | ||
|
||
val ks = KeyStore.getInstance("PKCS12") | ||
ks.load(null) | ||
ks.setKeyEntry("private", privateKey, Array.empty, Array(cer)) | ||
|
||
val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") | ||
keyManagerFactory.init(ks, null) | ||
|
||
val context = SSLContext.getInstance("TLS") | ||
context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) | ||
|
||
new HttpsConnectionContext(context) | ||
} | ||
|
||
private def read(in: InputStream): String = { | ||
val bytes: Array[Byte] = { | ||
val baos = new ByteArrayOutputStream(math.max(64, in.available())) | ||
val buffer = Array.ofDim[Byte](32 * 1024) | ||
|
||
var bytesRead = in.read(buffer) | ||
while (bytesRead >= 0) { | ||
baos.write(buffer, 0, bytesRead) | ||
bytesRead = in.read(buffer) | ||
} | ||
baos.toByteArray | ||
} | ||
new String(bytes, "UTF-8") | ||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed this to NOT_FOUND because I guess that's a more valid response if a single handler is used, and felt better to use that as the signal to continue with next handler in
concat
rather than using exception