-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathBidirectionalStreamingTests.swift
114 lines (95 loc) · 3.95 KB
/
BidirectionalStreamingTests.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2019, CombineGRPC
// Licensed under the Apache License, Version 2.0
import XCTest
import Combine
import GRPC
import NIOHPACK
@testable import CombineGRPC
/// Tests for bidirectional-streaming calls issued through `GRPCExecutor`.
///
/// One test server and one client are created for the whole class in `setUp()`
/// and closed again in `tearDown()`. Every test feeds a request publisher into
/// one of the client's calls and verifies how the response publisher terminates.
class BidirectionalStreamingTests: XCTestCase {

  static var server: Server?
  static var client: BidirectionalStreamingScenariosNIOClient?
  /// Holds the subscriptions created by the tests until the class is torn down.
  static var retainedCancellables: Set<AnyCancellable> = []

  /// How long each test waits for its expectation to be fulfilled, in seconds.
  private static let waitTimeout: TimeInterval = 0.2

  /// Starts the test server and creates the client shared by every test in this class.
  override class func setUp() {
    super.setUp()
    server = try! makeTestServer(services: [BidirectionalStreamingTestsService()])
    client = makeTestClient { connection, options in
      BidirectionalStreamingScenariosNIOClient(channel: connection, defaultCallOptions: options)
    }
  }

  /// Closes the client channel, then the server, and finally drops the retained subscriptions.
  override class func tearDown() {
    try! client?.channel.close().wait()
    try! server?.close().wait()
    retainedCancellables.removeAll()
    super.tearDown()
  }

  /// Builds the request stream used by most tests: three `EchoRequest`s whose message is "hello".
  private static func makeHelloStream() -> AnyPublisher<EchoRequest, Error> {
    let hello = EchoRequest.with { $0.message = "hello"}
    return Publishers.Sequence<Repeated<EchoRequest>, Error>(sequence: repeatElement(hello, count: 3))
      .eraseToAnyPublisher()
  }

  /// Sends three "hello" requests to `ok` and checks that the call finishes after
  /// producing exactly three responses whose message is "hello".
  func testOk() {
    let finished = expectation(description: "Call completes successfully")
    let responses = GRPCExecutor().call(Self.client!.ok)(Self.makeHelloStream())

    responses
      .filter { $0.message == "hello" }
      .count()
      .sink(
        receiveCompletion: expectFinished(resolve: finished),
        receiveValue: expectValue { $0 == 3 }
      )
      .store(in: &Self.retainedCancellables)

    wait(for: [finished], timeout: Self.waitTimeout)
  }

  /// Calls `failedPrecondition` and checks that it fails with a `.failedPrecondition`
  /// status carrying the trailing metadata entry `custom: info`, without emitting a response.
  func testFailedPrecondition() {
    let failed = expectation(description: "Call fails with failed precondition status")
    let rpc = Self.client!.failedPrecondition

    GRPCExecutor()
      .call(rpc)(Self.makeHelloStream())
      .sink(
        receiveCompletion: resolve(
          failed,
          expectingFailure: {
            $0.status.code == .failedPrecondition && $0.trailingMetadata?.first(name: "custom") == "info"
          }
        ),
        receiveValue: { _ in XCTFail("Call should not return a response") }
      )
      .store(in: &Self.retainedCancellables)

    wait(for: [failed], timeout: Self.waitTimeout)
  }

  /// Calls `noResponse` with a 20 ms time limit and checks that the call fails
  /// with a `.deadlineExceeded` RPC error and no response values.
  func testNoResponse() {
    let timedOut = expectation(description: "Call fails with deadline exceeded status")
    let shortDeadline = CallOptions(timeLimit: TimeLimit.timeout(.milliseconds(20)))
    let executor = GRPCExecutor(callOptions: Just(shortDeadline).eraseToAnyPublisher())

    executor
      .call(Self.client!.noResponse)(Self.makeHelloStream())
      .sink(
        receiveCompletion: expectRPCError(code: .deadlineExceeded, resolve: timedOut),
        receiveValue: expectNoValue()
      )
      .store(in: &Self.retainedCancellables)

    wait(for: [timedOut], timeout: Self.waitTimeout)
  }

  /// Feeds `ok` a request publisher that fails immediately and checks that the
  /// call ends with an `.aborted` RPC error and no response values.
  func testClientStreamError() {
    struct ClientStreamError: Error {}

    let aborted = expectation(description: "Call fails with aborted status")
    let failingRequests = Fail<EchoRequest, Error>(error: ClientStreamError()).eraseToAnyPublisher()

    GRPCExecutor()
      .call(Self.client!.ok)(failingRequests)
      .sink(
        receiveCompletion: expectRPCError(code: .aborted, resolve: aborted),
        receiveValue: expectNoValue()
      )
      .store(in: &Self.retainedCancellables)

    wait(for: [aborted], timeout: Self.waitTimeout)
  }

  /// Explicit list pairing each test's display name with its method.
  static var allTests = [
    ("Bidirectional streaming OK", testOk),
    ("Bidirectional streaming failed precondition", testFailedPrecondition),
    ("Bidirectional streaming no response", testNoResponse),
    ("Bidirectional streaming with client stream error, stream failed", testClientStreamError),
  ]
}