Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 46 additions & 27 deletions core/uwsgi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1319,38 +1319,57 @@ void kill_them_all(int signum) {

// gracefully destroy
void gracefully_kill_them_all(int signum) {
if (uwsgi_instance_is_dying) return;
uwsgi.status.gracefully_destroying = 1;

int waitpid_status;
// unsubscribe if needed
uwsgi_unsubscribe_all();

if (uwsgi_instance_is_dying) return;
uwsgi.status.gracefully_destroying = 1;
uwsgi_log_verbose("graceful shutdown triggered...\n");

// unsubscribe if needed
uwsgi_unsubscribe_all();

uwsgi_log_verbose("graceful shutdown triggered...\n");

int i;
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0) {
int i;
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0) {
if (uwsgi.shutdown_sockets)
uwsgi.workers[i].shutdown_sockets = 1;
uwsgi_curse(i, SIGHUP);
}
uwsgi.workers[i].shutdown_sockets = 1;
uwsgi_curse(i, SIGHUP);
}
}

for (i = 0; i < uwsgi.mules_cnt; i++) {
if (uwsgi.mules[i].pid > 0) {
uwsgi_curse_mule(i, SIGHUP);
}
}

// avoid breaking other child process signal handling logic by doing nohang checks on the workers
// until they are all done.
int keep_waiting = 1;
while (keep_waiting == 1) {
int still_running = 0;
int errors = 0;
for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0) {
pid_t rval = waitpid(uwsgi.workers[i].pid, NULL, WNOHANG);
if (rval == uwsgi.workers[i].pid) {
uwsgi.workers[i].pid = 0;
} else if (rval == 0) {
still_running++;
} else if (rval < 0) {
errors++;
}
for (i = 0; i < uwsgi.mules_cnt; i++) {
if (uwsgi.mules[i].pid > 0) {
uwsgi_curse_mule(i, SIGHUP);
}
}

for (i = 1; i <= uwsgi.numproc; i++) {
if (uwsgi.workers[i].pid > 0) {
waitpid(uwsgi.workers[i].pid, &waitpid_status, 0);
}
}

uwsgi_destroy_processes();
}
}

// exit out if everything is done or we got errors as we can't do much about the errors at this point
if (still_running == 0 || errors > 0) {
keep_waiting = 0;
break;
}
sleep(1);
}

uwsgi_destroy_processes();
}


Expand Down
129 changes: 129 additions & 0 deletions plugins/python/tracebacker.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,133 @@ char *uwsgi_python_get_thread_name(PyObject *thread_id) {
return NULL;
}

#ifdef UWSGI_PY311

void *uwsgi_python_tracebacker_thread(void *foobar) {
struct iovec iov;
struct sockaddr_un so_sun;
socklen_t so_sun_len = 0;
PyObject *new_thread = NULL, *globalsDict = NULL, *localsDict = NULL, *codeFunc = NULL;

memset(&iov, 0, sizeof(iov));

// this function locks the GIL
new_thread = uwsgi_python_setup_thread("uWSGITraceBacker");
if (!new_thread) {
UWSGI_RELEASE_GIL;
return NULL;
}

char *str_wid = uwsgi_num2str(uwsgi.mywid);
char *sock_path = uwsgi_concat2(up.tracebacker, str_wid);
free(str_wid);

int current_defer_accept = uwsgi.no_defer_accept;
uwsgi.no_defer_accept = 1;
int fd = bind_to_unix(sock_path, uwsgi.listen_queue, uwsgi.chmod_socket, uwsgi.abstract_socket);
uwsgi.no_defer_accept = current_defer_accept;
if (fd < 0) {
goto cleanup;
}

char *codestr =
"def uwsgi_tracebacker_process_frame(frame):\n"
" import io, traceback\n"
"\n"
" lines = 0\n"
" rval = io.StringIO()\n"
" rval.write('*** uWSGI Python tracebacker output ***\\n')\n"
" ss = traceback.extract_stack(frame)\n"
" sslines = ss.format()\n"
" if sslines:\n"
" rval.writelines(sslines)\n"
" lines += 1\n"
" return rval.getvalue() if lines > 0 else None\n";
globalsDict = PyDict_New();
localsDict = PyDict_New();
PyObject *result = PyRun_String(codestr, Py_file_input, globalsDict, localsDict);
if (!result) {
uwsgi_log("!!! Failed to define uwsgi_tracebacker_lines function\n");
PyErr_Print();
} else {
codeFunc = PyDict_GetItemString(localsDict, "uwsgi_tracebacker_process_frame");
Py_DECREF(result);
}
UWSGI_RELEASE_GIL;

if (!codeFunc) {
uwsgi_log("** failed to initialize python tracebacker thread for worker %d'n", uwsgi.mywid);
goto cleanup;
}

uwsgi_log("** python3.11 tracebacker for worker %d listening on %s\n", uwsgi.mywid, sock_path);

while (uwsgi.shutdown_sockets == 0) {
int client_fd = accept(fd, (struct sockaddr *) &so_sun, &so_sun_len);
if (client_fd < 0) {
uwsgi_error("accept()");
goto loopcontinue;
}

UWSGI_GET_GIL;

PyFrameObject *threadFrame = PyThreadState_GetFrame(up.main_thread);
if (threadFrame) {
PyObject *tb = PyObject_CallFunctionObjArgs(codeFunc, threadFrame, NULL);
if (!tb) {
uwsgi_log("pytracebacker(%d): uwsgi_tracebacker failed\n", uwsgi.mywid);
PyErr_Print();
} else {
if (tb != Py_None) {
PyObject *tb_bytes = PyUnicode_AsUTF8String(tb);
if (tb_bytes) {
char* tb_bytes_cstr = PyBytes_AS_STRING(tb_bytes); // internal ptr, don't free
int tb_bytes_len = PyBytes_GET_SIZE(tb_bytes);
if (tb_bytes_cstr) {
iov.iov_base = tb_bytes_cstr;
iov.iov_len = tb_bytes_len;
int bytes_written = writev(client_fd, &iov, 1);
if (bytes_written < 0) {
uwsgi_error("writev()");
} else if (bytes_written < tb_bytes_len) {
uwsgi_log("pytracebacker(%d): only wrote %d/%d bytes\n", uwsgi.mywid, bytes_written, tb_bytes_len);
} else {
uwsgi_log("**!! pytracebacker(%d): sent traceback\n", uwsgi.mywid);
}
}
Py_DECREF(tb_bytes);
}
}
Py_DECREF(tb);
}
Py_DECREF(threadFrame);
}

UWSGI_RELEASE_GIL;

loopcontinue:
if (client_fd >= 0) {
close(client_fd);
}
}

cleanup:
UWSGI_GET_GIL;
Py_XDECREF(codeFunc);
Py_XDECREF(globalsDict);
Py_XDECREF(localsDict);
Py_DECREF(new_thread);
UWSGI_RELEASE_GIL;

if (sock_path) free(sock_path);
if (fd >= 0) close(fd);

return NULL;
}

#else

// old pre 3.11 version
void *uwsgi_python_tracebacker_thread(void *foobar) {

struct iovec iov[11];
Expand Down Expand Up @@ -288,3 +415,5 @@ void *uwsgi_python_tracebacker_thread(void *foobar) {
}
return NULL;
}

#endif