Skip to content

Commit

Permalink
feat: refactor XhrTransport to support WASM
Browse files Browse the repository at this point in the history
  • Loading branch information
r-durao-pvotal committed Jun 27, 2024
1 parent 4e65d4b commit 2ccd909
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 221 deletions.
25 changes: 15 additions & 10 deletions example/grpc-web/lib/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

import 'dart:async';
import 'dart:html';
import 'package:web/web.dart';

import 'src/generated/echo.pbgrpc.dart';

Expand Down Expand Up @@ -55,14 +55,19 @@ class EchoApp {
_addMessage(message, 'label-default pull-right');
}

void _addMessage(String message, String cssClass) {
final classes = cssClass.split(' ');
querySelector('#first')!.after(DivElement()
..classes.add('row')
..append(Element.tag('h2')
..append(SpanElement()
..classes.add('label')
..classes.addAll(classes)
..text = message)));
void _addMessage(String message, String cssClassArray) {
final cssClasses = cssClassArray.split(' ');
final spanLabel = HTMLSpanElement()..classList.add('label');
for (final cssClass in cssClasses) {
spanLabel.classList.add(cssClass);
}
spanLabel.text = message;
document.querySelector('#first')!.after(
HTMLDivElement()
..classList.add('row')
..append(
document.createElement('h2')..append(spanLabel),
),
);
}
}
1 change: 1 addition & 0 deletions example/grpc-web/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies:
grpc:
path: ../../
protobuf: ^3.0.0
web: ^0.5.1

dev_dependencies:
build_runner: ^2.0.0
Expand Down
8 changes: 4 additions & 4 deletions example/grpc-web/web/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:html';
import 'package:web/web.dart';

import 'package:grpc/grpc_web.dart';
import 'package:grpc_web/app.dart';
Expand All @@ -23,10 +23,10 @@ void main() {
final service = EchoServiceClient(channel);
final app = EchoApp(service);

final button = querySelector('#send') as ButtonElement;
final button = document.querySelector('#send') as HTMLButtonElement;
button.onClick.listen((e) async {
final msg = querySelector('#msg') as TextInputElement;
final value = msg.value!.trim();
final msg = document.querySelector('#msg') as HTMLInputElement;
final value = msg.value.trim();
msg.value = '';

if (value.isEmpty) return;
Expand Down
2 changes: 1 addition & 1 deletion lib/grpc_or_grpcweb.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

import 'src/client/grpc_or_grpcweb_channel_grpc.dart'
if (dart.library.html) 'src/client/grpc_or_grpcweb_channel_web.dart';
if (dart.library.js_interop) 'src/client/grpc_or_grpcweb_channel_web.dart';
import 'src/client/http2_channel.dart';
import 'src/client/options.dart';

Expand Down
191 changes: 87 additions & 104 deletions lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
// limitations under the License.

import 'dart:async';
import 'dart:html';
import 'dart:typed_data';

import 'package:meta/meta.dart';
import 'package:http/browser_client.dart';
import 'package:http/http.dart';

import '../../client/call.dart';
import '../../shared/message.dart';
Expand All @@ -30,11 +30,12 @@ import 'web_streams.dart';
const _contentTypeKey = 'Content-Type';

class XhrTransportStream implements GrpcTransportStream {
final HttpRequest _request;
final BrowserClient _client;
final Map<String, String> headers;
final Uri uri;
final ErrorHandler _onError;

final Function(XhrTransportStream stream) _onDone;
bool _headersReceived = false;
int _requestBytesRead = 0;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
Expand All @@ -45,92 +46,79 @@ class XhrTransportStream implements GrpcTransportStream {
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;

XhrTransportStream(this._request,
{required ErrorHandler onError, required onDone})
: _onError = onError,
XhrTransportStream(
this._client,
this.headers,
this.uri, {
required ErrorHandler onError,
required onDone,
}) : _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);

_request.onReadyStateChange.listen((data) {
if (_incomingProcessor.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
_onRequestDone();
_close();
break;
}
});

_request.onError.listen((ProgressEvent event) {
if (_incomingProcessor.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'),
StackTrace.current);
terminate();
});

_request.onProgress.listen((_) {
if (_incomingProcessor.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
responseString.substring(_requestBytesRead).codeUnits)
.buffer;
_requestBytesRead = responseString.length;
_incomingProcessor.add(bytes);
});

_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _onError, onDone: _incomingMessages.close);
_outgoingMessages.stream.map(frame).listen(
(data) {
final request = Request('POST', uri);
request.headers.addAll(headers);
request.bodyBytes = data;
_client.send(request).then(
(streamedResponse) async {
final response = await Response.fromStream(streamedResponse);
_incomingMessages.add(GrpcMetadata(response.headers));
final valid = _validateResponseState(
response.statusCode,
response.headers,
rawResponse: response.body,
);
if (valid) {
if (!_incomingProcessor.isClosed) {
_incomingProcessor.add(response.bodyBytes.buffer);
_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(
(data) {
_incomingMessages.add(data);
},
onError: _onError,
onDone: _incomingMessages.close,
);
}
}
_close();
},
onError: (_) {
if (_incomingProcessor.isClosed) {
return;
}
_onError(
GrpcError.unavailable('XhrConnection connection-error'),
StackTrace.current,
);
terminate();
},
);
},
cancelOnError: true,
);
}

bool _validateResponseState() {
bool _validateResponseState(
int? httpStatus,
Map<String, String> headers, {
Object? rawResponse,
}) {
try {
validateHttpStatusAndContentType(
_request.status, _request.responseHeaders,
rawResponse: _request.responseText);
httpStatus,
headers,
rawResponse: rawResponse,
);
return true;
} catch (e, st) {
_onError(e, st);
return false;
}
}

void _onHeadersReceived() {
_headersReceived = true;
if (!_validateResponseState()) {
return;
}
_incomingMessages.add(GrpcMetadata(_request.responseHeaders));
}

void _onRequestDone() {
if (!_headersReceived && !_validateResponseState()) {
return;
}
if (_request.response == null) {
_onError(
GrpcError.unavailable('XhrConnection request null response', null,
_request.responseText),
StackTrace.current);
return;
}
}

void _close() {
_incomingProcessor.close();
_outgoingMessages.close();
Expand All @@ -140,7 +128,7 @@ class XhrTransportStream implements GrpcTransportStream {
@override
Future<void> terminate() async {
_close();
_request.abort();
_client.close();
}
}

Expand All @@ -153,25 +141,18 @@ class XhrClientConnection implements ClientConnection {

@override
String get authority => uri.authority;

@override
String get scheme => uri.scheme;

void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]!);
}
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
}

