diff --git a/core/async.c b/core/async.c index db8949429..d58a5b2e1 100644 --- a/core/async.c +++ b/core/async.c @@ -42,6 +42,7 @@ void uwsgi_async_init() { // optimization, this array maps file descriptor to requests uwsgi.async_waiting_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd); uwsgi.async_proto_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd); + uwsgi.async_idle_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd); } @@ -53,6 +54,10 @@ struct wsgi_request *find_wsgi_req_by_fd(int fd) { return uwsgi.async_waiting_fd_table[fd]; } +struct wsgi_request *find_wsgi_req_idle_by_fd(int fd) { + return uwsgi.async_idle_fd_table[fd]; +} + static void runqueue_remove(struct uwsgi_async_request *u_request) { struct uwsgi_async_request *parent = u_request->prev; @@ -125,7 +130,11 @@ void async_reset_request(struct wsgi_request *wsgi_req) { struct uwsgi_async_fd *uaf = wsgi_req->waiting_fds; while (uaf) { - event_queue_del_fd(uwsgi.async_queue, uaf->fd, uaf->event); + if (uaf->fd == wsgi_req->fd) { + event_queue_idle_fd(uwsgi.async_queue, uaf->fd); + } else { + event_queue_del_fd(uwsgi.async_queue, uaf->fd, uaf->event); + } uwsgi.async_waiting_fd_table[uaf->fd] = NULL; struct uwsgi_async_fd *current_uaf = uaf; uaf = current_uaf->next; @@ -199,7 +208,11 @@ int async_add_fd_read(struct wsgi_request *wsgi_req, int fd, int timeout) { } uwsgi.async_waiting_fd_table[fd] = wsgi_req; wsgi_req->async_force_again = 1; - return event_queue_add_fd_read(uwsgi.async_queue, fd); + if (fd == wsgi_req->fd) { + return event_queue_fd_write_to_read(uwsgi.async_queue, fd); + } else { + return event_queue_add_fd_read(uwsgi.async_queue, fd); + } } static int async_wait_fd_read(int fd, int timeout) { @@ -307,7 +320,11 @@ int async_add_fd_write(struct wsgi_request *wsgi_req, int fd, int timeout) { uwsgi.async_waiting_fd_table[fd] = wsgi_req; wsgi_req->async_force_again = 1; - return event_queue_add_fd_write(uwsgi.async_queue, fd); + if (fd == wsgi_req->fd) { + return event_queue_fd_read_to_write(uwsgi.async_queue, fd); + } else { + return event_queue_add_fd_write(uwsgi.async_queue, fd); + } } static int async_wait_fd_write(int fd, int timeout) { @@ -533,6 +550,15 @@ void async_loop() { uwsgi_sock = uwsgi_sock->next; } + if (event_queue_interesting_fd_is_closed(events, i)) { + uwsgi.wsgi_req = find_wsgi_req_idle_by_fd(interesting_fd); + if (uwsgi.wsgi_req) { + uwsgi.wsgi_req->async_closed = 1; + runqueue_push(uwsgi.wsgi_req); + continue; + } + } + if (!is_a_new_connection) { // proto event uwsgi.wsgi_req = find_wsgi_req_proto_by_fd(interesting_fd); @@ -544,7 +570,8 @@ void async_loop() { if (!proto_parser_status) { // remove fd from event poll and fd proto table uwsgi.async_proto_fd_table[interesting_fd] = NULL; - event_queue_del_fd(uwsgi.async_queue, interesting_fd, event_queue_read()); + event_queue_idle_fd(uwsgi.async_queue, interesting_fd); + uwsgi.async_idle_fd_table[interesting_fd] = uwsgi.wsgi_req; // put request in the runqueue (set it as UWSGI_OK to signal the first run) uwsgi.wsgi_req->async_status = UWSGI_OK; runqueue_push(uwsgi.wsgi_req); diff --git a/core/event.c b/core/event.c index 6e3024501..e26acd462 100644 --- a/core/event.c +++ b/core/event.c @@ -516,7 +516,7 @@ int event_queue_add_fd_read(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLIN; + ee.events = EPOLLIN | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) { @@ -532,7 +532,7 @@ int event_queue_fd_write_to_read(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLIN; + ee.events = EPOLLIN | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -548,7 +548,7 @@ int event_queue_fd_read_to_write(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLOUT; + ee.events = EPOLLOUT | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -564,7 +564,7 @@ int event_queue_fd_readwrite_to_read(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLIN; + ee.events = EPOLLIN | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -580,7 +580,7 @@ int event_queue_fd_readwrite_to_write(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLOUT; + ee.events = EPOLLOUT | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -597,7 +597,7 @@ int event_queue_fd_read_to_readwrite(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLIN | EPOLLOUT; + ee.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -613,7 +613,7 @@ int event_queue_fd_write_to_readwrite(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLIN | EPOLLOUT; + ee.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { @@ -642,12 +642,28 @@ int event_queue_del_fd(int eq, int fd, int event) { return 0; } +int event_queue_idle_fd(int eq, int fd) { + + struct epoll_event ee; + + memset(&ee, 0, sizeof(struct epoll_event)); + ee.data.fd = fd; + ee.events = EPOLLRDHUP; + + if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) { + uwsgi_error("epoll_ctl()"); + return -1; + } + + return 0; +} + int event_queue_add_fd_write(int eq, int fd) { struct epoll_event ee; memset(&ee, 0, sizeof(struct epoll_event)); - ee.events = EPOLLOUT; + ee.events = EPOLLOUT | EPOLLRDHUP; ee.data.fd = fd; if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) { @@ -692,6 +708,13 @@ int event_queue_interesting_fd_is_write(void *events, int id) { return 0; } +int event_queue_interesting_fd_is_closed(void *events, int id) { + struct epoll_event *ee = (struct epoll_event *) events; + if (ee[id].events & EPOLLRDHUP) { + return 1; + } + return 0; +} int event_queue_wait_multi(int eq, int timeout, void *events, int nevents) { diff --git a/plugins/python/uwsgi_python.h b/plugins/python/uwsgi_python.h index aca1f83b7..12104fc1e 100644 --- a/plugins/python/uwsgi_python.h +++ b/plugins/python/uwsgi_python.h @@ -62,6 +62,8 @@ #define uwsgi_py_write_set_exception(x) if (!uwsgi.disable_write_exception) { PyErr_SetString(PyExc_IOError, "write error"); }; #define uwsgi_py_write_exception(x) uwsgi_py_write_set_exception(x); uwsgi_manage_exception(x, 0); +#define uwsgi_py_closed_set_exception(x) }; +#define uwsgi_py_closed_exception(x) PyErr_SetString(PyExc_IOError, "Connection reset by peer"); uwsgi_manage_exception(x, 0); #define uwsgi_py_check_write_errors if (wsgi_req->write_errors > 0 && uwsgi.write_errors_exception_only) {\ uwsgi_py_write_set_exception(wsgi_req);\ diff --git a/plugins/python/wsgi_subhandler.c b/plugins/python/wsgi_subhandler.c index 32a3aee7e..759b8a083 100644 --- a/plugins/python/wsgi_subhandler.c +++ b/plugins/python/wsgi_subhandler.c @@ -268,6 +268,11 @@ int uwsgi_response_subhandler_wsgi(struct wsgi_request *wsgi_req) { PyObject *pychunk; + if (wsgi_req->async_closed) { + uwsgi_py_closed_exception(wsgi_req); + goto clear; + } + // return or yield ? // in strict mode we do not optimize apps directly returning strings (or bytes) if (!up.wsgi_strict) { diff --git a/uwsgi.h b/uwsgi.h index 2b32cad79..808efd911 100755 --- a/uwsgi.h +++ b/uwsgi.h @@ -1498,6 +1498,7 @@ struct wsgi_request { int async_id; int async_status; + int async_closed; int switches; size_t write_pos; @@ -2366,6 +2367,7 @@ struct uwsgi_server { // async commodity struct wsgi_request **async_waiting_fd_table; struct wsgi_request **async_proto_fd_table; + struct wsgi_request **async_idle_fd_table; struct uwsgi_async_request *async_runqueue; struct uwsgi_async_request *async_runqueue_last; @@ -3407,6 +3409,7 @@ int event_queue_init(void); void *event_queue_alloc(int); int event_queue_add_fd_read(int, int); int event_queue_add_fd_write(int, int); +int event_queue_idle_fd(int, int); int event_queue_del_fd(int, int, int); int event_queue_wait(int, int, int *); int event_queue_wait_multi(int, int, void *, int); @@ -3420,6 +3423,7 @@ int event_queue_fd_read_to_readwrite(int, int); int event_queue_fd_write_to_readwrite(int, int); int event_queue_interesting_fd_is_read(void *, int); int event_queue_interesting_fd_is_write(void *, int); +int event_queue_interesting_fd_is_closed(void *, int); int event_queue_add_timer(int, int *, int); int event_queue_add_timer_hr(int, int *, int, long);