Skip to content

Commit 31264ff

Browse files
committed
add implementations for eventloop functions sock_*
1 parent 5747a3a commit 31264ff

File tree

2 files changed

+196
-14
lines changed

2 files changed

+196
-14
lines changed

include/boost/python/eventloop.hpp

+9-4
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,21 @@ class event_loop
6767
}
6868

6969

70-
void sock_recv(object sock, int bytes);
70+
object sock_recv(object sock, size_t nbytes);
7171

72-
void sock_recv_into(object sock, object buffer);
72+
size_t sock_recv_into(object sock, object buffer);
7373

74-
void sock_sendall(object sock, object data);
74+
object sock_sendall(object sock, object data);
7575

7676
void sock_connect(object sock, object address);
7777

78-
void sock_accept(object sock);
78+
object sock_accept(object sock);
7979

8080
void sock_sendfile(object sock, object file, int offset = 0, int count = 0, bool fallback = true);
8181

8282
private:
8383
int64_t _timer_id = 0;
84+
object _pymod_socket = import("socket");
8485
boost::asio::io_context::strand _strand;
8586
std::unordered_map<int, std::unique_ptr<boost::asio::steady_timer>> _id_to_timer_map;
8687
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
@@ -89,6 +90,10 @@ class event_loop
8990

9091
void _add_reader_or_writer(int fd, object f, int key);
9192
void _remove_reader_or_writer(int key);
93+
static void _sock_connect_cb(
94+
event_loop& loop, std::promise<void>& prom, std::future<void>& fut, object sock, object addr);
95+
static void _sock_accept(
96+
event_loop& loop, std::promise<object>& prom, std::future<object>& fut, object sock);
9297
};
9398

9499
}}}

src/eventloop.cpp

+187-10
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,44 @@
66
// TODO:
77
// 1. posix::stream_descriptor need windows version
88
// 2. call_* need return async.Handle
9+
// 3. _ensure_fd_no_transport
10+
// 4. _ensure_resolve
911

1012
#include <boost/asio.hpp>
1113
#include <boost/bind.hpp>
1214
#include <boost/python.hpp>
1315
#include <boost/python/eventloop.hpp>
16+
#include <boost/mpl/vector.hpp>
17+
#include <Python.h>
1418

1519

