Skip to content

Commit

Permalink
feat: propagate FTL headers through kotlin-runtime
Browse files Browse the repository at this point in the history
There's some weirdness at play, in that the client uses okhttp3, and the
server uses grpc.io. This appears to be how Wire works, but there's
basically zero documentation so I can't be sure.
  • Loading branch information
alecthomas committed Aug 30, 2023
1 parent e264cbc commit e7f89d5
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 8 deletions.
2 changes: 0 additions & 2 deletions backend/common/rpc/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ const (
VerbHeader = "FTL-Verb"
// RequestIDHeader is the header used to pass the inbound request ID.
RequestIDHeader = "FTL-Request-ID"
// RequestOriginHeader is the header used to pass the origin of the request.
RequestOriginHeader = "FTL-Request-Origin"
)

func IsDirectRouted(header http.Header) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package xyz.block.ftl.client
import com.squareup.wire.GrpcClient
import okhttp3.OkHttpClient
import okhttp3.Protocol
import xyz.block.ftl.server.ServerInterceptor
import xyz.block.ftl.server.ftlRequestIdHeader
import xyz.block.ftl.server.ftlVerbHeader
import java.time.Duration

internal fun makeGrpcClient(endpoint: String): GrpcClient {
Expand All @@ -13,6 +16,15 @@ internal fun makeGrpcClient(endpoint: String): GrpcClient {
.writeTimeout(Duration.ofSeconds(10))
.callTimeout(Duration.ofSeconds(10))
.protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE))
.addInterceptor { chain ->
var requestBuilder = chain.request().newBuilder()
ServerInterceptor.callers.get()?.forEach {
requestBuilder = requestBuilder.addHeader(ftlVerbHeader, it)
}
requestBuilder = requestBuilder
.addHeader(ftlRequestIdHeader, ServerInterceptor.requestId.get())
chain.proceed(requestBuilder.build())
}
.build()
)
.baseUrl(endpoint)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package xyz.block.ftl.main

import io.grpc.ServerInterceptors
import io.grpc.netty.NettyServerBuilder
import xyz.block.ftl.client.GrpcVerbServiceClient
import xyz.block.ftl.client.makeGrpcClient
import xyz.block.ftl.registry.Registry
import xyz.block.ftl.server.Server
import xyz.block.ftl.server.ServerInterceptor
import java.net.InetSocketAddress
import java.net.URL

Expand All @@ -23,7 +25,7 @@ fun main() {
val verbRoutingClient = GrpcVerbServiceClient(makeGrpcClient(ftlEndpoint))
val server = Server(registry, verbRoutingClient)
val grpcServer = NettyServerBuilder.forAddress(addr)
.addService(server)
.addService(ServerInterceptors.intercept(server, ServerInterceptor()))
.build()
grpcServer.start()
grpcServer.awaitTermination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import xyz.block.ftl.client.VerbServiceClient
import xyz.block.ftl.registry.Registry
import xyz.block.ftl.registry.defaultJvmModuleName
import xyz.block.ftl.registry.toModel
import xyz.block.ftl.v1.CallRequest
import xyz.block.ftl.v1.CallResponse
import xyz.block.ftl.v1.PingRequest
import xyz.block.ftl.v1.PingResponse
import xyz.block.ftl.v1.VerbServiceWireGrpc
import xyz.block.ftl.v1.*

/**
* FTL verb server.
Expand All @@ -28,6 +24,14 @@ class Server(
}

override fun Call(request: CallRequest, response: StreamObserver<CallResponse>) {
var grpcContext = io.grpc.Context.current()

// Append caller to context.
request.verb?.let {
val callers = ServerInterceptor.callers.get() + "${it.module}.${it.name}"
grpcContext = grpcContext.withValue(ServerInterceptor.callers, callers)
}

val verbRef = request.verb
if (verbRef == null) {
response.onError(IllegalArgumentException("verb is required"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package xyz.block.ftl.server

import io.grpc.*
import io.grpc.ServerInterceptor

const val ftlVerbHeader = "FTL-Verb"
const val ftlRequestIdHeader = "FTL-Request-ID"

internal class ServerInterceptor : ServerInterceptor {

companion object {
internal var callersMetadata = Metadata.Key.of(ftlVerbHeader, Metadata.ASCII_STRING_MARSHALLER)
internal var requestIdMetadata = Metadata.Key.of(ftlRequestIdHeader, Metadata.ASCII_STRING_MARSHALLER)

internal var callers = Context.key<List<String>>(ftlVerbHeader)
internal var requestId = Context.key<String>(ftlRequestIdHeader)
}

override fun <ReqT : Any?, RespT : Any?> interceptCall(
call: ServerCall<ReqT, RespT>?,
headers: Metadata?,
next: ServerCallHandler<ReqT, RespT>?
): ServerCall.Listener<ReqT> {
var context = Context.current()

headers?.getAll(callersMetadata)?.apply {
context = context.withValue(callers, this.toList())
}
headers?.get(requestIdMetadata)?.apply {
context = context.withValue(requestId, this)
}

return Contexts.interceptCall(context, call, headers, next)
}
}

0 comments on commit e7f89d5

Please sign in to comment.