-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_tcp.py
56 lines (47 loc) · 1.99 KB
/
test_tcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import multiprocessing
import pytest
import time
from communication_interfaces.exceptions import SocketConfigurationError
from communication_interfaces.sockets import TCPClientConfiguration, TCPClient, TCPServerConfiguration, TCPServer
class TestTCPCommunication:
    """Round-trip and closed-socket tests for ``TCPServer`` and ``TCPClient``.

    The server and client are class attributes, so every test method of the
    class shares the same two socket objects.
    """

    # NOTE(review): arguments are presumably (port, buffer size, enable reuse)
    # for the server and (ip, port, buffer size) for the client -- confirm
    # against the communication_interfaces configuration classes.
    server = TCPServer(TCPServerConfiguration(6000, 50, True))
    client = TCPClient(TCPClientConfiguration("127.0.0.1", 6000, 50))
    server_message = "Hello client"
    client_message = "Hello server"

    def serve(self):
        """Run the server side of the exchange.

        Opens the server, receives one message, checks that it matches
        ``client_message`` and replies with ``server_message``. This is used
        as the target of a child process in ``test_comm``, so a failure here
        surfaces as a non-zero exit code of that process.
        """
        self.server.open()
        response = self.server.receive_bytes()
        if response is None:
            pytest.fail("No response from client")
        # Trailing NUL bytes are stripped before comparing; presumably the
        # receive buffer is zero-padded up to the configured size.
        assert response.decode("utf-8").rstrip("\x00") == self.client_message
        assert self.server.send_bytes(self.server_message)

    def test_comm(self):
        """Exchange one message in each direction, then check closed sockets.

        The server runs in a separate process while the client runs here.
        After the exchange, both sockets are closed and every send/receive
        call is expected to raise ``SocketConfigurationError``.
        """
        server_process = multiprocessing.Process(target=self.serve)
        server_process.start()
        try:
            # Give the child process time to open the server before the
            # client tries to connect.
            time.sleep(1.0)
            self.client.open()
            assert self.client.send_bytes(self.client_message)
            response = self.client.receive_bytes()
            assert response
            assert response.decode("utf-8").rstrip("\x00") == self.server_message

            # The assertions in ``serve`` run in the child process; without
            # checking its exit code their failures would go unnoticed.
            server_process.join(timeout=5.0)
            assert server_process.exitcode == 0, "server process failed or timed out"
        finally:
            # Never leave a child process behind holding the port.
            if server_process.is_alive():
                server_process.terminate()
            server_process.join()

        buffer = ""
        self.server.close()
        with pytest.raises(SocketConfigurationError):
            self.server.receive_bytes()
        with pytest.raises(SocketConfigurationError):
            self.server.send_bytes(buffer)

        self.client.close()
        with pytest.raises(SocketConfigurationError):
            self.client.receive_bytes()
        with pytest.raises(SocketConfigurationError):
            self.client.send_bytes(buffer)

    def test_not_open(self):
        """Check that send/receive on sockets that are not open raise.

        Both the server and the client are expected to raise
        ``SocketConfigurationError`` from ``receive_bytes`` and ``send_bytes``.
        """
        buffer = ""
        with pytest.raises(SocketConfigurationError):
            self.server.receive_bytes()
        with pytest.raises(SocketConfigurationError):
            self.server.send_bytes(buffer)
        with pytest.raises(SocketConfigurationError):
            self.client.receive_bytes()
        with pytest.raises(SocketConfigurationError):
            self.client.send_bytes(buffer)