1620
namespace boost { namespace python { namespace asio {
21+
namespace
22+
{
23+
24+
bool _hasattr(object o, const char* name)
25+
{
26+
return PyObject_HasAttrString(o.ptr(), name);
27+
}
28+
29+
void _sock_recv_handler(
30+
std::promise<std::vector<char>>& prom_data,
31+
std::promise<size_t>& prom_nbytes_read,
32+
size_t nbytes,
33+
int fd)
34+
{
35+
std::vector<char> buffer(nbytes);
36+
prom_nbytes_read.set_value(read(fd, buffer.data(), nbytes));
37+
prom_data.set_value(std::move(buffer));
38+
}
39+
40+
void _sock_send_handler(std::promise<size_t>& prom, int fd, const char *py_str, ssize_t len)
41+
{
42+
size_t nwrite = write(fd, py_str, len);
43+
prom.set_value(nwrite);
44+
}
45+
46+
}
1747

1848
void event_loop::_add_reader_or_writer(int fd, object f, int key)
1949
{
@@ -50,6 +80,85 @@ void event_loop::_remove_reader_or_writer(int key)
5080
}
5181
}
5282

83+
void event_loop::_sock_connect_cb(
84+
event_loop& loop, std::promise<void>& prom, std::future<void>& fut, object sock, object addr)
85+
{
86+
try
87+
{
88+
object err = sock.attr("getsockopt")(
89+
loop._pymod_socket.attr("SOL_SOCKET"), loop._pymod_socket.attr("SO_ERROR"));
90+
if (err != object(0)) {
91+
// TODO: print the address
92+
PyErr_SetString(PyExc_OSError, "Connect call failed {address}");
93+
}
94+
}
95+
catch (const error_already_set& e)
96+
{
97+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
98+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
99+
{
100+
PyErr_Clear();
101+
// pass
102+
}
103+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
104+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
105+
{
106+
// raise
107+
}
108+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
109+
{
110+
PyErr_Clear();
111+
prom.set_exception(std::current_exception());
112+
}
113+
else
114+
{
115+
PyErr_Clear();
116+
prom.set_value();
117+
}
118+
}
119+
}
120+
121+
void event_loop::_sock_accept(
122+
event_loop& loop, std::promise<object>& prom, std::future<object>& fut, object sock)
123+
{
124+
int fd = extract<int>(sock.attr("fileno")());
125+
object conn;
126+
object address;
127+
try
128+
{
129+
object ret = sock.attr("accept")();
130+
conn = ret[0];
131+
address = ret[1];
132+
conn.attr("setblocking")(object(false));
133+
}
134+
catch (const error_already_set& e)
135+
{
136+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
137+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
138+
{
139+
PyErr_Clear();
140+
loop.add_reader(fd, make_function(bind(
141+
_sock_accept, boost::ref(loop), boost::ref(prom), boost::ref(fut), sock),
142+
default_call_policies(), boost::mpl::vector<void, object>()));
143+
}
144+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
145+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
146+
{
147+
// raise
148+
}
149+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
150+
{
151+
PyErr_Clear();
152+
prom.set_exception(std::current_exception());
153+
}
154+
else
155+
{
156+
PyErr_Clear();
157+
prom.set_value(make_tuple(conn, address));
158+
}
159+
}
160+
}
161+
53162
void event_loop::call_later(double delay, object f)
54163
{
55164
// add timer
@@ -76,34 +185,102 @@ void event_loop::call_at(double when, object f)
76185
return call_soon(f);
77186
}
78187

79-
void event_loop::sock_recv(object sock, int bytes)
188+
object event_loop::sock_recv(object sock, size_t nbytes)
80189
{
81-
190+
int fd = extract<int>(sock.attr("fileno")());
191+
std::promise<std::vector<char>> prom_data;
192+
std::future<std::vector<char>> fut_data = prom_data.get_future();
193+
std::promise<size_t> prom_nbytes_read;
194+
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
195+
add_reader(fd, make_function(bind(_sock_recv_handler,
196+
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
197+
default_call_policies(), boost::mpl::vector<void, object>()));
198+
return object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
82199
}
83200

84-
void event_loop::sock_recv_into(object sock, object buffer)
201+
size_t event_loop::sock_recv_into(object sock, object buffer)
85202
{
86-
203+
int fd = extract<int>(sock.attr("fileno")());
204+
ssize_t nbytes = len(buffer);
205+
std::promise<std::vector<char>> prom_data;
206+
std::future<std::vector<char>> fut_data = prom_data.get_future();
207+
std::promise<size_t> prom_nbytes_read;
208+
std::future<size_t> fut_nbytes_read = prom_nbytes_read.get_future();
209+
add_reader(fd, make_function(bind(_sock_recv_handler,
210+
boost::ref(prom_data), boost::ref(prom_nbytes_read), nbytes, fd),
211+
default_call_policies(), boost::mpl::vector<void, object>()));
212+
buffer = object(handle<>(PyBytes_FromStringAndSize(fut_data.get().data(), nbytes)));
213+
return fut_nbytes_read.get();
87214
}
88215

89-
void event_loop::sock_sendall(object sock, object data)
216+
object event_loop::sock_sendall(object sock, object data)
90217
{
91-
218+
int fd = extract<int>(sock.attr("fileno")());
219+
char const* py_str = extract<char const*>(data.attr("decode")());
220+
ssize_t py_str_len = len(data);
221+
std::promise<size_t> prom;
222+
std::future<size_t> fut = prom.get_future();
223+
add_writer(fd, make_function(bind(_sock_send_handler, std::ref(prom), fd, py_str, py_str_len),
224+
default_call_policies(), boost::mpl::vector<void, object>()));
225+
fut.wait();
226+
return object();
92227
}
93228

94229
void event_loop::sock_connect(object sock, object address)
95230
{
96-
231+
232+
if (!_hasattr(_pymod_socket, "AF_UNIX") || sock.attr("family") != _pymod_socket.attr("AF_UNIX"))
233+
{
234+
// TODO: _ensure_resolve
235+
}
236+
std::promise<void> prom;
237+
std::future<void> fut = prom.get_future();
238+
int fd = extract<int>(sock.attr("fileno")());
239+
try
240+
{
241+
sock.attr("connect")(address);
242+
}
243+
catch (const error_already_set& e)
244+
{
245+
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
246+
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
247+
{
248+
PyErr_Clear();
249+
add_writer(fd, make_function(bind(
250+
_sock_connect_cb, boost::ref(*this), boost::ref(prom), boost::ref(fut), sock, address),
251+
default_call_policies(), boost::mpl::vector<void, object>()));
252+
}
253+
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
254+
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
255+
{
256+
// raise
257+
}
258+
else if (PyErr_ExceptionMatches(PyExc_BaseException))
259+
{
260+
PyErr_Clear();
261+
prom.set_exception(std::current_exception());
262+
}
263+
else
264+
{
265+
PyErr_Clear();
266+
prom.set_value();
267+
}
268+
}
269+
fut.wait();
97270
}
98271

99-
void event_loop::sock_accept(object sock)
272+
object event_loop::sock_accept(object sock)
100273
{
101-
274+
std::promise<object> prom;
275+
std::future<object> fut = prom.get_future();
276+
_sock_accept(*this, prom, fut, sock);
277+
return fut.get();
102278
}
103279

280+
// TODO: implement this
104281
void event_loop::sock_sendfile(object sock, object file, int offset, int count, bool fallback)
105282
{
106-
283+
PyErr_SetString(PyExc_NotImplementedError, "Not implemented!");
107284
}
108285

109286
}}}

0 commit comments

Comments
 (0)