From 2d1dc26e44fb8e1f148b4eabc146fa8e32deb238 Mon Sep 17 00:00:00 2001 From: iamricard Date: Wed, 22 May 2024 12:58:30 +0200 Subject: [PATCH] grpc-web: adds support for the grpc-web+proto protocol More modern implementations (such as connectrpc) of gRPC-Web clients no longer rely on XHR, and thus don't implement the grpc-web-text protocol. Instead they rely on `grpc-web+proto` or `grpc-web+json`. This patch adds support for `grpc-web+proto`. --- .../main/java/io/grpc/internal/GrpcUtil.java | 11 +++- .../main/java/io/grpc/internal/Protocol.java | 3 +- .../main/java/io/grpc/netty/NettyServer.java | 8 +-- .../io/grpc/netty/NettyServerBuilder.java | 8 +-- .../io/grpc/netty/NettyServerHandler.java | 14 ++-- .../java/io/grpc/netty/NettyServerStream.java | 64 +++++++++++++++++++ .../io/grpc/netty/NettyServerTransport.java | 8 +-- 7 files changed, 94 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index dd360372f70c..f5807e2c09c9 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -173,6 +173,7 @@ public byte[] parseAsciiString(byte[] serialized) { public static final String CONTENT_TYPE_GRPC = "application/grpc"; public static final String CONTENT_TYPE_GRPC_WEB = "application/grpc-web"; + public static final String CONTENT_TYPE_GRPC_WEB_PROTO = "application/grpc-web+proto"; public static final String CONTENT_TYPE_GRPC_WEB_TEXT = "application/grpc-web-text"; public static final String CONTENT_TYPE_GRPC_WEB_TEXT_PROTO = "application/grpc-web-text+proto"; @@ -461,11 +462,17 @@ public static boolean isGrpcContentType(String contentType) { return nextChar == '+' || nextChar == ';'; } - public static boolean isGrpcWebContentType(String contentType) { + public static boolean isGrpcWebProtoContentType(String contentType) { + if (contentType == null) { + return false; + } + return CONTENT_TYPE_GRPC_WEB_PROTO.equals(contentType.toLowerCase()); + } + + public static boolean isGrpcWebTextContentType(String contentType) { if (contentType == null) { return false; } - // TODO: Also support grpc-web, grpc-web+proto, and grpc-web-text+proto. return CONTENT_TYPE_GRPC_WEB_TEXT.equals(contentType.toLowerCase()); } diff --git a/core/src/main/java/io/grpc/internal/Protocol.java b/core/src/main/java/io/grpc/internal/Protocol.java index 127ea37d9801..6718adff23bf 100644 --- a/core/src/main/java/io/grpc/internal/Protocol.java +++ b/core/src/main/java/io/grpc/internal/Protocol.java @@ -18,5 +18,6 @@ public enum Protocol { GRPC, - GRPC_WEB_TEXT; + GRPC_WEB_TEXT, + GRPC_WEB_PROTO; } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 3259512dedb7..083c15f88657 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -101,7 +101,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final long permitKeepAliveTimeInNanos; private final Attributes eagAttributes; private final HttpStreamListener httpStreamListener; - private final boolean permitGrpcWebText; + private final boolean permitGrpcWeb; private final ReferenceCounted sharedResourceReferenceCounter = new SharedResourceReferenceCounter(); private final List streamTracerFactories; @@ -129,7 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId { long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, - Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWebText, InternalChannelz channelz) { + Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWeb, InternalChannelz channelz) { this.addresses = checkNotNull(addresses, "addresses"); this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); @@ -160,7 +160,7 @@ class NettyServer implements InternalServer, InternalWithLogId { this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); this.httpStreamListener = httpStreamListener; - this.permitGrpcWebText = permitGrpcWebText; + this.permitGrpcWeb = permitGrpcWeb; this.channelz = Preconditions.checkNotNull(channelz); this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" : String.valueOf(addresses)); @@ -263,7 +263,7 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, eagAttributes, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 630b5b3665ba..6baed5bd8478 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -118,7 +118,7 @@ public final class NettyServerBuilder extends ForwardingServerBuilder 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -204,7 +204,7 @@ static NettyServerHandler newHandler( eagAttributes, Ticker.systemTicker(), httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } static NettyServerHandler newHandler( @@ -229,7 +229,7 @@ static NettyServerHandler newHandler( Attributes eagAttributes, Ticker ticker, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -281,7 +281,7 @@ static NettyServerHandler newHandler( autoFlowControl, eagAttributes, ticker, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } private NettyServerHandler( @@ -304,7 +304,7 @@ private NettyServerHandler( Attributes eagAttributes, Ticker ticker, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(), autoFlowControl, null, ticker); @@ -349,7 +349,7 @@ public void onStreamClosed(Http2Stream stream) { this.httpStreamListener = httpStreamListener; this.permittedProtocols = Sets.immutableEnumSet( - permitGrpcWebText + permitGrpcWeb ? EnumSet.of(Protocol.GRPC, Protocol.GRPC_WEB_TEXT) : EnumSet.of(Protocol.GRPC)); @@ -541,7 +541,7 @@ private Protocol detectGrpcProtocol(CharSequence contentType) { if (GrpcUtil.isGrpcContentType(contentTypeString)) { return Protocol.GRPC; } - if (GrpcUtil.isGrpcWebContentType(contentTypeString)) { + if (GrpcUtil.isGrpcWebTextContentType(contentTypeString)) { return Protocol.GRPC_WEB_TEXT; } return null; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 0a25638ee17f..9fbce992d8db 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -164,6 +164,70 @@ public void cancel(Status status) { } } + private class GrpcWebProtoSink extends Sink implements AbstractServerStream.Sink { + @Override + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { + // The grpc-web protocol is special in that it cannot use existing HTTP/2 facilities to post + // trailers. Instead, we send them as headers here and *also* as trailer frames. + if (!headersSent) { + // Note that writeHeaders automatically takes grpc-web into account by rewriting the given + // headers. + writeHeaders(trailers); + } + PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); + try { + // TODO: convertTrailers does not take the protocol into account. What to do? + Http2Headers http2Trailers = Utils.convertTrailers(trailers, true); + + // TODO: What's the right size to use here? + ByteBuf bytebuf = channel.alloc().buffer(1024).touch(); + // The trailers are sent as a base64-encoded packet with the following structure: + // byte ID(0x80) + // int length (total length of the HTTP headers not including the ID and length fields) + // HTTP headers as a list of :\r\n + bytebuf.writeByte(0x80); // TRAILER FRAME + // We don't know the size yet so we write 0 and patch it afterwards. + int address = bytebuf.writerIndex(); + bytebuf.writeInt(0); + + // We use ISO_8859_1 for the Charset to match Netty. See HpackEncoder.encodeStringLiteral. + for (Entry entry : http2Trailers) { + bytebuf.writeCharSequence( + String.format("%s:%s\r\n", entry.getKey(), entry.getValue()), + StandardCharsets.ISO_8859_1); + } + + // Patch the length. + int len = bytebuf.readableBytes() - 5; + bytebuf.setByte(address + 0, (len >> 24) & 0xff); + bytebuf.setByte(address + 1, (len >> 16) & 0xff); + bytebuf.setByte(address + 2, (len >> 8) & 0xff); + bytebuf.setByte(address + 3, len & 0xff); + + int numBytes = bytebuf.readableBytes(); + writeQueue + .enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, true), true) + .addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // TODO: I think we need to update flow control here. IS THAT CORRECT? + // Remove the bytes from outbound flow control, optionally notifying + // the client that they can send more bytes. + transportState().onSentBytes(numBytes); + // TODO: I think this isn't needed, but not sure. Check with upstream! + // if (future.isSuccess()) { + // transportTracer.reportMessageSent(numMessages); + // } + } + }); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); + } + } + } + + // TODO: remove when we're no longer using the grpc-web-text protocol anywhere private class GrpcWebTextSink extends Sink implements AbstractServerStream.Sink { @Override protected void writeFrameInternal(ByteBuf unencoded, boolean flush, final int numMessages) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 5168f4a47c76..2c5169667508 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -79,7 +79,7 @@ class NettyServerTransport implements ServerTransport { private final long permitKeepAliveTimeInNanos; private final Attributes eagAttributes; private final HttpStreamListener httpStreamListener; - private final boolean permitGrpcWebText; + private final boolean permitGrpcWeb; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -103,7 +103,7 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, Attributes eagAttributes, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -124,7 +124,7 @@ class NettyServerTransport implements ServerTransport { this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); this.httpStreamListener = httpStreamListener; - this.permitGrpcWebText = permitGrpcWebText; + this.permitGrpcWeb = permitGrpcWeb; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -285,6 +285,6 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, eagAttributes, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } }