@@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name)
27
27
return PyObject_HasAttrString (o.ptr (), name);
28
28
}
29
29
30
- void _sock_connect_cb (object pymod_socket, object fut, object sock, object addr)
30
+ }
31
+
32
+ void event_loop::_sock_connect_cb (object pymod_socket, object fut, object sock, object addr)
31
33
{
32
34
try
33
35
{
@@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
61
63
}
62
64
}
63
65
64
- void _sock_accept (event_loop& loop, object fut, object sock)
66
+ void event_loop:: _sock_accept (event_loop& loop, object fut, object sock)
65
67
{
66
68
int fd = extract<int >(sock.attr (" fileno" )());
67
- object conn;
68
- object address;
69
+ object conn, address;
69
70
try
70
71
{
71
72
object ret = sock.attr (" accept" )();
@@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock)
80
81
|| PyErr_ExceptionMatches (PyExc_InterruptedError))
81
82
{
82
83
PyErr_Clear ();
83
- loop.add_reader (fd, make_function (bind (
84
- _sock_accept, boost::ref (loop), fut, sock),
85
- default_call_policies (), boost::mpl::vector<void , object>()));
84
+ loop._async_wait_fd (fd, bind (_sock_accept, boost::ref (loop), fut, sock), loop._write_key (fd));
86
85
}
87
86
else if (PyErr_ExceptionMatches (PyExc_SystemExit)
88
87
|| PyErr_ExceptionMatches (PyExc_KeyboardInterrupt))
@@ -94,43 +93,6 @@ void _sock_accept(event_loop& loop, object fut, object sock)
94
93
PyErr_Clear ();
95
94
fut.attr (" set_exception" )(std::current_exception ());
96
95
}
97
- }
98
- }
99
-
100
- }
101
-
102
- void event_loop::_add_reader_or_writer (int fd, object f, int key)
103
- {
104
- // add descriptor
105
- if (_descriptor_map.find (key) == _descriptor_map.end ())
106
- {
107
- _descriptor_map.emplace (key,
108
- std::move (std::make_unique<boost::asio::posix::stream_descriptor>(_strand.context (), fd))
109
- );
110
- }
111
-
112
- _descriptor_map.find (key)->second ->async_wait (boost::asio::posix::descriptor::wait_type::wait_read,
113
- boost::asio::bind_executor (_strand, [key, f, loop=this ] (const boost::system ::error_code& ec)
114
- {
115
- // move descriptor
116
- auto iter = loop->_descriptor_map .find (key);
117
- if (iter != loop->_descriptor_map .end ())
118
- {
119
- iter->second ->release ();
120
- loop->_descriptor_map .erase (iter);
121
- }
122
- loop->call_soon (f);
123
- }));
124
- return ;
125
- }
126
-
127
- void event_loop::_remove_reader_or_writer (int key)
128
- {
129
- auto iter = _descriptor_map.find (key);
130
- if (iter != _descriptor_map.end ())
131
- {
132
- iter->second ->release ();
133
- _descriptor_map.erase (iter);
134
96
}
135
97
}
136
98
@@ -155,14 +117,14 @@ object event_loop::sock_recv(object sock, size_t nbytes)
155
117
{
156
118
int fd = extract<int >(sock.attr (" fileno" )());
157
119
int fd_dup = dup (fd);
158
- object py_fut = _pymod_concurrent_future.attr (" Future" )();
159
- add_reader (fd_dup, make_function (
160
- [py_fut, nbytes, fd=fd_dup] (object obj) {
120
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
121
+ _async_wait_fd (fd_dup,
122
+ [py_fut, nbytes, fd=fd_dup] {
161
123
std::vector<char > buffer (nbytes);
162
124
read (fd, buffer.data (), nbytes);
163
125
py_fut.attr (" set_result" )(object (handle<>(PyBytes_FromStringAndSize (buffer.data (), nbytes))));
164
126
},
165
- default_call_policies (), boost::mpl::vector< void , object>() ));
127
+ _read_key (fd ));
166
128
return py_fut;
167
129
}
168
130
@@ -171,14 +133,14 @@ object event_loop::sock_recv_into(object sock, object buffer)
171
133
int fd = extract<int >(sock.attr (" fileno" )());
172
134
int fd_dup = dup (fd);
173
135
ssize_t nbytes = len (buffer);
174
- object py_fut = _pymod_concurrent_future.attr (" Future" )();
175
- add_reader (fd_dup, make_function (
176
- [py_fut, nbytes, fd=fd_dup] (object obj) {
136
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
137
+ _async_wait_fd (fd_dup,
138
+ [py_fut, nbytes, fd=fd_dup] {
177
139
std::vector<char > buffer (nbytes);
178
140
ssize_t nbytes_read = read (fd, buffer.data (), nbytes);
179
141
py_fut.attr (" set_result" )(nbytes_read);
180
- },
181
- default_call_policies (), boost::mpl::vector< void , object>() ));
142
+ },
143
+ _read_key (fd ));
182
144
return py_fut;
183
145
}
184
146
@@ -188,13 +150,13 @@ object event_loop::sock_sendall(object sock, object data)
188
150
int fd_dup = dup (fd);
189
151
char const * py_str = extract<char const *>(data.attr (" decode" )());
190
152
ssize_t py_str_len = len (data);
191
- object py_fut = _pymod_concurrent_future.attr (" Future" )();
192
- add_writer (fd_dup, make_function (
193
- [py_fut, fd, py_str, py_str_len] (object obj) {
153
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
154
+ _async_wait_fd (fd_dup,
155
+ [py_fut, fd, py_str, py_str_len] {
194
156
write (fd, py_str, py_str_len);
195
157
py_fut.attr (" set_result" )(object ());
196
- },
197
- default_call_policies (), boost::mpl::vector< void , object>() ));
158
+ },
159
+ _write_key (fd ));
198
160
return py_fut;
199
161
}
200
162
@@ -205,22 +167,20 @@ object event_loop::sock_connect(object sock, object address)
205
167
{
206
168
// TODO: _ensure_resolve
207
169
}
208
- object fut = _pymod_concurrent_future.attr (" Future" )();
170
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
209
171
int fd = extract<int >(sock.attr (" fileno" )());
210
172
try
211
173
{
212
174
sock.attr (" connect" )(address);
213
- fut .attr (" set_result" )(object ());
175
+ py_fut .attr (" set_result" )(object ());
214
176
}
215
177
catch (const error_already_set& e)
216
178
{
217
179
if (PyErr_ExceptionMatches (PyExc_BlockingIOError)
218
180
|| PyErr_ExceptionMatches (PyExc_InterruptedError))
219
181
{
220
182
PyErr_Clear ();
221
- add_writer (dup (fd), make_function (bind (
222
- _sock_connect_cb, _pymod_socket, fut, sock, address),
223
- default_call_policies (), boost::mpl::vector<void , object>()));
183
+ _async_wait_fd (dup (fd), bind (_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key (fd));
224
184
}
225
185
else if (PyErr_ExceptionMatches (PyExc_SystemExit)
226
186
|| PyErr_ExceptionMatches (PyExc_KeyboardInterrupt))
@@ -230,17 +190,17 @@ object event_loop::sock_connect(object sock, object address)
230
190
else
231
191
{
232
192
PyErr_Clear ();
233
- fut .attr (" set_exception" )(std::current_exception ());
193
+ py_fut .attr (" set_exception" )(std::current_exception ());
234
194
}
235
195
}
236
- return fut ;
196
+ return py_fut ;
237
197
}
238
198
239
199
object event_loop::sock_accept (object sock)
240
200
{
241
- object fut = _pymod_concurrent_future.attr (" Future" )();
242
- _sock_accept (*this , fut , sock);
243
- return fut ;
201
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
202
+ _sock_accept (*this , py_fut , sock);
203
+ return py_fut ;
244
204
}
245
205
246
206
// TODO: implement this
@@ -262,27 +222,23 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex
262
222
263
223
object event_loop::getaddrinfo (object host, int port, int family, int type, int proto, int flags)
264
224
{
265
- object py_fut = _pymod_concurrent_future.attr (" Future" )();
266
- call_soon ( make_function (
267
- [this , py_fut, host, port, family, type, proto, flags] (object obj) {
225
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
226
+ _strand. post (
227
+ [this , py_fut, host, port, family, type, proto, flags] {
268
228
object res = _pymod_socket.attr (" getaddrinfo" )(host, port, family, type, proto, flags);
269
229
py_fut.attr (" set_result" )(res);
270
- },
271
- default_call_policies (),
272
- boost::mpl::vector<void , object>()));
230
+ });
273
231
return py_fut;
274
232
}
275
233
276
234
object event_loop::getnameinfo (object sockaddr, int flags)
277
235
{
278
- object py_fut = _pymod_concurrent_future.attr (" Future" )();
279
- call_soon ( make_function (
280
- [this , py_fut, sockaddr, flags] (object obj) {
236
+ object py_fut = _py_wrap_future ( _pymod_concurrent_future.attr (" Future" )() );
237
+ _strand. post (
238
+ [this , py_fut, sockaddr, flags] {
281
239
object res = _pymod_socket.attr (" getnameinfo" )(sockaddr, flags);
282
240
py_fut.attr (" set_result" )(res);
283
- },
284
- default_call_policies (),
285
- boost::mpl::vector<void , object>()));
241
+ });
286
242
return py_fut;
287
243
}
288
244
@@ -345,7 +301,7 @@ void event_loop::default_exception_handler(object context)
345
301
dict kwargs;
346
302
args.append (str (" \n " ).join (log_lines));
347
303
kwargs[" exc_info" ] = exc_info;
348
- _pymod_logger .attr (" error" )(tuple (args), **kwargs);
304
+ _py_logger .attr (" error" )(tuple (args), **kwargs);
349
305
}
350
306
351
307
void event_loop::call_exception_handler (object context)
@@ -370,7 +326,7 @@ void event_loop::call_exception_handler(object context)
370
326
dict kwargs;
371
327
args.append (str (" Exception in default exception handler" ));
372
328
kwargs[" exc_info" ] = true ;
373
- _pymod_logger .attr (" error" )(tuple (args), **kwargs);
329
+ _py_logger .attr (" error" )(tuple (args), **kwargs);
374
330
}
375
331
}
376
332
}
@@ -416,7 +372,7 @@ void event_loop::call_exception_handler(object context)
416
372
boost::python::dict kwargs;
417
373
args.append (str (" Exception in default exception handler" ));
418
374
kwargs[" exc_info" ] = true ;
419
- _pymod_logger .attr (" error" )(tuple (args), **kwargs);
375
+ _py_logger .attr (" error" )(tuple (args), **kwargs);
420
376
}
421
377
}
422
378
}
0 commit comments