diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index dd360372f70..f5807e2c09c 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -173,6 +173,7 @@ public byte[] parseAsciiString(byte[] serialized) { public static final String CONTENT_TYPE_GRPC = "application/grpc"; public static final String CONTENT_TYPE_GRPC_WEB = "application/grpc-web"; + public static final String CONTENT_TYPE_GRPC_WEB_PROTO = "application/grpc-web+proto"; public static final String CONTENT_TYPE_GRPC_WEB_TEXT = "application/grpc-web-text"; public static final String CONTENT_TYPE_GRPC_WEB_TEXT_PROTO = "application/grpc-web-text+proto"; @@ -461,11 +462,17 @@ public static boolean isGrpcContentType(String contentType) { return nextChar == '+' || nextChar == ';'; } - public static boolean isGrpcWebContentType(String contentType) { + public static boolean isGrpcWebProtoContentType(String contentType) { + if (contentType == null) { + return false; + } + return CONTENT_TYPE_GRPC_WEB_PROTO.equals(contentType.toLowerCase()); + } + + public static boolean isGrpcWebTextContentType(String contentType) { if (contentType == null) { return false; } - // TODO: Also support grpc-web, grpc-web+proto, and grpc-web-text+proto. return CONTENT_TYPE_GRPC_WEB_TEXT.equals(contentType.toLowerCase()); } diff --git a/core/src/main/java/io/grpc/internal/Protocol.java b/core/src/main/java/io/grpc/internal/Protocol.java index 127ea37d980..6718adff23b 100644 --- a/core/src/main/java/io/grpc/internal/Protocol.java +++ b/core/src/main/java/io/grpc/internal/Protocol.java @@ -18,5 +18,6 @@ public enum Protocol { GRPC, - GRPC_WEB_TEXT; + GRPC_WEB_TEXT, + GRPC_WEB_PROTO; } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 3259512dedb..083c15f8865 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -101,7 +101,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final long permitKeepAliveTimeInNanos; private final Attributes eagAttributes; private final HttpStreamListener httpStreamListener; - private final boolean permitGrpcWebText; + private final boolean permitGrpcWeb; private final ReferenceCounted sharedResourceReferenceCounter = new SharedResourceReferenceCounter(); private final List streamTracerFactories; @@ -129,7 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId { long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, - Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWebText, InternalChannelz channelz) { + Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWeb, InternalChannelz channelz) { this.addresses = checkNotNull(addresses, "addresses"); this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); @@ -160,7 +160,7 @@ class NettyServer implements InternalServer, InternalWithLogId { this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); this.httpStreamListener = httpStreamListener; - this.permitGrpcWebText = permitGrpcWebText; + this.permitGrpcWeb = permitGrpcWeb; this.channelz = Preconditions.checkNotNull(channelz); this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" : String.valueOf(addresses)); @@ -263,7 +263,7 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, eagAttributes, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 630b5b3665b..6baed5bd847 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -118,7 +118,7 @@ public final class NettyServerBuilder extends ForwardingServerBuilder 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -204,7 +204,7 @@ static NettyServerHandler newHandler( eagAttributes, Ticker.systemTicker(), httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } static NettyServerHandler newHandler( @@ -229,7 +229,7 @@ static NettyServerHandler newHandler( Attributes eagAttributes, Ticker ticker, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -281,7 +281,7 @@ static NettyServerHandler newHandler( autoFlowControl, eagAttributes, ticker, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } private NettyServerHandler( @@ -304,7 +304,7 @@ private NettyServerHandler( Attributes eagAttributes, Ticker ticker, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(), autoFlowControl, null, ticker); @@ -349,8 +349,8 @@ public void onStreamClosed(Http2Stream stream) { this.httpStreamListener = httpStreamListener; this.permittedProtocols = Sets.immutableEnumSet( - permitGrpcWebText - ? EnumSet.of(Protocol.GRPC, Protocol.GRPC_WEB_TEXT) + permitGrpcWeb + ? EnumSet.of(Protocol.GRPC, Protocol.GRPC_WEB_TEXT, Protocol.GRPC_WEB_PROTO) : EnumSet.of(Protocol.GRPC)); streamKey = encoder.connection().newKey(); @@ -541,9 +541,12 @@ private Protocol detectGrpcProtocol(CharSequence contentType) { if (GrpcUtil.isGrpcContentType(contentTypeString)) { return Protocol.GRPC; } - if (GrpcUtil.isGrpcWebContentType(contentTypeString)) { + if (GrpcUtil.isGrpcWebTextContentType(contentTypeString)) { return Protocol.GRPC_WEB_TEXT; } + if (GrpcUtil.isGrpcWebProtoContentType(contentTypeString)) { + return Protocol.GRPC_WEB_PROTO; + } return null; } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 0a25638ee17..af2f3c86952 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -71,7 +71,10 @@ public NettyServerStream( TransportTracer transportTracer, Protocol protocol) { super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx); - this.sink = protocol == Protocol.GRPC_WEB_TEXT ? new GrpcWebTextSink() : new Sink(); + this.sink = + protocol == Protocol.GRPC_WEB_TEXT + ? new GrpcWebTextSink() + : protocol == Protocol.GRPC_WEB_PROTO ? new GrpcWebProtoSink() : new Sink(); this.state = checkNotNull(state, "transportState"); this.channel = checkNotNull(channel, "channel"); this.writeQueue = state.handler.getWriteQueue(); @@ -109,8 +112,7 @@ public void writeHeaders(Metadata headers) { try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) { writeQueue.enqueue( SendResponseHeadersCommand.createHeaders( - transportState(), - Utils.convertServerHeaders(headers, protocol)), + transportState(), Utils.convertServerHeaders(headers, protocol)), true); } } @@ -120,18 +122,20 @@ protected void writeFrameInternal(ByteBuf bytebuf, boolean flush, final int numM final int numBytes = bytebuf.readableBytes(); // Add the bytes to outbound flow control. onSendingBytes(numBytes); - writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // Remove the bytes from outbound flow control, optionally notifying - // the client that they can send more bytes. - transportState().onSentBytes(numBytes); - if (future.isSuccess()) { - transportTracer.reportMessageSent(numMessages); - } - } - }); + writeQueue + .enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush) + .addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Remove the bytes from outbound flow control, optionally notifying + // the client that they can send more bytes. + transportState().onSentBytes(numBytes); + if (future.isSuccess()) { + transportTracer.reportMessageSent(numMessages); + } + } + }); } @Override @@ -164,20 +168,84 @@ public void cancel(Status status) { } } + private class GrpcWebProtoSink extends Sink implements AbstractServerStream.Sink { + @Override + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { + // The grpc-web protocol is special in that it cannot use existing HTTP/2 facilities to post + // trailers. Instead, we send them as headers here and *also* as trailer frames. + if (!headersSent) { + // Note that writeHeaders automatically takes grpc-web into account by rewriting the given + // headers. + writeHeaders(trailers); + } + PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); + try { + // TODO: convertTrailers does not take the protocol into account. What to do? + Http2Headers http2Trailers = Utils.convertTrailers(trailers, true); + + // TODO: What's the right size to use here? + ByteBuf bytebuf = channel.alloc().buffer(1024).touch(); + // The trailers are sent as a base64-encoded packet with the following structure: + // byte ID(0x80) + // int length (total length of the HTTP headers not including the ID and length fields) + // HTTP headers as a list of :\r\n + bytebuf.writeByte(0x80); // TRAILER FRAME + // We don't know the size yet so we write 0 and patch it afterwards. + int address = bytebuf.writerIndex(); + bytebuf.writeInt(0); + + // We use ISO_8859_1 for the Charset to match Netty. See HpackEncoder.encodeStringLiteral. + for (Entry entry : http2Trailers) { + bytebuf.writeCharSequence( + String.format("%s:%s\r\n", entry.getKey(), entry.getValue()), + StandardCharsets.ISO_8859_1); + } + + // Patch the length. + int len = bytebuf.readableBytes() - 5; + bytebuf.setByte(address + 0, (len >> 24) & 0xff); + bytebuf.setByte(address + 1, (len >> 16) & 0xff); + bytebuf.setByte(address + 2, (len >> 8) & 0xff); + bytebuf.setByte(address + 3, len & 0xff); + + int numBytes = bytebuf.readableBytes(); + writeQueue + .enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, true), true) + .addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // TODO: I think we need to update flow control here. IS THAT CORRECT? + // Remove the bytes from outbound flow control, optionally notifying + // the client that they can send more bytes. + transportState().onSentBytes(numBytes); + // TODO: I think this isn't needed, but not sure. Check with upstream! + // if (future.isSuccess()) { + // transportTracer.reportMessageSent(numMessages); + // } + } + }); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); + } + } + } + + // TODO: remove when we're no longer using the grpc-web-text protocol anywhere private class GrpcWebTextSink extends Sink implements AbstractServerStream.Sink { @Override protected void writeFrameInternal(ByteBuf unencoded, boolean flush, final int numMessages) { // The frames already have the correct header (0x00 plus 4-byte length); we only need to // encode them as Base64. ByteBuf encoded = - Base64.encode( - unencoded, - unencoded.readerIndex(), - unencoded.readableBytes(), - false, - Base64Dialect.STANDARD, - channel.alloc()) - .touch(); + Base64.encode( + unencoded, + unencoded.readerIndex(), + unencoded.readableBytes(), + false, + Base64Dialect.STANDARD, + channel.alloc()) + .touch(); unencoded.release(); super.writeFrameInternal(encoded, flush, numMessages); } @@ -285,17 +353,18 @@ public void runOnTransportThread(final Runnable r) { r.run(); } else { final Link link = PerfMark.linkOut(); - eventLoop.execute(new Runnable() { - @Override - public void run() { - try (TaskCloseable ignore = - PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) { - PerfMark.attachTag(tag); - PerfMark.linkIn(link); - r.run(); - } - } - }); + eventLoop.execute( + new Runnable() { + @Override + public void run() { + try (TaskCloseable ignore = + PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + r.run(); + } + } + }); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 5168f4a47c7..2c516966750 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -79,7 +79,7 @@ class NettyServerTransport implements ServerTransport { private final long permitKeepAliveTimeInNanos; private final Attributes eagAttributes; private final HttpStreamListener httpStreamListener; - private final boolean permitGrpcWebText; + private final boolean permitGrpcWeb; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -103,7 +103,7 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, Attributes eagAttributes, HttpStreamListener httpStreamListener, - boolean permitGrpcWebText) { + boolean permitGrpcWeb) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -124,7 +124,7 @@ class NettyServerTransport implements ServerTransport { this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); this.httpStreamListener = httpStreamListener; - this.permitGrpcWebText = permitGrpcWebText; + this.permitGrpcWeb = permitGrpcWeb; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -285,6 +285,6 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, eagAttributes, httpStreamListener, - permitGrpcWebText); + permitGrpcWeb); } }