@visibleForTesting
HttpRequest createHttpRequest() => HttpRequest();

@override
GrpcTransportStream makeRequest(String path, Duration? timeout,
Map<String, String> metadata, ErrorHandler onError,
{CallOptions? callOptions}) {
GrpcTransportStream makeRequest(
String path,
Duration? timeout,
Map<String, String> metadata,
ErrorHandler onError, {
CallOptions? callOptions,
}) {
// gRPC-web headers.
if (_getContentTypeHeader(metadata) == null) {
metadata['Content-Type'] = 'application/grpc-web+proto';
Expand All @@ -185,16 +166,18 @@ class XhrClientConnection implements ClientConnection {
requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri);
}

final request = createHttpRequest();
request.open('POST', requestUri.toString());
final client = BrowserClient();
if (callOptions is WebCallOptions && callOptions.withCredentials == true) {
request.withCredentials = true;
client.withCredentials = true;
}
// Must set headers after calling open().
_initializeRequest(request, metadata);

final transportStream =
XhrTransportStream(request, onError: onError, onDone: _removeStream);
final transportStream = XhrTransportStream(
client,
metadata,
requestUri,
onError: onError,
onDone: _removeStream,
);
_requests.add(transportStream);
return transportStream;
}
Expand All @@ -205,7 +188,7 @@ class XhrClientConnection implements ClientConnection {

@override
Future<void> terminate() async {
for (var request in List.of(_requests)) {
for (final request in List.of(_requests)) {
request.terminate();
}
}
Expand All @@ -225,7 +208,7 @@ class XhrClientConnection implements ClientConnection {
}

MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) {
for (var entry in metadata.entries) {
for (final entry in metadata.entries) {
if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) {
return entry;
}
Expand Down
2 changes: 0 additions & 2 deletions lib/src/shared/io_bits/io_bits_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

export 'dart:html' show HttpStatus;

/// Unavailable on the web
class InternetAddress {}

Expand Down
28 changes: 13 additions & 15 deletions lib/src/shared/status.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import 'package:grpc/src/generated/google/rpc/status.pb.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';

import 'io_bits/io_bits.dart' show HttpStatus;

class StatusCode {
/// The operation completed successfully.
static const ok = 0;
Expand Down Expand Up @@ -131,19 +129,19 @@ class StatusCode {
/// Mapping taken from gRPC-Web JS implementation:
/// https://github.com/grpc/grpc-web/blob/master/javascript/net/grpc/web/statuscode.js
static const _httpStatusToGrpcStatus = <int, int>{
HttpStatus.ok: StatusCode.ok,
HttpStatus.badRequest: StatusCode.invalidArgument,
HttpStatus.unauthorized: StatusCode.unauthenticated,
HttpStatus.forbidden: StatusCode.permissionDenied,
HttpStatus.notFound: StatusCode.notFound,
HttpStatus.conflict: StatusCode.aborted,
HttpStatus.preconditionFailed: StatusCode.failedPrecondition,
HttpStatus.tooManyRequests: StatusCode.resourceExhausted,
HttpStatus.clientClosedRequest: StatusCode.cancelled,
HttpStatus.internalServerError: StatusCode.unknown,
HttpStatus.notImplemented: StatusCode.unimplemented,
HttpStatus.serviceUnavailable: StatusCode.unavailable,
HttpStatus.gatewayTimeout: StatusCode.deadlineExceeded,
200: StatusCode.ok,
400: StatusCode.invalidArgument,
401: StatusCode.unauthenticated,
403: StatusCode.permissionDenied,
404: StatusCode.notFound,
409: StatusCode.aborted,
412: StatusCode.failedPrecondition,
429: StatusCode.resourceExhausted,
499: StatusCode.cancelled,
500: StatusCode.unknown,
501: StatusCode.unimplemented,
503: StatusCode.unavailable,
504: StatusCode.deadlineExceeded,
};

/// Creates a gRPC Status code from a HTTP Status code
Expand Down
Loading

0 comments on commit 2ccd909

Please sign in to comment.