Skip to content

Commit

Permalink
grpc-web: adds support for the grpc-web+proto protocol
Browse files Browse the repository at this point in the history
More modern implementations (such as connectrpc) of gRPC-Web clients no
longer rely on XHR, and thus don't implement the grpc-web-text protocol.
Instead they rely on `grpc-web+proto` or `grpc-web+json`. This patch
adds support for `grpc-web+proto`.
  • Loading branch information
iamricard committed May 22, 2024
1 parent 489c0f4 commit 3e4c056
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 57 deletions.
11 changes: 9 additions & 2 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public byte[] parseAsciiString(byte[] serialized) {
public static final String CONTENT_TYPE_GRPC = "application/grpc";

public static final String CONTENT_TYPE_GRPC_WEB = "application/grpc-web";
public static final String CONTENT_TYPE_GRPC_WEB_PROTO = "application/grpc-web+proto";
public static final String CONTENT_TYPE_GRPC_WEB_TEXT = "application/grpc-web-text";
public static final String CONTENT_TYPE_GRPC_WEB_TEXT_PROTO = "application/grpc-web-text+proto";

Expand Down Expand Up @@ -461,11 +462,17 @@ public static boolean isGrpcContentType(String contentType) {
return nextChar == '+' || nextChar == ';';
}

public static boolean isGrpcWebContentType(String contentType) {
public static boolean isGrpcWebProtoContentType(String contentType) {
if (contentType == null) {
return false;
}
return CONTENT_TYPE_GRPC_WEB_PROTO.equals(contentType.toLowerCase());
}

public static boolean isGrpcWebTextContentType(String contentType) {
if (contentType == null) {
return false;
}
// TODO: Also support grpc-web, grpc-web+proto, and grpc-web-text+proto.
return CONTENT_TYPE_GRPC_WEB_TEXT.equals(contentType.toLowerCase());
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@

public enum Protocol {
GRPC,
GRPC_WEB_TEXT;
GRPC_WEB_TEXT,
GRPC_WEB_PROTO;
}
8 changes: 4 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final long permitKeepAliveTimeInNanos;
private final Attributes eagAttributes;
private final HttpStreamListener httpStreamListener;
private final boolean permitGrpcWebText;
private final boolean permitGrpcWeb;
private final ReferenceCounted sharedResourceReferenceCounter =
new SharedResourceReferenceCounter();
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
Expand Down Expand Up @@ -129,7 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWebText, InternalChannelz channelz) {
Attributes eagAttributes, HttpStreamListener httpStreamListener, boolean permitGrpcWeb, InternalChannelz channelz) {
this.addresses = checkNotNull(addresses, "addresses");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
checkNotNull(channelOptions, "channelOptions");
Expand Down Expand Up @@ -160,7 +160,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
this.httpStreamListener = httpStreamListener;
this.permitGrpcWebText = permitGrpcWebText;
this.permitGrpcWeb = permitGrpcWeb;
this.channelz = Preconditions.checkNotNull(channelz);
this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
String.valueOf(addresses));
Expand Down Expand Up @@ -263,7 +263,7 @@ public void initChannel(Channel ch) {
permitKeepAliveTimeInNanos,
eagAttributes,
httpStreamListener,
permitGrpcWebText);
permitGrpcWeb);
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
Expand Down
8 changes: 4 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public final class NettyServerBuilder extends ForwardingServerBuilder<NettyServe
private long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
private Attributes eagAttributes = Attributes.EMPTY;
private HttpStreamListener httpStreamListener;
private boolean permitGrpcWebText;
private boolean permitGrpcWeb;

/**
* Creates a server builder that will bind to the given port.
Expand Down Expand Up @@ -669,7 +669,7 @@ NettyServer buildTransportServers(
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos,
maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos,
eagAttributes, httpStreamListener, permitGrpcWebText, this.serverImplBuilder.getChannelz());
eagAttributes, httpStreamListener, permitGrpcWeb, this.serverImplBuilder.getChannelz());
}

@VisibleForTesting
Expand Down Expand Up @@ -729,8 +729,8 @@ public NettyServerBuilder setHttpStreamListener(HttpStreamListener httpStreamLis
return this;
}

public NettyServerBuilder permitGrpcWebText(boolean permit) {
permitGrpcWebText = permit;
public NettyServerBuilder permitGrpcWeb(boolean permit) {
permitGrpcWeb = permit;
return this;
}
}
19 changes: 11 additions & 8 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static NettyServerHandler newHandler(
long permitKeepAliveTimeInNanos,
Attributes eagAttributes,
HttpStreamListener httpStreamListener,
boolean permitGrpcWebText) {
boolean permitGrpcWeb) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
maxHeaderListSize);
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
Expand Down Expand Up @@ -204,7 +204,7 @@ static NettyServerHandler newHandler(
eagAttributes,
Ticker.systemTicker(),
httpStreamListener,
permitGrpcWebText);
permitGrpcWeb);
}

static NettyServerHandler newHandler(
Expand All @@ -229,7 +229,7 @@ static NettyServerHandler newHandler(
Attributes eagAttributes,
Ticker ticker,
HttpStreamListener httpStreamListener,
boolean permitGrpcWebText) {
boolean permitGrpcWeb) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
Expand Down Expand Up @@ -281,7 +281,7 @@ static NettyServerHandler newHandler(
autoFlowControl,
eagAttributes, ticker,
httpStreamListener,
permitGrpcWebText);
permitGrpcWeb);
}

private NettyServerHandler(
Expand All @@ -304,7 +304,7 @@ private NettyServerHandler(
Attributes eagAttributes,
Ticker ticker,
HttpStreamListener httpStreamListener,
boolean permitGrpcWebText) {
boolean permitGrpcWeb) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null, ticker);

Expand Down Expand Up @@ -349,8 +349,8 @@ public void onStreamClosed(Http2Stream stream) {
this.httpStreamListener = httpStreamListener;
this.permittedProtocols =
Sets.immutableEnumSet(
permitGrpcWebText
? EnumSet.of(Protocol.GRPC, Protocol.GRPC_WEB_TEXT)
permitGrpcWeb
? EnumSet.of(Protocol.GRPC, Protocol.GRPC_WEB_TEXT, Protocol.GRPC_WEB_PROTO)
: EnumSet.of(Protocol.GRPC));

streamKey = encoder.connection().newKey();
Expand Down Expand Up @@ -541,9 +541,12 @@ private Protocol detectGrpcProtocol(CharSequence contentType) {
if (GrpcUtil.isGrpcContentType(contentTypeString)) {
return Protocol.GRPC;
}
if (GrpcUtil.isGrpcWebContentType(contentTypeString)) {
if (GrpcUtil.isGrpcWebTextContentType(contentTypeString)) {
return Protocol.GRPC_WEB_TEXT;
}
if (GrpcUtil.isGrpcWebProtoContentType(contentTypeString)) {
return Protocol.GRPC_WEB_PROTO;
}
return null;
}

Expand Down
137 changes: 103 additions & 34 deletions netty/src/main/java/io/grpc/netty/NettyServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public NettyServerStream(
TransportTracer transportTracer,
Protocol protocol) {
super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx);
this.sink = protocol == Protocol.GRPC_WEB_TEXT ? new GrpcWebTextSink() : new Sink();
this.sink =
protocol == Protocol.GRPC_WEB_TEXT
? new GrpcWebTextSink()
: protocol == Protocol.GRPC_WEB_PROTO ? new GrpcWebProtoSink() : new Sink();
this.state = checkNotNull(state, "transportState");
this.channel = checkNotNull(channel, "channel");
this.writeQueue = state.handler.getWriteQueue();
Expand Down Expand Up @@ -109,8 +112,7 @@ public void writeHeaders(Metadata headers) {
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) {
writeQueue.enqueue(
SendResponseHeadersCommand.createHeaders(
transportState(),
Utils.convertServerHeaders(headers, protocol)),
transportState(), Utils.convertServerHeaders(headers, protocol)),
true);
}
}
Expand All @@ -120,18 +122,20 @@ protected void writeFrameInternal(ByteBuf bytebuf, boolean flush, final int numM
final int numBytes = bytebuf.readableBytes();
// Add the bytes to outbound flow control.
onSendingBytes(numBytes);
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
if (future.isSuccess()) {
transportTracer.reportMessageSent(numMessages);
}
}
});
writeQueue
.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush)
.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
if (future.isSuccess()) {
transportTracer.reportMessageSent(numMessages);
}
}
});
}

@Override
Expand Down Expand Up @@ -164,20 +168,84 @@ public void cancel(Status status) {
}
}

private class GrpcWebProtoSink extends Sink implements AbstractServerStream.Sink {
@Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
// The grpc-web protocol is special in that it cannot use existing HTTP/2 facilities to post
// trailers. Instead, we send them as headers here and *also* as trailer frames.
if (!headersSent) {
// Note that writeHeaders automatically takes grpc-web into account by rewriting the given
// headers.
writeHeaders(trailers);
}
PerfMark.startTask("NettyServerStream$Sink.writeTrailers");
try {
// TODO: convertTrailers does not take the protocol into account. What to do?
Http2Headers http2Trailers = Utils.convertTrailers(trailers, true);

// TODO: What's the right size to use here?
ByteBuf bytebuf = channel.alloc().buffer(1024).touch();
// The trailers are sent as a base64-encoded packet with the following structure:
// byte ID(0x80)
// int length (total length of the HTTP headers not including the ID and length fields)
// HTTP headers as a list of <string>:<value>\r\n
bytebuf.writeByte(0x80); // TRAILER FRAME
// We don't know the size yet so we write 0 and patch it afterwards.
int address = bytebuf.writerIndex();
bytebuf.writeInt(0);

// We use ISO_8859_1 for the Charset to match Netty. See HpackEncoder.encodeStringLiteral.
for (Entry<CharSequence, CharSequence> entry : http2Trailers) {
bytebuf.writeCharSequence(
String.format("%s:%s\r\n", entry.getKey(), entry.getValue()),
StandardCharsets.ISO_8859_1);
}

// Patch the length.
int len = bytebuf.readableBytes() - 5;
bytebuf.setByte(address + 0, (len >> 24) & 0xff);
bytebuf.setByte(address + 1, (len >> 16) & 0xff);
bytebuf.setByte(address + 2, (len >> 8) & 0xff);
bytebuf.setByte(address + 3, len & 0xff);

int numBytes = bytebuf.readableBytes();
writeQueue
.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, true), true)
.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// TODO: I think we need to update flow control here. IS THAT CORRECT?
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
// TODO: I think this isn't needed, but not sure. Check with upstream!
// if (future.isSuccess()) {
// transportTracer.reportMessageSent(numMessages);
// }
}
});
} finally {
PerfMark.stopTask("NettyServerStream$Sink.writeTrailers");
}
}
}

// TODO: remove when we're no longer using the grpc-web-text protocol anywhere
private class GrpcWebTextSink extends Sink implements AbstractServerStream.Sink {
@Override
protected void writeFrameInternal(ByteBuf unencoded, boolean flush, final int numMessages) {
// The frames already have the correct header (0x00 plus 4-byte length); we only need to
// encode them as Base64.
ByteBuf encoded =
Base64.encode(
unencoded,
unencoded.readerIndex(),
unencoded.readableBytes(),
false,
Base64Dialect.STANDARD,
channel.alloc())
.touch();
Base64.encode(
unencoded,
unencoded.readerIndex(),
unencoded.readableBytes(),
false,
Base64Dialect.STANDARD,
channel.alloc())
.touch();
unencoded.release();
super.writeFrameInternal(encoded, flush, numMessages);
}
Expand Down Expand Up @@ -285,17 +353,18 @@ public void runOnTransportThread(final Runnable r) {
r.run();
} else {
final Link link = PerfMark.linkOut();
eventLoop.execute(new Runnable() {
@Override
public void run() {
try (TaskCloseable ignore =
PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) {
PerfMark.attachTag(tag);
PerfMark.linkIn(link);
r.run();
}
}
});
eventLoop.execute(
new Runnable() {
@Override
public void run() {
try (TaskCloseable ignore =
PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) {
PerfMark.attachTag(tag);
PerfMark.linkIn(link);
r.run();
}
}
});
}
}

Expand Down
8 changes: 4 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class NettyServerTransport implements ServerTransport {
private final long permitKeepAliveTimeInNanos;
private final Attributes eagAttributes;
private final HttpStreamListener httpStreamListener;
private final boolean permitGrpcWebText;
private final boolean permitGrpcWeb;
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer transportTracer;

Expand All @@ -103,7 +103,7 @@ class NettyServerTransport implements ServerTransport {
long permitKeepAliveTimeInNanos,
Attributes eagAttributes,
HttpStreamListener httpStreamListener,
boolean permitGrpcWebText) {
boolean permitGrpcWeb) {
this.channel = Preconditions.checkNotNull(channel, "channel");
this.channelUnused = channelUnused;
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
Expand All @@ -124,7 +124,7 @@ class NettyServerTransport implements ServerTransport {
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
this.httpStreamListener = httpStreamListener;
this.permitGrpcWebText = permitGrpcWebText;
this.permitGrpcWeb = permitGrpcWeb;
SocketAddress remote = channel.remoteAddress();
this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null);
}
Expand Down Expand Up @@ -285,6 +285,6 @@ private NettyServerHandler createHandler(
permitKeepAliveTimeInNanos,
eagAttributes,
httpStreamListener,
permitGrpcWebText);
permitGrpcWeb);
}
}

0 comments on commit 3e4c056

Please sign in to comment.