diff --git a/example/grpc-web/lib/app.dart b/example/grpc-web/lib/app.dart index fce83c51..58aec444 100644 --- a/example/grpc-web/lib/app.dart +++ b/example/grpc-web/lib/app.dart @@ -14,7 +14,7 @@ // limitations under the License. import 'dart:async'; -import 'dart:html'; +import 'package:web/web.dart'; import 'src/generated/echo.pbgrpc.dart'; @@ -55,14 +55,19 @@ class EchoApp { _addMessage(message, 'label-default pull-right'); } - void _addMessage(String message, String cssClass) { - final classes = cssClass.split(' '); - querySelector('#first')!.after(DivElement() - ..classes.add('row') - ..append(Element.tag('h2') - ..append(SpanElement() - ..classes.add('label') - ..classes.addAll(classes) - ..text = message))); + void _addMessage(String message, String cssClassArray) { + final cssClasses = cssClassArray.split(' '); + final spanLabel = HTMLSpanElement()..classList.add('label'); + for (final cssClass in cssClasses) { + spanLabel.classList.add(cssClass); + } + spanLabel.text = message; + document.querySelector('#first')!.after( + HTMLDivElement() + ..classList.add('row') + ..append( + document.createElement('h2')..append(spanLabel), + ), + ); } } diff --git a/example/grpc-web/pubspec.yaml b/example/grpc-web/pubspec.yaml index e215c131..928f1945 100644 --- a/example/grpc-web/pubspec.yaml +++ b/example/grpc-web/pubspec.yaml @@ -9,6 +9,7 @@ dependencies: grpc: path: ../../ protobuf: ^3.0.0 + web: ^0.5.1 dev_dependencies: build_runner: ^2.0.0 diff --git a/example/grpc-web/web/main.dart b/example/grpc-web/web/main.dart index 24da3b6c..daec1662 100644 --- a/example/grpc-web/web/main.dart +++ b/example/grpc-web/web/main.dart @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -import 'dart:html'; +import 'package:web/web.dart'; import 'package:grpc/grpc_web.dart'; import 'package:grpc_web/app.dart'; @@ -23,10 +23,10 @@ void main() { final service = EchoServiceClient(channel); final app = EchoApp(service); - final button = querySelector('#send') as ButtonElement; + final button = document.querySelector('#send') as HTMLButtonElement; button.onClick.listen((e) async { - final msg = querySelector('#msg') as TextInputElement; - final value = msg.value!.trim(); + final msg = document.querySelector('#msg') as HTMLInputElement; + final value = msg.value.trim(); msg.value = ''; if (value.isEmpty) return; diff --git a/lib/grpc_or_grpcweb.dart b/lib/grpc_or_grpcweb.dart index b23bed5d..049ceabf 100644 --- a/lib/grpc_or_grpcweb.dart +++ b/lib/grpc_or_grpcweb.dart @@ -14,7 +14,7 @@ // limitations under the License. import 'src/client/grpc_or_grpcweb_channel_grpc.dart' - if (dart.library.html) 'src/client/grpc_or_grpcweb_channel_web.dart'; + if (dart.library.js_interop) 'src/client/grpc_or_grpcweb_channel_web.dart'; import 'src/client/http2_channel.dart'; import 'src/client/options.dart'; diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart index 16b0dca5..e0fe89e0 100644 --- a/lib/src/client/transport/xhr_transport.dart +++ b/lib/src/client/transport/xhr_transport.dart @@ -14,10 +14,10 @@ // limitations under the License. import 'dart:async'; -import 'dart:html'; import 'dart:typed_data'; -import 'package:meta/meta.dart'; +import 'package:http/browser_client.dart'; +import 'package:http/http.dart'; import '../../client/call.dart'; import '../../shared/message.dart'; @@ -30,11 +30,12 @@ import 'web_streams.dart'; const _contentTypeKey = 'Content-Type'; class XhrTransportStream implements GrpcTransportStream { - final HttpRequest _request; + final BrowserClient _client; + final Map headers; + final Uri uri; final ErrorHandler _onError; + final Function(XhrTransportStream stream) _onDone; - bool _headersReceived = false; - int _requestBytesRead = 0; final StreamController _incomingProcessor = StreamController(); final StreamController _incomingMessages = StreamController(); final StreamController> _outgoingMessages = StreamController(); @@ -45,64 +46,72 @@ class XhrTransportStream implements GrpcTransportStream { @override StreamSink> get outgoingMessages => _outgoingMessages.sink; - XhrTransportStream(this._request, - {required ErrorHandler onError, required onDone}) - : _onError = onError, + XhrTransportStream( + this._client, + this.headers, + this.uri, { + required ErrorHandler onError, + required onDone, + }) : _onError = onError, _onDone = onDone { - _outgoingMessages.stream - .map(frame) - .listen((data) => _request.send(data), cancelOnError: true); - - _request.onReadyStateChange.listen((data) { - if (_incomingProcessor.isClosed) { - return; - } - switch (_request.readyState) { - case HttpRequest.HEADERS_RECEIVED: - _onHeadersReceived(); - break; - case HttpRequest.DONE: - _onRequestDone(); - _close(); - break; - } - }); - - _request.onError.listen((ProgressEvent event) { - if (_incomingProcessor.isClosed) { - return; - } - _onError(GrpcError.unavailable('XhrConnection connection-error'), - StackTrace.current); - terminate(); - }); - - _request.onProgress.listen((_) { - if (_incomingProcessor.isClosed) { - return; - } - // Use response over responseText as most browsers don't support - // using responseText during an onProgress event. - final responseString = _request.response as String; - final bytes = Uint8List.fromList( - responseString.substring(_requestBytesRead).codeUnits) - .buffer; - _requestBytesRead = responseString.length; - _incomingProcessor.add(bytes); - }); - - _incomingProcessor.stream - .transform(GrpcWebDecoder()) - .transform(grpcDecompressor()) - .listen(_incomingMessages.add, - onError: _onError, onDone: _incomingMessages.close); + _outgoingMessages.stream.map(frame).listen( + (data) { + final request = Request('POST', uri); + request.headers.addAll(headers); + request.bodyBytes = data; + _client.send(request).then( + (streamedResponse) async { + final response = await Response.fromStream(streamedResponse); + _incomingMessages.add(GrpcMetadata(response.headers)); + final valid = _validateResponseState( + response.statusCode, + response.headers, + rawResponse: response.body, + ); + if (valid) { + if (!_incomingProcessor.isClosed) { + _incomingProcessor.add(response.bodyBytes.buffer); + _incomingProcessor.stream + .transform(GrpcWebDecoder()) + .transform(grpcDecompressor()) + .listen( + (data) { + _incomingMessages.add(data); + }, + onError: _onError, + onDone: _incomingMessages.close, + ); + } + } + _close(); + }, + onError: (_) { + if (_incomingProcessor.isClosed) { + return; + } + _onError( + GrpcError.unavailable('XhrConnection connection-error'), + StackTrace.current, + ); + terminate(); + }, + ); + }, + cancelOnError: true, + ); } - bool _validateResponseState() { + bool _validateResponseState( + int? httpStatus, + Map headers, { + Object? rawResponse, + }) { try { validateHttpStatusAndContentType( - _request.status, _request.responseHeaders, - rawResponse: _request.responseText); + httpStatus, + headers, + rawResponse: rawResponse, + ); return true; } catch (e, st) { _onError(e, st); @@ -110,27 +119,6 @@ class XhrTransportStream implements GrpcTransportStream { } } - void _onHeadersReceived() { - _headersReceived = true; - if (!_validateResponseState()) { - return; - } - _incomingMessages.add(GrpcMetadata(_request.responseHeaders)); - } - - void _onRequestDone() { - if (!_headersReceived && !_validateResponseState()) { - return; - } - if (_request.response == null) { - _onError( - GrpcError.unavailable('XhrConnection request null response', null, - _request.responseText), - StackTrace.current); - return; - } - } - void _close() { _incomingProcessor.close(); _outgoingMessages.close(); @@ -140,7 +128,7 @@ class XhrTransportStream implements GrpcTransportStream { @override Future terminate() async { _close(); - _request.abort(); + _client.close(); } } @@ -153,25 +141,18 @@ class XhrClientConnection implements ClientConnection { @override String get authority => uri.authority; + @override String get scheme => uri.scheme; - void _initializeRequest(HttpRequest request, Map metadata) { - for (final header in metadata.keys) { - request.setRequestHeader(header, metadata[header]!); - } - // Overriding the mimetype allows us to stream and parse the data - request.overrideMimeType('text/plain; charset=x-user-defined'); - request.responseType = 'text'; - } - - @visibleForTesting - HttpRequest createHttpRequest() => HttpRequest(); - @override - GrpcTransportStream makeRequest(String path, Duration? timeout, - Map metadata, ErrorHandler onError, - {CallOptions? callOptions}) { + GrpcTransportStream makeRequest( + String path, + Duration? timeout, + Map metadata, + ErrorHandler onError, { + CallOptions? callOptions, + }) { // gRPC-web headers. if (_getContentTypeHeader(metadata) == null) { metadata['Content-Type'] = 'application/grpc-web+proto'; @@ -185,16 +166,18 @@ class XhrClientConnection implements ClientConnection { requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri); } - final request = createHttpRequest(); - request.open('POST', requestUri.toString()); + final client = BrowserClient(); if (callOptions is WebCallOptions && callOptions.withCredentials == true) { - request.withCredentials = true; + client.withCredentials = true; } // Must set headers after calling open(). - _initializeRequest(request, metadata); - - final transportStream = - XhrTransportStream(request, onError: onError, onDone: _removeStream); + final transportStream = XhrTransportStream( + client, + metadata, + requestUri, + onError: onError, + onDone: _removeStream, + ); _requests.add(transportStream); return transportStream; } @@ -205,7 +188,7 @@ class XhrClientConnection implements ClientConnection { @override Future terminate() async { - for (var request in List.of(_requests)) { + for (final request in List.of(_requests)) { request.terminate(); } } @@ -225,7 +208,7 @@ class XhrClientConnection implements ClientConnection { } MapEntry? _getContentTypeHeader(Map metadata) { - for (var entry in metadata.entries) { + for (final entry in metadata.entries) { if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) { return entry; } diff --git a/lib/src/shared/io_bits/io_bits_web.dart b/lib/src/shared/io_bits/io_bits_web.dart index e8dba5a9..210c2ca0 100644 --- a/lib/src/shared/io_bits/io_bits_web.dart +++ b/lib/src/shared/io_bits/io_bits_web.dart @@ -13,8 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -export 'dart:html' show HttpStatus; - /// Unavailable on the web class InternetAddress {} diff --git a/lib/src/shared/status.dart b/lib/src/shared/status.dart index e0474ac1..c9eed65a 100644 --- a/lib/src/shared/status.dart +++ b/lib/src/shared/status.dart @@ -23,8 +23,6 @@ import 'package:grpc/src/generated/google/rpc/status.pb.dart'; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; -import 'io_bits/io_bits.dart' show HttpStatus; - class StatusCode { /// The operation completed successfully. static const ok = 0; @@ -131,19 +129,19 @@ class StatusCode { /// Mapping taken from gRPC-Web JS implementation: /// https://github.com/grpc/grpc-web/blob/master/javascript/net/grpc/web/statuscode.js static const _httpStatusToGrpcStatus = { - HttpStatus.ok: StatusCode.ok, - HttpStatus.badRequest: StatusCode.invalidArgument, - HttpStatus.unauthorized: StatusCode.unauthenticated, - HttpStatus.forbidden: StatusCode.permissionDenied, - HttpStatus.notFound: StatusCode.notFound, - HttpStatus.conflict: StatusCode.aborted, - HttpStatus.preconditionFailed: StatusCode.failedPrecondition, - HttpStatus.tooManyRequests: StatusCode.resourceExhausted, - HttpStatus.clientClosedRequest: StatusCode.cancelled, - HttpStatus.internalServerError: StatusCode.unknown, - HttpStatus.notImplemented: StatusCode.unimplemented, - HttpStatus.serviceUnavailable: StatusCode.unavailable, - HttpStatus.gatewayTimeout: StatusCode.deadlineExceeded, + 200: StatusCode.ok, + 400: StatusCode.invalidArgument, + 401: StatusCode.unauthenticated, + 403: StatusCode.permissionDenied, + 404: StatusCode.notFound, + 409: StatusCode.aborted, + 412: StatusCode.failedPrecondition, + 429: StatusCode.resourceExhausted, + 499: StatusCode.cancelled, + 500: StatusCode.unknown, + 501: StatusCode.unimplemented, + 503: StatusCode.unavailable, + 504: StatusCode.deadlineExceeded, }; /// Creates a gRPC Status code from a HTTP Status code diff --git a/pubspec.yaml b/pubspec.yaml index 2d4c55fd..e253d3ec 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -17,6 +17,7 @@ dependencies: http2: ^2.2.0 protobuf: '>=2.0.0 <4.0.0' clock: ^1.1.1 + web: ^0.5.1 dev_dependencies: build_runner: ^2.0.0 diff --git a/test/client_tests/client_xhr_transport_test.dart b/test/client_tests/client_xhr_transport_test.dart index 4b3f8587..0e353775 100644 --- a/test/client_tests/client_xhr_transport_test.dart +++ b/test/client_tests/client_xhr_transport_test.dart @@ -15,23 +15,22 @@ @TestOn('browser') import 'dart:async'; -import 'dart:html'; import 'package:async/async.dart'; +import 'package:grpc/grpc_connection_interface.dart'; import 'package:grpc/src/client/call.dart'; import 'package:grpc/src/client/transport/xhr_transport.dart'; import 'package:grpc/src/shared/message.dart'; -import 'package:grpc/src/shared/status.dart'; import 'package:mockito/mockito.dart'; import 'package:stream_transform/stream_transform.dart'; import 'package:test/test.dart'; +import 'package:web/web.dart'; -final readyStateChangeEvent = - Event('readystatechange', canBubble: false, cancelable: false); +final readyStateChangeEvent = Event('readystatechange'); final progressEvent = ProgressEvent('onloadstart'); -class MockHttpRequest extends Mock implements HttpRequest { - MockHttpRequest({int? code}) : status = code ?? 200; +class MockGrpcTransportStream extends Mock implements XhrTransportStream { + MockGrpcTransportStream({int? code}) : status = code ?? 200; // ignore: close_sinks StreamController readyStateChangeController = StreamController(); @@ -39,26 +38,21 @@ class MockHttpRequest extends Mock implements HttpRequest { StreamController progressController = StreamController(); - @override Stream get onReadyStateChange => readyStateChangeController.stream; - @override Stream get onProgress => progressController.stream; - @override Stream get onError => StreamController().stream; - @override final int status; - @override int get readyState => super.noSuchMethod(Invocation.getter(#readyState), returnValue: -1); - @override - Map get responseHeaders => - super.noSuchMethod(Invocation.getter(#responseHeaders), - returnValue: {}); + Map get responseHeaders => super.noSuchMethod( + Invocation.getter(#responseHeaders), + returnValue: {}, + ); } class MockXhrClientConnection extends XhrClientConnection { @@ -66,12 +60,18 @@ class MockXhrClientConnection extends XhrClientConnection { : _statusCode = code ?? 200, super(Uri.parse('test:8080')); - late MockHttpRequest latestRequest; + late MockGrpcTransportStream latestRequest; final int _statusCode; @override - HttpRequest createHttpRequest() { - final request = MockHttpRequest(code: _statusCode); + GrpcTransportStream makeRequest( + String path, + Duration? timeout, + Map metadata, + ErrorHandler onError, { + CallOptions? callOptions, + }) { + final request = MockGrpcTransportStream(code: _statusCode); latestRequest = request; return request; } @@ -86,17 +86,27 @@ void main() { final connection = MockXhrClientConnection(); - connection.makeRequest('path', Duration(seconds: 10), metadata, - (error, _) => fail(error.toString())); - - verify(connection.latestRequest - .setRequestHeader('Content-Type', 'application/grpc-web+proto')); - verify(connection.latestRequest - .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); - verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); - verify(connection.latestRequest - .overrideMimeType('text/plain; charset=x-user-defined')); - verify(connection.latestRequest.responseType = 'text'); + connection.makeRequest( + 'path', + Duration(seconds: 10), + metadata, + (error, _) => fail( + error.toString(), + ), + ); + + expect( + connection.latestRequest.headers['Content-Type'], + 'application/grpc-web+proto', + ); + expect( + connection.latestRequest.headers['X-User-Agent'], + 'grpc-web-dart/0.1', + ); + expect( + connection.latestRequest.headers['X-Grpc-Web'], + '1', + ); }); test( @@ -105,16 +115,22 @@ void main() { final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; final connection = MockXhrClientConnection(); - connection.makeRequest('path', Duration(seconds: 10), metadata, - (error, _) => fail(error.toString()), - callOptions: WebCallOptions(bypassCorsPreflight: true)); + connection.makeRequest( + 'path', + Duration(seconds: 10), + metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(bypassCorsPreflight: true), + ); expect(metadata, isEmpty); - verify(connection.latestRequest.open('POST', - 'test:path?%24httpHeaders=header_1%3Avalue_1%0D%0Aheader_2%3Avalue_2%0D%0AContent-Type%3Aapplication%2Fgrpc-web%2Bproto%0D%0AX-User-Agent%3Agrpc-web-dart%2F0.1%0D%0AX-Grpc-Web%3A1%0D%0A')); - verify(connection.latestRequest - .overrideMimeType('text/plain; charset=x-user-defined')); - verify(connection.latestRequest.responseType = 'text'); + print(connection.latestRequest.uri.toString()); + expect( + connection.latestRequest.uri.toString().contains( + 'test:path?%24httpHeaders=header_1%3Avalue_1%0D%0Aheader_2%3Avalue_2%0D%0AContent-Type%3Aapplication%2Fgrpc-web%2Bproto%0D%0AX-User-Agent%3Agrpc-web-dart%2F0.1%0D%0AX-Grpc-Web%3A1%0D%0A', + ), + isTrue, + ); }); test( @@ -168,27 +184,36 @@ void main() { final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; final connection = MockXhrClientConnection(); - connection.makeRequest('path', Duration(seconds: 10), metadata, - (error, _) => fail(error.toString()), - callOptions: WebCallOptions(withCredentials: true)); - - expect(metadata, { - 'header_1': 'value_1', - 'header_2': 'value_2', - 'Content-Type': 'application/grpc-web+proto', - 'X-User-Agent': 'grpc-web-dart/0.1', - 'X-Grpc-Web': '1' - }); - verify(connection.latestRequest - .setRequestHeader('Content-Type', 'application/grpc-web+proto')); - verify(connection.latestRequest - .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); - verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); - verify(connection.latestRequest.open('POST', 'test:path')); - verify(connection.latestRequest.withCredentials = true); - verify(connection.latestRequest - .overrideMimeType('text/plain; charset=x-user-defined')); - verify(connection.latestRequest.responseType = 'text'); + connection.makeRequest( + 'path', + Duration(seconds: 10), + metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(withCredentials: true), + ); + + expect( + metadata, + { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-dart/0.1', + 'X-Grpc-Web': '1' + }, + ); + expect( + connection.latestRequest.headers['Content-Type'], + 'application/grpc-web+proto', + ); + expect( + connection.latestRequest.headers['X-User-Agent'], + 'grpc-web-dart/0.1', + ); + expect( + connection.latestRequest.headers['X-Grpc-Web'], + '1', + ); }); test('Sent data converted to stream properly', () async { @@ -199,16 +224,23 @@ void main() { final connection = MockXhrClientConnection(); - final stream = connection.makeRequest('path', Duration(seconds: 10), - metadata, (error, _) => fail(error.toString())); + final stream = connection.makeRequest( + 'path', + Duration(seconds: 10), + metadata, + (error, _) => fail(error.toString()), + ); final data = List.filled(10, 0); stream.outgoingMessages.add(data); await stream.terminate(); final expectedData = frame(data); - expect(verify(connection.latestRequest.send(captureAny)).captured.single, - expectedData); + + expect( + connection.latestRequest.incomingMessages.single, + expectedData, + ); }); test('Stream handles headers properly', () async { @@ -224,12 +256,18 @@ void main() { (error, _) => fail(error.toString())); when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders); - when(transport.latestRequest.response) - .thenReturn(String.fromCharCodes(frame([]))); + when(transport.latestRequest.incomingMessages).thenReturn( + Stream.value( + GrpcData( + frame([]), + isCompressed: false, + ), + ), + ); // Set expectation for request readyState and generate two readyStateChange // events, so that incomingMessages stream completes. - final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + final readyStates = [2, 4]; when(transport.latestRequest.readyState) .thenAnswer((_) => readyStates.removeAt(0)); transport.latestRequest.readyStateChangeController @@ -266,11 +304,18 @@ void main() { final encodedString = String.fromCharCodes(encodedTrailers); when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); - when(connection.latestRequest.response).thenReturn(encodedString); + when(connection.latestRequest.incomingMessages).thenReturn( + Stream.value( + GrpcData( + frame(encodedString.codeUnits), + isCompressed: false, + ), + ), + ); // Set expectation for request readyState and generate events so that // incomingMessages stream completes. - final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + final readyStates = [2, 4]; when(connection.latestRequest.readyState) .thenAnswer((_) => readyStates.removeAt(0)); connection.latestRequest.readyStateChangeController @@ -302,11 +347,18 @@ void main() { final encodedString = String.fromCharCodes(encoded); when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); - when(connection.latestRequest.response).thenReturn(encodedString); + when(connection.latestRequest.incomingMessages).thenReturn( + Stream.value( + GrpcData( + frame(encodedString.codeUnits), + isCompressed: false, + ), + ), + ); // Set expectation for request readyState and generate events so that // incomingMessages stream completes. - final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + final readyStates = [2, 4]; when(connection.latestRequest.readyState) .thenAnswer((_) => readyStates.removeAt(0)); connection.latestRequest.readyStateChangeController @@ -336,12 +388,18 @@ void main() { requestHeaders, (error, _) => fail(error.toString())); final data = List.filled(10, 224); when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); - when(connection.latestRequest.response) - .thenReturn(String.fromCharCodes(frame(data))); + when(connection.latestRequest.incomingMessages).thenReturn( + Stream.value( + GrpcData( + frame(data), + isCompressed: false, + ), + ), + ); // Set expectation for request readyState and generate events, so that // incomingMessages stream completes. - final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + final readyStates = [2, 4]; when(connection.latestRequest.readyState) .thenAnswer((_) => readyStates.removeAt(0)); connection.latestRequest.readyStateChangeController @@ -368,8 +426,13 @@ void main() { const errorDetails = 'error details'; when(connection.latestRequest.responseHeaders) .thenReturn({'content-type': 'application/grpc+proto'}); - when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE); - when(connection.latestRequest.responseText).thenReturn(errorDetails); + + when(connection.latestRequest.readyState).thenReturn(4); + when(connection.latestRequest.incomingMessages).thenReturn( + Stream.value( + GrpcData(errorDetails.codeUnits, isCompressed: false), + ), + ); connection.latestRequest.readyStateChangeController .add(readyStateChangeEvent); await errorReceived.future; @@ -396,21 +459,28 @@ void main() { data.map((d) => String.fromCharCodes(frame(d))).toList(); when(connection.latestRequest.responseHeaders).thenReturn(metadata); - when(connection.latestRequest.readyState) - .thenReturn(HttpRequest.HEADERS_RECEIVED); + when(connection.latestRequest.readyState).thenReturn(2); // At first invocation the response should be the the first message, after // that first + last messages. var first = true; - when(connection.latestRequest.response).thenAnswer((_) { - if (first) { - first = false; - return encodedStrings[0]; - } - return encodedStrings[0] + encodedStrings[1]; - }); - - final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.incomingMessages).thenAnswer( + (_) { + late GrpcData data; + if (first) { + first = false; + data = GrpcData(encodedStrings[0].codeUnits, isCompressed: false); + } + data = GrpcData( + (encodedStrings[0] + encodedStrings[1]).codeUnits, + isCompressed: false, + ); + + return Stream.value(data); + }, + ); + + final readyStates = [2, 4]; when(connection.latestRequest.readyState) .thenAnswer((_) => readyStates.removeAt(0));