diff --git a/.gitignore b/.gitignore index 32e6109572..fbe39a452e 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,9 @@ core/dot_h.c /uWSGI.egg-info/ check/check_* !check/*.c + +.vscode +/plugins/cffi/_* +/plugins/cffi/cffi_plugin.c + +.uwsgi_plugins_builder \ No newline at end of file diff --git a/buildconf/cffi.ini b/buildconf/cffi.ini new file mode 100644 index 0000000000..36ce08a9c3 --- /dev/null +++ b/buildconf/cffi.ini @@ -0,0 +1,3 @@ +[uwsgi] +main_plugin = cffi +inherit = base diff --git a/plugins/cffi/Makefile b/plugins/cffi/Makefile new file mode 100644 index 0000000000..9856fa5453 --- /dev/null +++ b/plugins/cffi/Makefile @@ -0,0 +1,13 @@ +PYTHON := python + +cffi_plugin.c: cffi_plugin.py cffi_init.py uwsgiplugin.py _constants.h _uwsgi.h types.h module_bundle.py uwsgi.py + $(PYTHON) cffi_plugin.py + +_constants.h: ../../uwsgi.h constants.py + $(PYTHON) constants.py + +_uwsgi.h: _uwsgi_preprocessed.h filtercdefs.py + $(PYTHON) filtercdefs.py _uwsgi_preprocessed.h > _uwsgi.h + +clean: + rm _uwsgi.h _uwsgi_preprocessed.h _constants.h cffi_plugin.c diff --git a/plugins/cffi/build_bundle.py b/plugins/cffi/build_bundle.py new file mode 100755 index 0000000000..3e570c5cd2 --- /dev/null +++ b/plugins/cffi/build_bundle.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +""" +Create bundle to be used as cffi init code. +""" + +import os.path +import gzip +import base64 +import sys +import pprint + +BUNDLED = {"_init": "cffi_init.py", "uwsgi": "uwsgi.py"} +OUTPUT = {} + + +def bundler(out): + for key, filename in BUNDLED.items(): + with open(filename, "rb") as file: + data = file.read() + data = gzip.compress(data) + data = base64.b64encode(data).decode("latin1") + OUTPUT[key] = data + + with open("module_bundle.py", "r") as loader: + for line in loader: + out.write(line) + if line.startswith("# MODULES"): + out.write("\nMODULES =") + pprint.pprint(OUTPUT, out) + + +if __name__ == "__main__": + bundler(sys.stdout) diff --git a/plugins/cffi/cffi_asgi.py b/plugins/cffi/cffi_asgi.py new file mode 100644 index 0000000000..539408a050 --- /dev/null +++ b/plugins/cffi/cffi_asgi.py @@ -0,0 +1,173 @@ +""" +event-loop independent ASGI functions +""" + +from _uwsgi import ffi, lib + + +def asgi_start_response(wsgi_req, status, headers): + status = b"%d" % status + lib.uwsgi_response_prepare_headers(wsgi_req, ffi.new("char[]", status), len(status)) + for (header, value) in headers: + lib.uwsgi_response_add_header( + wsgi_req, + ffi.new("char[]", header), + len(header), + ffi.new("char[]", value), + len(value), + ) + + +def asgi_scope_http(wsgi_req): + """ + Create the ASGI scope for a http or websockets connection. 
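+    Request variables are read from wsgi_req.hvec as key/value pairs:
+    HTTP_* entries become the ASGI headers list, the remaining CGI-style
+    variables fill in method, path, query_string and the client/server
+    tuples, and the raw environ is exposed under
+    scope["extensions"]["environ"].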
+ """ + environ = {} + headers = [] + iov = wsgi_req.hvec + for i in range(0, wsgi_req.var_cnt, 2): + key, value = ( + ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len), + ffi.string(ffi.cast("char*", iov[i + 1].iov_base), iov[i + 1].iov_len), + ) + if key.startswith(b"HTTP_"): + # replace cgi-style _ with http-style - + headers.append((key[5:].lower().replace(b"_", b"-"), value)) + else: + environ[key.decode("ascii")] = value + + REMOTE_PORT = 0 + if "REMOTE_PORT" in environ: + REMOTE_PORT = int(environ["REMOTE_PORT"]) + SERVER_PORT = 0 + if "SERVER_PORT" in environ: + SERVER_PORT = int(environ["SERVER_PORT"]) + + scope = { + "type": "http", + "asgi": {"spec_version": "2.1"}, + "http_version": environ["SERVER_PROTOCOL"][len("HTTP/") :].decode("utf-8"), + "method": environ["REQUEST_METHOD"].decode("utf-8"), + "scheme": "http", + "path": environ["PATH_INFO"].decode("utf-8"), + "raw_path": environ["REQUEST_URI"], + "query_string": environ["QUERY_STRING"], + "root_path": environ["SCRIPT_NAME"].decode("utf-8"), + "headers": headers, + # some references to REMOTE_PORT but not always in environ + "client": (environ["REMOTE_ADDR"].decode("utf-8"), REMOTE_PORT), + "server": (environ["SERVER_NAME"].decode("utf-8"), SERVER_PORT), + "extensions": {"environ": environ}, + } + + if environ.get("HTTPS") in (b"on",): + scope["scheme"] = "https" + + if wsgi_req.http_sec_websocket_key != ffi.NULL: + scope["type"] = "websocket" + if scope["scheme"] == "https": + scope["scheme"] = "wss" + else: + scope["scheme"] = "ws" + + return scope + + +def websocket_recv_nb(wsgi_req): + """ + uwsgi.websocket_recv_nb() + """ + ub = lib.uwsgi_websocket_recv_nb(wsgi_req) + if ub == ffi.NULL: + raise IOError("unable to receive websocket message") + ret = ffi.buffer(ub.buf, ub.pos)[:] + lib.uwsgi_buffer_destroy(ub) + return ret + + +def websocket_handler(wsgi_req, _send, _ready): + """ + Return (send, receive) for ASGI websocket. + + wsgi_req: the request + _send: await _send(event) + _ready: await next message + """ + + closed = False + + async def receiver(): + nonlocal closed + + yield {"type": "websocket.connect"} + + msg = None + while True: + try: + print("rx, closed=", closed) + msg = websocket_recv_nb(wsgi_req) + except IOError: + closed = True + await _send({"type": "websocket.close"}) + yield { + "type": "websocket.disconnect", + "code": 1000, + } # todo lookup code + # don't raise, keep receivin' ? + continue # or return? + if msg: + # check wsgi_req->websocket_opcode for text / binary + value = {"type": "websocket.receive"} + # * %x0 denotes a continuation frame + # * %x1 denotes a text frame + # * %x2 denotes a binary frame + opcode = wsgi_req.websocket_opcode + if opcode == 1: + value["text"] = msg.decode("utf-8") + elif opcode == 2: + value["bytes"] = msg + else: + print("surprise opcode", opcode) + yield value + else: + print("no msg", wsgi_req.websocket_opcode) + await _ready() + + receive = receiver().__anext__ + + async def send(event): + nonlocal closed + print("ws send", event) + if closed: + print("ignore send on closed ws") + + elif event["type"] == "websocket.accept": + if ( + lib.uwsgi_websocket_handshake( + wsgi_req, ffi.NULL, 0, ffi.NULL, 0, ffi.NULL, 0 + ) + < 0 + ): + closed = True + await _send({"type": "websocket.close"}) + raise IOError("unable to send websocket handshake") + + elif event["type"] == "websocket.send": + # ok to call during any part of app? 
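+            # An ASGI send event carries either "bytes" or "text";
+            # choose the matching uwsgi websocket send function below.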
+ if "bytes" in event: + msg = event["bytes"] + websocket_send = lib.uwsgi_websocket_send_binary + else: + msg = event["text"].encode("utf-8") + websocket_send = lib.uwsgi_websocket_send + if websocket_send(wsgi_req, ffi.new("char[]", msg), len(msg)) < 0: + closed = True + await _send({"type": "websocket.close"}) + raise IOError("unable to send websocket message") + + elif event["type"] == "websocket.close": + print("asked to close in send") + closed = True + await _send(event) + + return (send, receive) diff --git a/plugins/cffi/cffi_asyncio.py b/plugins/cffi/cffi_asyncio.py new file mode 100644 index 0000000000..ebcf119fb0 --- /dev/null +++ b/plugins/cffi/cffi_asyncio.py @@ -0,0 +1,544 @@ +""" +Direct port of plugins/asyncio.c +""" + +import os +import sys +import asyncio +import inspect +import greenlet + +from cffi_asgi import asgi_scope_http, asgi_start_response, websocket_handler +from _uwsgi import ffi, lib, _applications, WSGIfilewrapper, WSGIinput + +uwsgi = lib.uwsgi + +wsgi_apps = _applications + + +def get_loop(): + return asyncio.get_event_loop() + + +# testing +def request_id(): + return ( + lib.uwsgi.workers[lib.uwsgi.mywid].cores[lib.uwsgi.wsgi_req.async_id].requests + ) + + +# keep alive +_ASYNCIO = ffi.new("char[]", "asyncio".encode("utf-8")) + +timeout_handles = {} + + +def store_ob_timeout(wsgi_req, ob_timeout): + timeout_handles[wsgi_req.async_id] = ob_timeout + + +def get_ob_timeout(wsgi_req): + return timeout_handles.pop(wsgi_req.async_id) + + +def print_exc(): + import traceback + + traceback.print_exc() + + +def setup_asyncio(threads): + setattr(uwsgi, "async", threads) # keyword + if uwsgi.socket_timeout < 30: + uwsgi.socket_timeout = 30 + + uwsgi.loop = _ASYNCIO + + +def free_req_queue(wsgi_req): + """ + A #define that uses wsgi_req from context. 
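+    Here wsgi_req is passed in explicitly; the original C macro is: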
+ #define free_req_queue uwsgi.async_queue_unused_ptr++; uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = wsgi_req + """ + lib.uwsgi.async_queue_unused_ptr = lib.uwsgi.async_queue_unused_ptr + 1 + lib.uwsgi.async_queue_unused[lib.uwsgi.async_queue_unused_ptr] = wsgi_req + + +@ffi.def_extern() +def uwsgi_asyncio_wait_read_hook(fd, timeout): + print("enter read hook") + wsgi_req = lib.uwsgi.current_wsgi_req() + loop = get_loop() + loop.add_reader(fd, uwsgi_asyncio_hook_fd, wsgi_req) + try: + ob_timeout = loop.call_later(timeout, hook_timeout, wsgi_req) + try: + + # to loop + if uwsgi.schedule_to_main: + print("wait_read_hook", fd) + uwsgi.schedule_to_main(wsgi_req) + else: + print("no read hook", uwsgi.schedule_to_mani) + # from loop + finally: + ob_timeout.cancel() + + if wsgi_req.async_timed_out: + return 0 + + return 1 + + except: + print_exc() + return -1 + + finally: + # returns False if not added + loop.remove_reader(fd) + + +@ffi.def_extern() +def uwsgi_asyncio_wait_write_hook(fd, timeout): + print("enter write hook") + wsgi_req = uwsgi.current_wsgi_req() + loop = get_loop() + loop.add_writer(fd, uwsgi_asyncio_hook_fd, wsgi_req) + try: + ob_timeout = loop.call_later(timeout, hook_timeout, wsgi_req) + try: + # to loop + if uwsgi.schedule_to_main: + print("wait_write_hook", fd) + uwsgi.schedule_to_main(wsgi_req) + # from loop + + finally: + ob_timeout.cancel() + + if wsgi_req.async_timed_out: + return 0 + + return 1 + + except: + print_exc() + return -1 + + finally: + loop.remove_writer(fd) + + +def uwsgi_asyncio_request(wsgi_req, timed_out): + try: + loop = get_loop() + uwsgi.wsgi_req = wsgi_req + + ob_timeout = get_ob_timeout(wsgi_req) + ob_timeout.cancel() + + if timed_out > 0: + loop.remove_reader(wsgi_req.fd) + raise IOError("timed out") # goto end + + status = wsgi_req.socket.proto(wsgi_req) + + if status > 0: + ob_timeout = loop.call_later( + uwsgi.socket_timeout, uwsgi_asyncio_request, wsgi_req, True + ) + store_ob_timeout(wsgi_req, ob_timeout) + # goto again + print("again a", request_id()) + return None + + else: + loop.remove_reader(wsgi_req.fd) + + if status == 0: + # we call this two time... 
overengineering :( + uwsgi.async_proto_fd_table[wsgi_req.fd] = ffi.NULL + uwsgi.schedule_to_req() + # goto again + # print("again b", request_id(), wsgi_req.async_id) + return None + + except IOError: + loop.remove_reader(wsgi_req.fd) + print_exc() + + except: + print("other exception") + print_exc() + + # end + uwsgi.async_proto_fd_table[wsgi_req.fd] = ffi.NULL + lib.uwsgi_close_request(uwsgi.wsgi_req) + free_req_queue(wsgi_req) + + +def uwsgi_asyncio_accept(uwsgi_sock): + uwsgi = lib.uwsgi + wsgi_req = lib.find_first_available_wsgi_req() + if wsgi_req == ffi.NULL: + lib.uwsgi_async_queue_is_full(lib.uwsgi_now()) + return None + + uwsgi.wsgi_req = wsgi_req + lib.wsgi_req_setup(wsgi_req, wsgi_req.async_id, uwsgi_sock) + uwsgi.workers[uwsgi.mywid].cores[wsgi_req.async_id].in_request = 1 + + if lib.wsgi_req_simple_accept(wsgi_req, uwsgi_sock.fd): + uwsgi.workers[uwsgi.mywid].cores[wsgi_req.async_id].in_request = 0 + free_req_queue(wsgi_req) + return None + + wsgi_req.start_of_request = lib.uwsgi_micros() + wsgi_req.start_of_request_in_sec = wsgi_req.start_of_request // 1000000 + + # enter harakiri mode + if uwsgi.harakiri_options.workers > 0: + lib.set_harakiri(wsgi_req, uwsgi.harakiri_options.workers) + + uwsgi.async_proto_fd_table[wsgi_req.fd] = wsgi_req + + loop = get_loop() + + # add callback for protocol + try: + loop.add_reader(wsgi_req.fd, uwsgi_asyncio_request, wsgi_req, False) + ob_timeout = loop.call_later( + uwsgi.socket_timeout, uwsgi_asyncio_request, wsgi_req, True + ) + store_ob_timeout(wsgi_req, ob_timeout) + + except: + loop.remove_reader(wsgi_req.fd) + free_req_queue(wsgi_req) + print_exc() + raise + + +def uwsgi_asyncio_hook_fd(wsgi_req): + uwsgi = lib.uwsgi + uwsgi.wsgi_req = wsgi_req + uwsgi.schedule_to_req() + + +def uwsgi_asyncio_hook_timeout(wsgi_req): + uwsgi = lib.uwsgi + uwssgi.wsgi_req = wsgi_req + uwsgi.wsgi_req.async_timed_out = 1 + uwsgi.schedule_to_req() + + +def uwsgi_asyncio_hook_fix(wsgi_req): + uwsgi.wsgi_req = wsgi_req + uwsgi.schedule_to_req() + + +@ffi.def_extern() +def uwsgi_asyncio_schedule_fix(wsgi_req): + loop = get_loop() + loop.call_soon(uwsgi_asyncio_hook_fix, wsgi_req) + + +@ffi.def_extern() +def asyncio_loop(): + """ + Set up asyncio. + """ + + if not uwsgi.has_threads and uwsgi.mywid == 1: + print( + "!!! Running asyncio without threads IS NOT recommended, enable " + "them with --enable-threads !!!\n" + ) + + if uwsgi.socket_timeout < 30: + print( + "!!! 
Running asyncio with a socket-timeout lower than 30 seconds " + "is not recommended, tune it with --socket-timeout !!!\n" + ) + + if not uwsgi.async_waiting_fd_table: + uwsgi.async_waiting_fd_table = lib.uwsgi_calloc( + ffi.sizeof("struct wsgi_request *") * uwsgi.max_fd + ) + if not uwsgi.async_proto_fd_table: + uwsgi.async_proto_fd_table = lib.uwsgi_calloc( + ffi.sizeof("struct wsgi_request *") * uwsgi.max_fd + ) + + uwsgi.wait_write_hook = lib.uwsgi_asyncio_wait_write_hook + uwsgi.wait_read_hook = lib.uwsgi_asyncio_wait_read_hook + + assert lib.uwsgi.wait_write_hook == lib.uwsgi_asyncio_wait_write_hook + + if getattr(uwsgi, "async") < 1: # keyword + print("the async loop engine requires async mode (--async )\n") + raise SystemExit(1) + + if not uwsgi.schedule_to_main: + print( + "*** DANGER *** async mode without coroutine/greenthread engine loaded !!!\n" + ) + + # call uwsgi_cffi_setup_greenlets() first: + if not uwsgi.schedule_to_req: + uwsgi.schedule_to_req = lib.async_schedule_to_req + else: + uwsgi.schedule_fix = lib.uwsgi_asyncio_schedule_fix + + loop = get_loop() + uwsgi_sock = uwsgi.sockets + while uwsgi_sock != ffi.NULL: + loop.add_reader(uwsgi_sock.fd, uwsgi_asyncio_accept, uwsgi_sock) + uwsgi_sock = uwsgi_sock.next + + loop.run_forever() + + +def handle_asgi_request(wsgi_req, app): + scope = asgi_scope_http(wsgi_req) + loop = asyncio.get_event_loop() + channel = asyncio.Queue(1) + gc = greenlet.getcurrent() + + async def _send(event): + await channel.put(event) + + async def send_task(): + while True: + event = await channel.get() + gc.switch(event) + + if scope["type"] == "websocket": + ws_event = asyncio.Event() + loop.add_reader(wsgi_req.fd, ws_event.set) + + async def _ready(): + await ws_event.wait() + ws_event.clear() + + send, receive = websocket_handler(wsgi_req, _send, _ready) + + elif scope["type"] == "http": + + send = _send + + async def receive(): + return {"type": "http.request"} + + send_task = loop.create_task(send_task()) + app_task = loop.create_task(app(scope, receive, send)) + + try: + while True: + event = gc.parent.switch() + if event["type"] == "http.response.start": + # raw uwsgi function accepts bytes + asgi_start_response(wsgi_req, event["status"], event["headers"]) + elif event["type"] == "http.response.body": + data = event["body"] + lib.uwsgi_response_write_body_do( + wsgi_req, ffi.new("char[]", data), len(data) + ) + if not event.get("more_body"): + break + elif event["type"] == "websocket.close": + ws_event.set() # unclear if this helps receive() to see the close + break + else: + print("loop event", event) + + send_task.cancel() + finally: + loop.remove_reader(wsgi_req.fd) + + return lib.UWSGI_OK + + +def to_network(native): + return native.encode("latin1") + + +def iscoroutine(app): + """ + Could app be ASGI? 
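+    An app is treated as ASGI when it, or its __call__, is a coroutine
+    function (the usual async def app(scope, receive, send) shape).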
+ """ + return inspect.iscoroutinefunction(app) or inspect.iscoroutinefunction(app.__call__) + + +ASGI_CALLABLE = ffi.cast("void *", 2) + + +@ffi.def_extern() +def uwsgi_cffi_request(wsgi_req): + try: + return _uwsgi_cffi_request(wsgi_req) + except: + print_exc() + return lib.UWSGI_OK + + +def _uwsgi_cffi_request(wsgi_req): + """ + the WSGI request handler + """ + + # if (wsgi_req->async_force_again) { + # wi = &uwsgi_apps[wsgi_req->app_id]; + # wsgi_req->async_force_again = 0; + # UWSGI_GET_GIL + # // get rid of timeout + # if (wsgi_req->async_timed_out) { + # PyDict_SetItemString(wsgi_req->async_environ, "x-wsgiorg.fdevent.timeout", Py_True); + # wsgi_req->async_timed_out = 0; + # } + # else { + # PyDict_SetItemString(wsgi_req->async_environ, "x-wsgiorg.fdevent.timeout", Py_None); + # } + + # if (wsgi_req->async_ready_fd) { + # PyDict_SetItemString(wsgi_req->async_environ, "uwsgi.ready_fd", PyInt_FromLong(wsgi_req->async_last_ready_fd)); + # wsgi_req->async_ready_fd = 0; + # } + # else { + # PyDict_SetItemString(wsgi_req->async_environ, "uwsgi.ready_fd", Py_None); + # } + # int ret = manage_python_response(wsgi_req); + # if (ret == UWSGI_OK) goto end; + # UWSGI_RELEASE_GIL + # if (ret == UWSGI_AGAIN) { + # wsgi_req->async_force_again = 1; + # } + # return ret; + # } + + if wsgi_req.async_force_again: + wsgi_req.async_force_again = 0 + # just close it + try: + ob_timeout = get_ob_timeout(wsgi_req) + ob_timeout.cancel() + except KeyError: + pass + asyncio.get_event_loop().remove_reader(wsgi_req.fd) + return lib.UWSGI_OK + + def writer(data): + lib.uwsgi_response_write_body_do(wsgi_req, ffi.new("char[]", data), len(data)) + + def start_response(status, headers, exc_info=None): + if exc_info: + traceback.print_exception(*exc_info) + status = to_network(status) + lib.uwsgi_response_prepare_headers( + wsgi_req, ffi.new("char[]", status), len(status) + ) + for hh in headers: + header, value = to_network(hh[0]), to_network(hh[1]) + lib.uwsgi_response_add_header( + wsgi_req, + ffi.new("char[]", header), + len(hh[0]), + ffi.new("char[]", value), + len(hh[1]), + ) + return writer + + if lib.uwsgi_parse_vars(wsgi_req): + return -1 + + # check dynamic + # check app_id + app_id = lib.uwsgi_get_app_id( + wsgi_req, wsgi_req.appid, wsgi_req.appid_len, lib.cffi_plugin.modifier1 + ) + if app_id == -1 and not lib.uwsgi.no_default_app and lib.uwsgi.default_app > -1: + # and default app modifier1 == our modifier1 + app_id = lib.uwsgi.default_app + wsgi_req.app_id = app_id + + app_mount = "" + # app_mount can be something while app_id is -1 + if wsgi_req.appid != ffi.NULL: + app_mount = ffi.string(wsgi_req.appid).decode("utf-8") + + # uwsgi app struct + wi = lib.uwsgi.workers[lib.uwsgi.mywid].apps[app_id] + wi.requests += 1 # we might wind up here more often than expected + app = wsgi_apps.get(app_id) + + # (see python wsgi_handlers.c) + + if wi.callable == ASGI_CALLABLE: + handle_asgi_request(wsgi_req, app) + return lib.UWSGI_OK + + environ = {} + iov = wsgi_req.hvec + for i in range(0, wsgi_req.var_cnt, 2): + environ[ + ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len).decode( + "latin1" + ) + ] = ffi.string( + ffi.cast("char*", iov[i + 1].iov_base), iov[i + 1].iov_len + ).decode( + "latin1" + ) + + # check bytes on environ... 
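+    # add the standard wsgi.* keys (PEP 3333) on top of the request variables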
+ environ["wsgi.version"] = (1, 0) + scheme = "http" + if "HTTPS" in environ: + if environ["HTTPS"] in ("on", "ON", "On", "1", "true", "TRUE", "True"): + scheme = "https" + environ["wsgi.url_scheme"] = environ.get("UWSGI_SCHEME", scheme) + environ["wsgi.input"] = WSGIinput(wsgi_req) + environ["wsgi.errors"] = sys.stderr + environ["wsgi.run_once"] = False + environ["wsgi.file_wrapper"] = lambda f, chunksize=0: WSGIfilewrapper( + wsgi_req, f, chunksize + ) + environ["wsgi.multithread"] = True + environ["wsgi.multiprocess"] = True + + environ["uwsgi.core"] = wsgi_req.async_id + environ["uwsgi.node"] = ffi.string(lib.uwsgi.hostname).decode("latin1") + + try: + response = app(environ, start_response) + except: + print_exc() + # can I get a 500? + # will also get here when a websocket closes + wsgi_req.async_force_again = 1 + return lib.UWSGI_AGAIN + + if type(response) is bytes: + writer(response) + else: + try: + if isinstance(response, WSGIfilewrapper): + response.sendfile() + else: + for chunk in response: + if isinstance(chunk, WSGIfilewrapper): + try: + chunk.sendfile() + finally: + chunk.close() + else: + writer(chunk) + finally: + if hasattr(response, "close"): + response.close() + + return lib.UWSGI_OK + + +def async_init(): + lib.uwsgi_register_loop(_ASYNCIO, lib.asyncio_loop) diff --git a/plugins/cffi/cffi_greenlets.py b/plugins/cffi/cffi_greenlets.py new file mode 100644 index 0000000000..35ebc5fdde --- /dev/null +++ b/plugins/cffi/cffi_greenlets.py @@ -0,0 +1,141 @@ +""" +Greenlets and continulets support; pick one. +""" + +from _uwsgi import ffi, lib + +# this is the dictionary of continulets (one per-core) +uwsgi_pypy_continulets = {} + + +def uwsgi_pypy_continulet_wrapper(cont): + lib.async_schedule_to_req_green() + + +@ffi.def_extern() +def uwsgi_pypy_continulet_schedule(): + id = lib.uwsgi.wsgi_req.async_id + modifier1 = lib.uwsgi.wsgi_req.uh.modifier1 + + # generate a new continulet + if not lib.uwsgi.wsgi_req.suspended: + from _continuation import continulet + + uwsgi_pypy_continulets[id] = continulet(uwsgi_pypy_continulet_wrapper) + lib.uwsgi.wsgi_req.suspended = 1 + + # this is called in the main stack + if lib.uwsgi.p[modifier1].suspend: + lib.uwsgi.p[modifier1].suspend(ffi.NULL) + + # let's switch + uwsgi_pypy_continulets[id].switch() + + # back to the main stack + if lib.uwsgi.p[modifier1].resume: + lib.uwsgi.p[modifier1].resume(ffi.NULL) + + +@ffi.def_extern() +def uwsgi_pypy_continulet_switch(wsgi_req): + id = wsgi_req.async_id + modifier1 = wsgi_req.uh.modifier1 + + # this is called in the current continulet + if lib.uwsgi.p[modifier1].suspend: + lib.uwsgi.p[modifier1].suspend(wsgi_req) + + uwsgi_pypy_continulets[id].switch() + + # back to the continulet + if lib.uwsgi.p[modifier1].resume: + lib.uwsgi.p[modifier1].resume(wsgi_req) + + # update current running continulet + lib.uwsgi.wsgi_req = wsgi_req + + +def uwsgi_pypy_setup_continulets(): + if getattr(lib.uwsgi, "async") < 1: # keyword + raise Exception("pypy continulets require async mode !!!") + lib.uwsgi.schedule_to_main = lib.uwsgi_pypy_continulet_switch + lib.uwsgi.schedule_to_req = lib.uwsgi_pypy_continulet_schedule + print("*** PyPy Continulets engine loaded ***") + + +# Greenlets support +# May prefer to continulets. 
+# Built in to pypy (built on top of continulets) + +uwsgi_pypy_greenlets = {} + + +def uwsgi_pypy_greenlet_wrapper(): + # async_schedule_to_req_green() calls the plugin's request() method in a loop + lib.async_schedule_to_req_green() + + +@ffi.def_extern() +def uwsgi_pypy_greenlet_current_wsgi_req(): + try: + return greenlet.getcurrent().uwsgi_wsgi_req + except AttributeError: + lib.uwsgi_log(b"[BUG] current_wsgi_req NOT FOUND!!!\n") + return lib.uwsgi.wsgi_req # called early once? + return ffi.NULL + + +@ffi.def_extern() +def uwsgi_pypy_greenlet_schedule_to_req(): + id = lib.uwsgi.wsgi_req.async_id + modifier1 = lib.uwsgi.wsgi_req.uh.modifier1 + + # generate a new greenlet + if not lib.uwsgi.wsgi_req.suspended: + uwsgi_pypy_greenlets[id] = greenlet.greenlet(uwsgi_pypy_greenlet_wrapper) + uwsgi_pypy_greenlets[id].uwsgi_wsgi_req = lib.uwsgi.wsgi_req + lib.uwsgi.wsgi_req.suspended = 1 + + # this is called in the main stack + if lib.uwsgi.p[modifier1].suspend: + lib.uwsgi.p[modifier1].suspend(ffi.NULL) + + # let's switch + uwsgi_pypy_greenlets[id].switch() + + # back to the main stack + if lib.uwsgi.p[modifier1].resume: + lib.uwsgi.p[modifier1].resume(ffi.NULL) + + +@ffi.def_extern() +def uwsgi_pypy_greenlet_schedule_to_main(wsgi_req): + wsgi_req.async_id + modifier1 = wsgi_req.uh.modifier1 + + # this is called in the current greenlet + # our cffi plugin doesn't define suspend / resume + if lib.uwsgi.p[modifier1].suspend: + lib.uwsgi.p[modifier1].suspend(wsgi_req) + + uwsgi_pypy_main_greenlet.switch() + + # back to the greenlet + if lib.uwsgi.p[modifier1].resume: + lib.uwsgi.p[modifier1].resume(wsgi_req) + + # update current running greenlet + lib.uwsgi.wsgi_req = wsgi_req + + +def uwsgi_cffi_setup_greenlets(): + global greenlet, uwsgi_pypy_main_greenlet + import greenlet + + if getattr(lib.uwsgi, "async") < 1: + raise Exception("pypy greenlets require async mode !!!") + + lib.uwsgi.schedule_to_main = lib.uwsgi_pypy_greenlet_schedule_to_main + lib.uwsgi.schedule_to_req = lib.uwsgi_pypy_greenlet_schedule_to_req + lib.uwsgi.current_wsgi_req = lib.uwsgi_pypy_greenlet_current_wsgi_req + uwsgi_pypy_main_greenlet = greenlet.getcurrent() diff --git a/plugins/cffi/cffi_init.py b/plugins/cffi/cffi_init.py new file mode 100644 index 0000000000..c6c0e65d97 --- /dev/null +++ b/plugins/cffi/cffi_init.py @@ -0,0 +1,506 @@ +""" +cffi embedding API based Python plugin. +Should work with both CPython and PyPy 3. + + +Based on PyPy plugin by Maciej Fijalkowski. +""" + +import os +import sys +import site + +# add expected __main__ module +sys.modules["__main__"] = type(sys)("__main__") + +from cffi_plugin import ffi, lib + +# set desired virtualenv (may only work on Python 3?) 
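+# If --cffi-home points at a virtualenv, drop inherited site-packages
+# entries from sys.path, point sys.executable at the virtualenv's
+# interpreter and re-run site.main() so its packages are picked up.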
+if lib.ucffi.home != ffi.NULL: + sys.path = [entry for entry in sys.path if not "site-packages" in entry] + sys.executable = os.path.join( + ffi.string(lib.ucffi.home).decode("utf-8"), "bin", "python" + ) + site.main() + +import imp +import importlib +import inspect +import types + + +class UwsgiModule(types.ModuleType): + pass + + +_uwsgi = UwsgiModule("_uwsgi") +_uwsgi.lib = lib +_uwsgi.ffi = ffi +_uwsgi._applications = {} +# predictable name +sys.modules["_uwsgi"] = _uwsgi + +wsgi_apps = _uwsgi._applications + + +def print_exc(): + import traceback + + traceback.print_exc() + + +def to_network(native): + return native.encode("latin1") + + +@ffi.def_extern() +def uwsgi_cffi_init(): + global wsgi_apps + + # pypy will find environment from current working directory + # (uwsgi --chdir $VIRTUAL_ENV/bin) + if "PYTHONPATH" in os.environ: + sys.path[0:0] = os.environ["PYTHONPATH"].split(os.pathsep) + + # define or override callbacks? + if lib.ucffi.init: + init_name = ffi.string(lib.ucffi.init).decode("utf-8") + importlib.import_module(init_name) + + return lib.UWSGI_OK + + +@ffi.def_extern() +def uwsgi_cffi_init_apps(): + """ + (The --mount= syntax is more general.) + """ + try: + if lib.ucffi.wsgi: + init_app(ffi.string(lib.ucffi.wsgi), b"") + except: + print_exc() + + +class WSGIfilewrapper(object): + """ + class implementing wsgi.file_wrapper + """ + + def __init__(self, wsgi_req, f, chunksize=0): + self.wsgi_req = wsgi_req + self.f = f + self.chunksize = chunksize + if hasattr(f, "close"): + self.close = f.close + + def __iter__(self): + return self + + def __next__(self): + if self.chunksize: + data = self.f.read(self.chunksize) + else: + data = self.f.read() + if data: + return data + raise StopIteration() + + next = __next__ + + def sendfile(self): + if hasattr(self.f, "fileno"): + lib.uwsgi_response_sendfile_do_can_close( + self.wsgi_req, self.f.fileno(), 0, 0, 0 + ) + elif hasattr(self.f, "read"): + if self.chunksize == 0: + chunk = self.f.read() + if len(chunk) > 0: + lib.uwsgi_response_write_body_do( + self.wsgi_req, ffi.new("char[]", chunk), len(chunk) + ) + return + while True: + chunk = self.f.read(self.chunksize) + if chunk is None or len(chunk) == 0: + break + lib.uwsgi_response_write_body_do( + self.wsgi_req, ffi.new("char[]", chunk), len(chunk) + ) + + +_uwsgi.WSGIfilewrapper = WSGIfilewrapper + + +class WSGIinput(object): + """ + class implementing wsgi.input + """ + + def __init__(self, wsgi_req): + self.wsgi_req = wsgi_req + + def read(self, size=0): + rlen = ffi.new("ssize_t*") + chunk = lib.uwsgi_request_body_read(self.wsgi_req, size, rlen) + if chunk != ffi.NULL: + return ffi.buffer(chunk, rlen[0])[:] + if rlen[0] < 0: + raise IOError("error reading wsgi.input") + raise IOError("error waiting for wsgi.input") + + def getline(self, hint=0): + rlen = ffi.new("ssize_t*") + chunk = lib.uwsgi_request_body_readline(self.wsgi_req, hint, rlen) + if chunk != ffi.NULL: + return ffi.buffer(chunk, rlen[0])[:] + if rlen[0] < 0: + raise IOError("error reading line from wsgi.input") + raise IOError("error waiting for line on wsgi.input") + + def readline(self, hint=0): + return self.getline(hint) + + def readlines(self, hint=0): + lines = [] + while True: + chunk = self.getline(hint) + if len(chunk) == 0: + break + lines.append(chunk) + return lines + + def __iter__(self): + return self + + def __next__(self): + chunk = self.getline() + if len(chunk) == 0: + raise StopIteration + return chunk + + +_uwsgi.WSGIinput = WSGIinput + + +@ffi.def_extern() +def uwsgi_cffi_request(wsgi_req): 
+ """ + the WSGI request handler + """ + + if wsgi_req.async_force_again: + wsgi_req.async_force_again = 0 + # just close it + # it would be possible to continue with the response iterator here + return lib.UWSGI_OK + + def writer(data): + lib.uwsgi_response_write_body_do(wsgi_req, ffi.new("char[]", data), len(data)) + + def start_response(status, headers, exc_info=None): + if exc_info: + traceback.print_exception(*exc_info) + status = to_network(status) + lib.uwsgi_response_prepare_headers( + wsgi_req, ffi.new("char[]", status), len(status) + ) + for hh in headers: + header, value = to_network(hh[0]), to_network(hh[1]) + lib.uwsgi_response_add_header( + wsgi_req, + ffi.new("char[]", header), + len(hh[0]), + ffi.new("char[]", value), + len(hh[1]), + ) + return writer + + if lib.uwsgi_parse_vars(wsgi_req): + return -1 + + # check dynamic + # check app_id + app_id = lib.uwsgi_get_app_id( + wsgi_req, wsgi_req.appid, wsgi_req.appid_len, lib.cffi_plugin.modifier1 + ) + if app_id == -1 and not lib.uwsgi.no_default_app and lib.uwsgi.default_app > -1: + # and default app modifier1 == our modifier1 + app_id = lib.uwsgi.default_app + wsgi_req.app_id = app_id + app_mount = "" + # app_mount can be something while app_id is -1 + if wsgi_req.appid != ffi.NULL: + app_mount = ffi.string(wsgi_req.appid).decode("utf-8") + # uwsgi app struct + wi = lib.uwsgi.workers[lib.uwsgi.mywid].apps[app_id] + wi.requests += 1 # we might wind up here more often than expected + app = wsgi_apps.get(app_id) + + # (see python wsgi_handlers.c) + + environ = {} + iov = wsgi_req.hvec + for i in range(0, wsgi_req.var_cnt, 2): + environ[ + ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len).decode( + "latin1" + ) + ] = ffi.string( + ffi.cast("char*", iov[i + 1].iov_base), iov[i + 1].iov_len + ).decode( + "latin1" + ) + + # check bytes on environ... 
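+    # Build the PEP 3333 environ for the application.  A minimal app this
+    # handler can serve looks like (illustrative sketch, not part of the
+    # plugin):
+    #
+    #   def application(environ, start_response):
+    #       start_response("200 OK", [("Content-Type", "text/plain")])
+    #       return [b"Hello from the cffi plugin"]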
+ environ["wsgi.version"] = (1, 0) + scheme = "http" + if "HTTPS" in environ: + if environ["HTTPS"] in ("on", "ON", "On", "1", "true", "TRUE", "True"): + scheme = "https" + environ["wsgi.url_scheme"] = environ.get("UWSGI_SCHEME", scheme) + environ["wsgi.input"] = WSGIinput(wsgi_req) + environ["wsgi.errors"] = sys.stderr + environ["wsgi.run_once"] = False + environ["wsgi.file_wrapper"] = lambda f, chunksize=0: WSGIfilewrapper( + wsgi_req, f, chunksize + ) + environ["wsgi.multithread"] = True + environ["wsgi.multiprocess"] = True + + environ["uwsgi.core"] = wsgi_req.async_id + environ["uwsgi.node"] = ffi.string(lib.uwsgi.hostname).decode("latin1") + + try: + response = app(environ, start_response) + except: + print_exc() + wsgi_req.async_force_again = 1 + return lib.UWSGI_AGAIN + + if type(response) is bytes: + writer(response) + else: + try: + if isinstance(response, WSGIfilewrapper): + response.sendfile() + else: + for chunk in response: + if isinstance(chunk, WSGIfilewrapper): + try: + chunk.sendfile() + finally: + chunk.close() + else: + writer(chunk) + finally: + if hasattr(response, "close"): + response.close() + + return lib.UWSGI_OK + + +@ffi.def_extern() +def uwsgi_cffi_after_request(wsgi_req): + lib.log_request(wsgi_req) + + +def uwsgi_foreach(usl): + # define uwsgi_foreach(x, y) for(x=y;x;x = x->next) + while usl != ffi.NULL: + yield usl + usl = usl.next + + +def execfile(path): + with open(path) as py: + code = compile(py.read(), path, "exec") + exec(code, globals(), {}) + + +def eval_exec(to_eval, to_exec): + for usl in uwsgi_foreach(to_eval): + code = compile(ffi.string(usl.value), "", "exec") + exec(code, globals(), {}) + + for usl in uwsgi_foreach(to_exec): + execfile(ffi.string(usl.value)) + + +@ffi.def_extern() +def uwsgi_cffi_preinit_apps(): + eval_exec(lib.ucffi.eval, lib.ucffi.exec) + + +@ffi.def_extern() +def uwsgi_cffi_post_fork(): + """ + .post_fork_hook + """ + eval_exec(lib.ucffi.eval_post_fork, lib.ucffi.exec_post_fork) + + import uwsgi + + if hasattr(uwsgi, "post_fork_hook"): + uwsgi.post_fork_hook() + + +def uwsgi_apps_cnt(): + # we init in worker 0 and then serve in worker n + return lib.uwsgi.workers[lib.uwsgi.mywid].apps_cnt + + +def uwsgi_apps(): + return lib.uwsgi.workers[lib.uwsgi.mywid].apps + + +def iscoroutine(app): + """ + Could app be ASGI? + """ + return inspect.iscoroutinefunction(app) or inspect.iscoroutinefunction( + getattr(app, "__call__") + ) + + +def init_app(app, mountpoint): + + id = uwsgi_apps_cnt() + + if id >= lib.uwsgi.max_apps: + lib.uwsgi_log( + b"ERROR: you cannot load more than %d apps in a worker\n" % uwsgi.max_apps + ) + return -1 + + if len(mountpoint) > 0xFF - 1: + # original uses prefix for very long mountpoints + lib.uwsgi_log(b"ERROR: mountpoint must be shorter than %d bytes\n" % 0xFF - 1) + + if lib.uwsgi_get_app_id(ffi.NULL, mountpoint, len(mountpoint), -1) != -1: + lib.uwsgi_log(b"mountpoint %s already configured. skip.\n" % mountpoint) + return -1 + + now = lib.uwsgi_now() + + if lib.uwsgi.default_app == -1 and not lib.uwsgi.no_default_app: + lib.uwsgi.default_app = id + + wi = uwsgi_apps()[id] # zero out wi? 
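+    # fill in the per-worker uwsgi_app slot; the Python callable itself is
+    # kept in wsgi_apps[id], wi.callable only acts as a non-NULL marker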
+ + wi.modifier1 = lib.cffi_plugin.modifier1 + wi.mountpoint_len = len(mountpoint) + ffi.memmove(wi.mountpoint, mountpoint, len(mountpoint)) + + # original does dynamic chdir + # cffi is always in "single interpreter" mode + application = app.decode("utf-8") + + if application.endswith((".wsgi", ".py")): + # application.py / application.wsgi + wsgi_apps[id] = uwsgi_file_loader(application) + else: + # importable:callable syntax + wsgi_apps[id] = uwsgi_pypy_loader(application) + + # callable has to be not NULL for uwsgi_get_app_id: + app_type = "WSGI" + wi.callable = ffi.cast("void *", 1) + if iscoroutine(wsgi_apps[id]): + app_type = "ASGI" + wi.callable = ffi.cast("void *", 2) + wi.started_at = now + wi.startup_time = lib.uwsgi_now() - now + + lib.uwsgi_log( + ( + "%s app %d (mountpoint='%s') ready in %d seconds\n" + % (app_type, id, mountpoint.decode("utf-8"), wi.startup_time) + ).encode("utf-8") + ) + + # log if error + lib.uwsgi_cffi_more_apps() + + # TODO if uwsgi_apps[id] is a dict, deal with multiple applications... + + # copies wi to other workers if wid = 0 + lib.uwsgi_emulate_cow_for_apps(id) + + return id + + +@ffi.def_extern() +def uwsgi_cffi_mount_app(mountpoint, app): + """ + Handle the versatile --mount = flag + """ + try: + app_id = init_app(ffi.string(app), ffi.string(mountpoint)) + return app_id + except: + print_exc() + return -1 + + +@ffi.def_extern() +def uwsgi_cffi_enable_threads(): + pass + + +@ffi.def_extern() +def uwsgi_cffi_init_thread(): + pass + + +@ffi.def_extern() +def uwsgi_cffi_signal_handler(sig, handler): + ffi.from_handle(handler)(sig) + return 0 + + +@ffi.def_extern() +def uwsgi_cffi_mule(opt): + """ + From the docs: + As mentioned before, mules can be programmed. + To give custom logic to a mule, give the mule + option a path to a script (it must end in ".py") + or a "package.module:callable" value. + """ + opt = ffi.string(opt).decode("latin1") + if opt.endswith(".py"): + execfile(opt) + else: + uwsgi_pypy_loader(opt)() + + +@ffi.def_extern() +def uwsgi_cffi_rpc(func, argc, argv, argvs, buffer): + return ffi.from_handle(func)(argc, argv, argvs, buffer) + + +# +# Non-callback section +# + + +def uwsgi_pypy_loader(m): + """ + load a wsgi module + """ + c = "application" + if ":" in m: + m, c = m.split(":") + mod = importlib.import_module(m) + return getattr(mod, c) + + +def uwsgi_file_loader(path): + """ + load a .wsgi or .py file from path + """ + c = "application" + mod = imp.load_source("uwsgi_file_wsgi", path) + return getattr(mod, c) diff --git a/plugins/cffi/cffi_plugin.py b/plugins/cffi/cffi_plugin.py new file mode 100644 index 0000000000..d4d2294c1b --- /dev/null +++ b/plugins/cffi/cffi_plugin.py @@ -0,0 +1,130 @@ +# based on the example and pypy plugins + +import cffi +import build_bundle +import io + +ffibuilder = cffi.FFI() + +# cdef() exposes uwsgi functions to Python +ffibuilder.cdef(open("types.h").read()) +ffibuilder.cdef(open("_constants.h").read()) +ffibuilder.cdef(open("_uwsgi.h").read()) + +plugin_data = """ +static struct uwsgi_cffi { + char *wsgi; + char *init; + char *home; + + struct uwsgi_string_list *eval; + struct uwsgi_string_list *eval_post_fork; + struct uwsgi_string_list *exec; + struct uwsgi_string_list *exec_post_fork; +} ucffi; +""" + +# defined by our plugin +ffibuilder.cdef( + plugin_data + + """ +void uwsgi_cffi_more_apps(); + +// libc functions +ssize_t read (int, void *, size_t); +ssize_t write (int, const void *, size_t); +void free(void *); +""" + # For cffi_asyncio. + # Bound to Python code with @ffi.def_extern(). 
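+    # cffi generates a C stub for each extern "Python" declaration and
+    # routes calls to the matching @ffi.def_extern() Python function.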
+ # Also OK to leave unbound & uncalled. + + """ +extern "Python" static void uwsgi_pypy_continulet_schedule(void); +extern "Python" static void uwsgi_pypy_continulet_switch(struct wsgi_request *); +extern "Python" static void uwsgi_pypy_greenlet_schedule_to_req(void); +extern "Python" static void uwsgi_pypy_greenlet_schedule_to_main(struct wsgi_request *); +extern "Python" static struct wsgi_request * uwsgi_pypy_greenlet_current_wsgi_req(); +extern "Python" static int uwsgi_asyncio_wait_read_hook(int fd, int timeout); +extern "Python" static int uwsgi_asyncio_wait_write_hook(int fd, int timeout); +extern "Python" static void uwsgi_asyncio_schedule_fix(struct wsgi_request *); +extern "Python" static void asyncio_loop(void); +""" +) + +# embedding_api() exposes Python functions to uwsgi. +# Similar to extern "Python", but referenced by our own C set_source() +# as well as our Python code: +exposed_to_uwsgi = """ +extern struct uwsgi_server uwsgi; +extern struct uwsgi_plugin cffi_plugin; + +static int uwsgi_cffi_init(); +static void uwsgi_cffi_preinit_apps(); +static void uwsgi_cffi_init_apps(); +static int uwsgi_cffi_request(struct wsgi_request *wsgi_req); +static void uwsgi_cffi_after_request(struct wsgi_request *wsgi_req); +static void uwsgi_cffi_onload(); + +static uint64_t uwsgi_cffi_rpc(void *, uint8_t, char **, uint16_t *, char **); +static void uwsgi_cffi_post_fork(); +static void uwsgi_cffi_enable_threads(); +static void uwsgi_cffi_init_thread(); +static int uwsgi_cffi_mule(char *opt); +static int uwsgi_cffi_mule_msg(char *message, size_t len); +static int uwsgi_cffi_signal_handler(uint8_t sig, void *handler); + +static int uwsgi_cffi_mount_app(char *, char *); +""" +ffibuilder.embedding_api(exposed_to_uwsgi) + +init_code = io.StringIO() +build_bundle.bundler(init_code) +ffibuilder.embedding_init_code(init_code.getvalue()) + +ffibuilder.set_source( + "cffi_plugin", + """ +#include +""" + + plugin_data + + exposed_to_uwsgi + + """ + +extern void uwsgi_cffi_more_apps() { + uwsgi_apps_cnt++; +} + +static struct uwsgi_option uwsgi_cffi_options[] = { + {"cffi-home", required_argument, 0, "set PYTHONHOME/virtualenv", uwsgi_opt_set_str, &ucffi.home, 0}, + {"cffi-wsgi", required_argument, 0, "load a WSGI module (or use --mount instead)", uwsgi_opt_set_str, &ucffi.wsgi, 0}, + {"cffi-init", required_argument, 0, "load a module during init", uwsgi_opt_set_str, &ucffi.init, 0}, + {"cffi-eval", required_argument, 0, "evaluate Python code before fork()", uwsgi_opt_add_string_list, &ucffi.eval, 0}, + {"cffi-eval-post-fork", required_argument, 0, "evaluate Python code soon after fork()", uwsgi_opt_add_string_list, &ucffi.eval_post_fork, 0}, + {"cffi-exec", required_argument, 0, "execute Python code from file before fork()", uwsgi_opt_add_string_list, &ucffi.exec, 0}, + {"cffi-exec-post-fork", required_argument, 0, "execute Python code from file soon after fork()", uwsgi_opt_add_string_list, &ucffi.exec_post_fork, 0}, + {0, 0, 0, 0, 0, 0, 0}, +}; + +CFFI_DLLEXPORT struct uwsgi_plugin cffi_plugin = { + .name = "cffi", + .modifier1 = 0, + .init = uwsgi_cffi_init, + .request = uwsgi_cffi_request, + .after_request = uwsgi_cffi_after_request, + .options = uwsgi_cffi_options, + .preinit_apps = uwsgi_cffi_preinit_apps, + .init_apps = uwsgi_cffi_init_apps, + .init_thread = uwsgi_cffi_init_thread, + .signal_handler = uwsgi_cffi_signal_handler, + .mount_app = uwsgi_cffi_mount_app, + .enable_threads = uwsgi_cffi_enable_threads, + .rpc = uwsgi_cffi_rpc, + .post_fork = uwsgi_cffi_post_fork, + .mule = 
uwsgi_cffi_mule, + .mule_msg = uwsgi_cffi_mule_msg, +}; +""", +) + +if __name__ == "__main__": + ffibuilder.emit_c_code("cffi_plugin.c") diff --git a/plugins/cffi/cffi_setup_asyncio.py b/plugins/cffi/cffi_setup_asyncio.py new file mode 100644 index 0000000000..677b8077df --- /dev/null +++ b/plugins/cffi/cffi_setup_asyncio.py @@ -0,0 +1,6 @@ +import cffi_greenlets +import cffi_asyncio + +cffi_asyncio.setup_asyncio(32) # is this the same as the --async option? +cffi_greenlets.uwsgi_cffi_setup_greenlets() +cffi_asyncio.async_init() # should this happen in a postfork hook or at a different phase of config? diff --git a/plugins/cffi/cffi_setup_trio.py b/plugins/cffi/cffi_setup_trio.py new file mode 100644 index 0000000000..f308c676c6 --- /dev/null +++ b/plugins/cffi/cffi_setup_trio.py @@ -0,0 +1,7 @@ +import cffi_greenlets +import cffi_trio + +# the order of these steps is important +cffi_trio.setup_trio(32) # is this the same as the --async option? +cffi_greenlets.uwsgi_cffi_setup_greenlets() +cffi_trio.trio_init() diff --git a/plugins/cffi/cffi_trio.py b/plugins/cffi/cffi_trio.py new file mode 100644 index 0000000000..3a8d7b5525 --- /dev/null +++ b/plugins/cffi/cffi_trio.py @@ -0,0 +1,637 @@ +""" +Direct port of plugins/asyncio.c +""" + +import os +import sys +import trio +import inspect +import greenlet + +from cffi_asgi import asgi_scope_http, asgi_start_response, websocket_handler +from _uwsgi import ffi, lib, _applications, WSGIfilewrapper, WSGIinput + +uwsgi = lib.uwsgi +wsgi_apps = _applications + + +def get_loop(): + return asyncio.get_event_loop() + + +# testing +def request_id(): + return ( + lib.uwsgi.workers[lib.uwsgi.mywid].cores[lib.uwsgi.wsgi_req.async_id].requests + ) + + +# keep alive +_TRIO = ffi.new("char[]", "trio".encode("utf-8")) + + +# Timeouts. Check periodically with just one task. +# from the Python documentation +from heapq import heappush, heappop +import itertools + +pq = [] # list of entries arranged in a heap +entry_finder = {} # mapping of tasks to entries +REMOVED = "" # placeholder for a removed task +counter = itertools.count() # unique sequence count + + +def add_task(task, priority=0): + "Add a new task or update the priority of an existing task" + if task in entry_finder: + remove_task(task) + count = next(counter) + entry = [priority, count, task] + entry_finder[task] = entry + heappush(pq, entry) + + +def remove_task(task): + "Mark an existing task as REMOVED. Raise KeyError if not found." + entry = entry_finder.pop(task) + entry[-1] = REMOVED + + +def pop_task(): + "Remove and return the lowest priority task. Raise KeyError if empty." 
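+    # lazy deletion: skip entries that remove_task() marked as REMOVED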
+ while pq: + priority, count, task = heappop(pq) + if task is not REMOVED: + del entry_finder[task] + return task + raise KeyError("pop from an empty priority queue") + + +# there are two kinds of timeouts, a hook timeout which +# marks the request as timed out and a socket timeout which +# loops into the request handler with timed_out = True +class Timeout: + def __init__(self, wsgi_req, deadline): + self.wsgi_req = wsgi_req + # don't fire timeout if wsgi_req was recycled + self.request_id = (wsgi_req.async_id, request_id()) + self.deadline = deadline + + def fire(self, now): + if self.deadline > now: + add_task(self, priority=self.deadline) + else: + print("timeout!") + uwsgi_trio_request(wsgi_req, True) + + +class HookTimeout(Timeout): + def fire(self, now): + if self.deadline > now: + add_task(self, priority=self.deadline) + else: + print("hook timeout!") + uwsgi_asyncio_hook_timeout(wsgi_req) + + +async def timeout_task(): + """ + Check timeouts periodically + """ + while True: + await trio.sleep(1) + now = lib.uwsgi_now() + while True: + try: + task = pop_task() + except KeyError: + break + # will reschedule self if needed + task.fire(now) + + +timeout_handles = {} + + +def timeout_after(wsgi_req, seconds): + return + deadline = lib.uwsgi_now() + seconds + timeout = Timeout(wsgi_req, deadline) + timeout_handles[wsgi_req.async_id] = timeout + add_task(timeout, deadline) + + +def hook_timeout_after(wsgi_req, seconds): + deadline = lib.uwsgi_now() + seconds + timeout = HookTimeout(wsgi_req, deadline) + timeout_handles[wsgi_req.async_id] = timeout + add_task(timeout, deadline) + + +def timeout_cancel(wsgi_req): + return + handle = timeout_handles.pop(wsgi_req.async_id) + remove_task(handle) + + +def print_exc(): + import traceback + + traceback.print_exc() + + +def free_req_queue(wsgi_req): + """ + A #define that uses wsgi_req from context. + #define free_req_queue uwsgi.async_queue_unused_ptr++; uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = wsgi_req + """ + lib.uwsgi.async_queue_unused_ptr = lib.uwsgi.async_queue_unused_ptr + 1 + lib.uwsgi.async_queue_unused[lib.uwsgi.async_queue_unused_ptr] = wsgi_req + + +@ffi.def_extern() +def uwsgi_asyncio_wait_read_hook(fd, timeout): + print("enter read hook") + wsgi_req = uwsgi.current_wsgi_req() + # TODO + # loop = get_loop() + # loop.add_reader(fd, uwsgi_asyncio_hook_fd, wsgi_req) + try: + hook_timeout_after(wsgi_req, timeout) + try: + + # to loop + if uwsgi.schedule_to_main: + print("wait_read_hook", fd) + uwsgi.schedule_to_main(wsgi_req) + else: + print("no read hook", uwsgi.schedule_to_main) + # from loop + finally: + timeout_cancel(wsgi_req) + + if wsgi_req.async_timed_out: + return 0 + + return 1 + + except: + print_exc() + return -1 + + finally: + # returns False if not added + loop.remove_reader(fd) + + +@ffi.def_extern() +def uwsgi_asyncio_wait_write_hook(fd, timeout): + print("enter write hook") + wsgi_req = uwsgi.current_wsgi_req() + # TODO + # loop = get_loop() + # loop.add_writer(fd, uwsgi_asyncio_hook_fd, wsgi_req) + try: + hook_timeout_after(wsgi_req, timeout) + try: + # to loop + if uwsgi.schedule_to_main: + print("wait_write_hook", fd) + uwsgi.schedule_to_main(wsgi_req) + # from loop + + finally: + timeout_cancel(wsgi_req) + + if wsgi_req.async_timed_out: + return 0 + + return 1 + + except: + print_exc() + return -1 + + finally: + loop.remove_writer(fd) + + +def uwsgi_trio_request(wsgi_req, timed_out) -> int: + """ + Handle a request at the event-loop level. + The application-level request handler is on another level. 
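+    It drives the uwsgi protocol parser (wsgi_req.socket.proto) and, once a
+    request has been fully read, hands it to the per-request green thread
+    via uwsgi.schedule_to_req().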
+ """ + status = 0 + try: + uwsgi.wsgi_req = wsgi_req + + if timed_out > 0: + raise IOError("timed out") # goto end + + # already cancelled if timed_out (probably) + timeout_cancel(wsgi_req) + + status = wsgi_req.socket.proto(wsgi_req) + + if status > 0: + timeout_after(wsgi_req, uwsgi.socket_timeout) + return status + + else: + + if status == 0: + # we call this two time... overengineering :( + uwsgi.async_proto_fd_table[wsgi_req.fd] = ffi.NULL + uwsgi.schedule_to_req() + # print("again b", request_id()) + return status + + except IOError: + # (stop reading from the fd - not needed in trio) + print_exc() + + except: + print_exc() + + # end + uwsgi.async_proto_fd_table[wsgi_req.fd] = ffi.NULL + lib.uwsgi_close_request(uwsgi.wsgi_req) + free_req_queue(wsgi_req) + + return status + + +_nurseries = {} + + +def uwsgi_trio_accept(uwsgi_sock, nursery): + + uwsgi = lib.uwsgi + wsgi_req = lib.find_first_available_wsgi_req() + if wsgi_req == ffi.NULL: + lib.uwsgi_async_queue_is_full(lib.uwsgi_now()) + return None + + # we should possibly be in the per-request greenthread here + uwsgi.wsgi_req = wsgi_req + lib.wsgi_req_setup(wsgi_req, wsgi_req.async_id, uwsgi_sock) + uwsgi.workers[uwsgi.mywid].cores[wsgi_req.async_id].in_request = 1 + _nurseries[wsgi_req.async_id] = nursery + + if lib.wsgi_req_simple_accept(wsgi_req, uwsgi_sock.fd): + uwsgi.workers[uwsgi.mywid].cores[wsgi_req.async_id].in_request = 0 + free_req_queue(wsgi_req) + return None + + wsgi_req.start_of_request = lib.uwsgi_micros() + wsgi_req.start_of_request_in_sec = wsgi_req.start_of_request // 1000000 + + # enter harakiri mode + if uwsgi.harakiri_options.workers > 0: + lib.set_harakiri(wsgi_req, uwsgi.harakiri_options.workers) + + uwsgi.async_proto_fd_table[wsgi_req.fd] = wsgi_req + + nursery.start_soon(request_task, wsgi_req.fd, uwsgi_trio_request, wsgi_req, False) + + timeout_after(wsgi_req, uwsgi.socket_timeout) + + +def uwsgi_asyncio_hook_fd(wsgi_req): + uwsgi = lib.uwsgi + print("hook fd", wsgi_req.fd) + uwsgi.wsgi_req = wsgi_req + uwsgi.schedule_to_req() + + +def uwsgi_asyncio_hook_timeout(wsgi_req): + uwsgi = lib.uwsgi + print("timeout hook", wsgi_req.fd) + uwssgi.wsgi_req = wsgi_req + uwsgi.wsgi_req.async_timed_out = 1 + uwsgi.schedule_to_req() + + +def uwsgi_asyncio_hook_fix(wsgi_req): + print("fix hook", wsgi_req.fd) + uwsgi.wsgi_req = wsgi_req + uwsgi.schedule_to_req() + + +@ffi.def_extern() +def uwsgi_asyncio_schedule_fix(wsgi_req): + if wsgi_req: + print("fix schedule", wsgi_req.fd) + print("schedule_fix") + loop = get_loop() + loop.call_soon(uwsgi_asyncio_hook_fix, wsgi_req) + + +async def request_task(fd, callback, *args): + while True: + await trio.lowlevel.wait_readable(fd) + status = callback(*args) + if status <= 0: + trio.lowlevel.notify_closing(fd) + break + + +async def reader_task(fd, callback, *args): + while True: + await trio.lowlevel.wait_readable(fd) + callback(*args) + + +async def writer_task(fd, callable, *args): + while True: + await trio.lowlevel.wait_writable(fd) + callback(*args) + + +# all of our trio stuff should be running in the main thread. +# When a socket becomes readable, we will kick off synchronous +# work in the per-request greenthread. Then if it's an ASGI +# application the per-request greenthread queues up another +# task to run on the main greenthread. Maybe we set wsgi_req +# as a ContextVar too. +# Several switches between C and Python (and any other uWSGI +# plugin we might happen to be running) depending. 
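+# server() keeps everything in one nursery: a periodic timeout_task plus a
+# reader_task per listening socket that feeds accepted connections into
+# uwsgi_trio_accept().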
+async def server(sockets): + global _trio_token + _trio_token = trio.lowlevel.current_trio_token() + async with trio.open_nursery() as nursery: + nursery.start_soon(timeout_task) + for uwsgi_sock in sockets: + nursery.start_soon( + reader_task, uwsgi_sock.fd, uwsgi_trio_accept, uwsgi_sock, nursery + ) + + +@ffi.def_extern() +def asyncio_loop(): # name defined in cffi C header + """ + Set up trio. + """ + + if not uwsgi.has_threads and uwsgi.mywid == 1: + print( + "!!! Running trio without threads IS NOT recommended, enable " + "them with --enable-threads !!!\n" + ) + + if uwsgi.socket_timeout < 30: + print( + "!!! Running trio with a socket-timeout lower than 30 seconds " + "is not recommended, tune it with --socket-timeout !!!\n" + ) + + if not uwsgi.async_waiting_fd_table: + uwsgi.async_waiting_fd_table = lib.uwsgi_calloc( + ffi.sizeof("struct wsgi_request *") * uwsgi.max_fd + ) + if not uwsgi.async_proto_fd_table: + uwsgi.async_proto_fd_table = lib.uwsgi_calloc( + ffi.sizeof("struct wsgi_request *") * uwsgi.max_fd + ) + + uwsgi.wait_write_hook = lib.uwsgi_asyncio_wait_write_hook + uwsgi.wait_read_hook = lib.uwsgi_asyncio_wait_read_hook + + assert lib.uwsgi.wait_write_hook == lib.uwsgi_asyncio_wait_write_hook + + if getattr(uwsgi, "async") < 1: # keyword + print("the async loop engine requires async mode (--async )\n") + raise SystemExit(1) + + if not uwsgi.schedule_to_main: + print( + "*** DANGER *** async mode without coroutine/greenthread engine loaded !!!\n" + ) + + # call uwsgi_cffi_setup_greenlets() first: + if not uwsgi.schedule_to_req: + uwsgi.schedule_to_req = lib.async_schedule_to_req + else: + uwsgi.schedule_fix = lib.uwsgi_asyncio_schedule_fix + + sockets = [] + uwsgi_sock = uwsgi.sockets + while uwsgi_sock != ffi.NULL: + sockets.append(uwsgi_sock) + uwsgi_sock = uwsgi_sock.next + + trio.run(server, sockets) + + +def handle_asgi_request(wsgi_req, app): + """ + Handle asgi request, with synchronous code, in the per-request greenlet. + """ + scope = asgi_scope_http(wsgi_req) + gc = greenlet.getcurrent() + + async def _send(event): + gc.switch(event) + + if scope["type"] == "websocket": + + async def _ready(): + # send ping / pong (keepalive) by reading even though it isn't readable + with trio.move_on_after(lib.uwsgi.socket_timeout - 5): + await trio.lowlevel.wait_readable(wsgi_req.fd) + + send, receive = websocket_handler(wsgi_req, _send, _ready) + + elif scope["type"] == "http": + send = _send + + async def receive(): + return {"type": "http.request"} + + async def app_starter(): + await app(scope, receive, send) + + def create_app_task(): + nursery = _nurseries[wsgi_req.async_id] + nursery.start_soon(app_starter) + + _trio_token.run_sync_soon(create_app_task) + + while True: + event = gc.parent.switch() + if event["type"] == "http.response.start": + # raw uwsgi function accepts bytes + asgi_start_response(wsgi_req, event["status"], event["headers"]) + elif event["type"] == "http.response.body": + data = event["body"] + lib.uwsgi_response_write_body_do( + wsgi_req, ffi.new("char[]", data), len(data) + ) + if not event.get("more_body"): + break + elif event["type"] == "websocket.close": + break + else: + print("loop event", event) + + return lib.UWSGI_OK + + +def to_network(native): + return native.encode("latin1") + + +# Instead of using a C function pointer to call our Python function, +# we store 1 for WSGI or 2 for ASGI. 
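+# The marker is compared against wi.callable in _uwsgi_cffi_request() to
+# route a request to handle_asgi_request() instead of the WSGI path.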
+ASGI_CALLABLE = ffi.cast("void *", 2) + + +@ffi.def_extern() +def uwsgi_cffi_request(wsgi_req): + try: + return _uwsgi_cffi_request(wsgi_req) + except: + print_exc() + return lib.UWSGI_OK + + +def _uwsgi_cffi_request(wsgi_req): + """ + the WSGI request handler + """ + + if wsgi_req.async_force_again: + print("force again") + wsgi_req.async_force_again = 0 + # just close it + try: + ob_timeout = get_ob_timeout(wsgi_req) + ob_timeout.cancel() + except KeyError: + pass + asyncio.get_event_loop().remove_reader(wsgi_req.fd) + return lib.UWSGI_OK + + def writer(data): + lib.uwsgi_response_write_body_do(wsgi_req, ffi.new("char[]", data), len(data)) + + def start_response(status, headers, exc_info=None): + if exc_info: + traceback.print_exception(*exc_info) + status = to_network(status) + lib.uwsgi_response_prepare_headers( + wsgi_req, ffi.new("char[]", status), len(status) + ) + for hh in headers: + header, value = to_network(hh[0]), to_network(hh[1]) + lib.uwsgi_response_add_header( + wsgi_req, + ffi.new("char[]", header), + len(hh[0]), + ffi.new("char[]", value), + len(hh[1]), + ) + return writer + + if lib.uwsgi_parse_vars(wsgi_req): + return -1 + + # check dynamic + # check app_id + app_id = lib.uwsgi_get_app_id( + wsgi_req, wsgi_req.appid, wsgi_req.appid_len, lib.cffi_plugin.modifier1 + ) + if app_id == -1 and not lib.uwsgi.no_default_app and lib.uwsgi.default_app > -1: + # and default app modifier1 == our modifier1 + app_id = lib.uwsgi.default_app + wsgi_req.app_id = app_id + + app_mount = "" + # app_mount can be something while app_id is -1 + if wsgi_req.appid != ffi.NULL: + app_mount = ffi.string(wsgi_req.appid).decode("utf-8") + + # uwsgi app struct + wi = lib.uwsgi.workers[lib.uwsgi.mywid].apps[app_id] + wi.requests += 1 # we might wind up here more often than expected + app = wsgi_apps.get(app_id) + + # (see python wsgi_handlers.c) + + if wi.callable == ASGI_CALLABLE: + handle_asgi_request(wsgi_req, app) + return lib.UWSGI_OK + + environ = {} + iov = wsgi_req.hvec + for i in range(0, wsgi_req.var_cnt, 2): + environ[ + ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len).decode( + "latin1" + ) + ] = ffi.string( + ffi.cast("char*", iov[i + 1].iov_base), iov[i + 1].iov_len + ).decode( + "latin1" + ) + + # check bytes on environ... + environ["wsgi.version"] = (1, 0) + scheme = "http" + if "HTTPS" in environ: + if environ["HTTPS"] in ("on", "ON", "On", "1", "true", "TRUE", "True"): + scheme = "https" + environ["wsgi.url_scheme"] = environ.get("UWSGI_SCHEME", scheme) + environ["wsgi.input"] = WSGIinput(wsgi_req) + environ["wsgi.errors"] = sys.stderr + environ["wsgi.run_once"] = False + environ["wsgi.file_wrapper"] = lambda f, chunksize=0: WSGIfilewrapper( + wsgi_req, f, chunksize + ) + environ["wsgi.multithread"] = True + environ["wsgi.multiprocess"] = True + + environ["uwsgi.core"] = wsgi_req.async_id + environ["uwsgi.node"] = ffi.string(lib.uwsgi.hostname).decode("latin1") + + try: + response = app(environ, start_response) + except: + print_exc() + # can I get a 500? 
+ # will also get here when a websocket closes + wsgi_req.async_force_again = 1 + return lib.UWSGI_AGAIN + + if type(response) is bytes: + writer(response) + else: + try: + if isinstance(response, WSGIfilewrapper): + response.sendfile() + else: + for chunk in response: + if isinstance(chunk, WSGIfilewrapper): + try: + chunk.sendfile() + finally: + chunk.close() + else: + writer(chunk) + finally: + if hasattr(response, "close"): + response.close() + + return lib.UWSGI_OK + + +def setup_trio(threads): + setattr(uwsgi, "async", threads) # keyword + if uwsgi.socket_timeout < 30: + uwsgi.socket_timeout = 30 + + uwsgi.loop = _TRIO + + +def trio_init(): + lib.uwsgi_register_loop(_TRIO, lib.asyncio_loop) diff --git a/plugins/cffi/cffitest.sh b/plugins/cffi/cffitest.sh new file mode 100755 index 0000000000..efddaf6a94 --- /dev/null +++ b/plugins/cffi/cffitest.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Run from base uwsgi directory +set -e +# export C_INCLUDE_PATH=/usr/local/opt/openssl\@1.1/include +# build uwsgi +python uwsgiconfig.py --build nolang +# build plugin +python uwsgiconfig.py -p plugins/cffi nolang +# run +./uwsgi --plugin cffi -T \ + --http-socket 0.0.0.0:8080 \ + --single-interpreter \ + --async=100 \ + --cffi-eval "import cffi_greenlets; cffi_greenlets.uwsgi_cffi_setup_greenlets()" \ + --env=PYTHONPATH=$HOME/prog/uwsgi:$HOME/prog/uwsgi/plugins/cffi \ + --cffi-wsgi=$PWD/tests/websockets_chat_async.py \ + --manage-script-name \ + --mount=/app=$PWD/examples/welcome3.py \ + --chdir=$VIRTUAL_ENV/bin \ + --master \ + --cffi-eval "import sys; print('executable:', sys.executable)" diff --git a/plugins/cffi/chattest.sh b/plugins/cffi/chattest.sh new file mode 100755 index 0000000000..c2ec965711 --- /dev/null +++ b/plugins/cffi/chattest.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Run from base uwsgi directory +set -e +export PYTHONPATH=$PWD:$PWD/plugins/cffi +export C_INCLUDE_PATH=/usr/local/opt/openssl\@1.1/include +python ./uwsgiconfig.py -p plugins/cffi nolang +authbind --deep ./uwsgi \ + --master \ + --enable-threads \ + --plugin=cffi \ + --cffi-init=cffi_setup_asyncio \ + --async=32 \ + --http=[::]:80 \ + --http-websockets \ + --http-timeout=40 \ + --manage-script-name \ + --mount=/=starlettetest:app \ + --mount=/wsgi=$PWD/examples/welcome3.py \ + --chdir=$VIRTUAL_ENV/bin \ + --touch-reload $PWD/starlettetest.py \ + --touch-reload $PWD/plugins/cffi/cffi_asyncio.py diff --git a/plugins/cffi/constants.py b/plugins/cffi/constants.py new file mode 100644 index 0000000000..179e82ee04 --- /dev/null +++ b/plugins/cffi/constants.py @@ -0,0 +1,38 @@ +""" +Process uwsgi cflags, dot-h into something cffi can use. +""" + +import re +import subprocess + +# or could run the preprocessor to omit this on unsupported platforms +skip = set(("MSG_FASTOPEN", "UWSGI_DEBUG")) + +define_re = re.compile(".*#define\s+(\w+)\s+\d+") + +try: + uwsgi_cflags = subprocess.check_output(["../../uwsgi", "--cflags"]).decode("utf-8") +except subprocess.CalledProcessError: + uwsgi_cflags = "" + +uwsgi_cdef = [] +uwsgi_defines = [] +uwsgi_cflags = uwsgi_cflags.split() + +for cflag in uwsgi_cflags: + if cflag.startswith("-D"): + line = cflag[2:] + if "=" in line or line in skip: + continue + else: + uwsgi_cdef.append("#define %s ..." 
% line) + +uwsgi_dot_h = open("../../uwsgi.h").read() + +with open("_constants.h", "w+") as defines: + defines.write("\n".join(uwsgi_cdef)) + defines.write("\n\n") + for line in uwsgi_dot_h.splitlines(): + match = define_re.match(line) + if match and not match.group(1).startswith("__") and not match.group(1) in skip: + defines.write("#define %s ...\n" % match.group(1)) diff --git a/plugins/cffi/filtercdefs.py b/plugins/cffi/filtercdefs.py new file mode 100755 index 0000000000..b7402760f5 --- /dev/null +++ b/plugins/cffi/filtercdefs.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# Filter out only matching files from preprocessor output +# Omit blank lines + +DEBUG = False + +import re, sys + +comment = re.compile(r'^# (?P\d+) "(?P[^\"]+)"') + +# include stdint.h also? +allowed_filenames = re.compile(r".*(|uwsgi\.h)$") + + +def filter_by_file(input): + filename = "" + allowed = True + for line in input: + match = comment.match(line) + if match: + filename = match.group("filename") + allowed = bool(allowed_filenames.match(filename)) + if not line.strip(): + continue + if not allowed: + continue + if line[0] == "#": + continue + yield line + + +struct = re.compile(r"\s*(?P\w+)\s+(?P\w+)\s*{") +declaration = re.compile(r"\s*(?P(struct )?\w+)\s+\*?\s*(?P\w+)\s*") +matchers = { + None: struct, + "struct": declaration, + "enum": None, + "union": None, + "skip": None, +} +end = re.compile(".*}.*;") + +# types that for whatever reason we don't want to deal with +avoid_types = set( + ( + "cap_value_t", + "pthread_key_t", + "pthread_mutex_t", + "pthread_attr_t", + "pthread_t", + "ino_t", + "off_t", + "mode_t", + "struct msghdr", + "struct termios", + "struct rlimit", + "struct timeval", + "struct sockaddr_un", + "pcre", + "pcre_extra", + ) +) + +# skip lines containing these strings: +avoid_names = set(("mode_t", "uwsgi_recv_cred", "cap_value_t")) + +# declared in uwsgi.h but not included in the base profile: +avoid_names.update( + ( + "uwsgi_amqp_consume", + "uwsgi_hooks_setns_run", + "uwsgi_legion_i_am_the_lord", + "uwsgi_legion_lord_scroll", + "uwsgi_legion_scrolls", + ) +) + + +def output(string): + if not DEBUG: + return + sys.stderr.write(string) + + +# well-formatted uwsgi.h admits semi-simple parsing. +def filter_structs(lines): + stack = [None] + for line in lines: + skipline = False + state = stack[-1] + matcher = matchers[state] + if any(name in line for name in avoid_names): + skipline = True + if end.match(line): + if state == "struct": + yield "...;\n" + if state: + stack.pop() + elif matcher: + match = matcher.match(line) + if match: + output(str(match.groups()) + "\n") + if state == None: + kind = match.group("kind") + if kind == "union": + skipline = True + if not line.strip().endswith(";"): + stack.append("skip") + else: + stack.append(match.group("kind")) + elif state == "struct": + kind = match.group("kind") + output(str(match.groupdict()) + "\n") + if kind in avoid_types: + skipline = True + if kind == "union": # sockaddr_t? + skipline = True + if not line.strip().endswith(";"): + stack.append("skip") + + if skipline or state == "skip": + yield "//" + line.strip() + "\n" + else: + yield line.strip() + "\n" + + +if __name__ == "__main__": + pipeline = filter_structs(filter_by_file(open(sys.argv[1]))) + sys.stdout.writelines(pipeline) diff --git a/plugins/cffi/module_bundle.py b/plugins/cffi/module_bundle.py new file mode 100644 index 0000000000..69db0653bf --- /dev/null +++ b/plugins/cffi/module_bundle.py @@ -0,0 +1,55 @@ +""" +Load Python modules from strings. 
You know, for tracebacks. + +cffi can embed a single module for initialization. This lets us +have several, and shows good tracebacks with source code printouts +when there are errors in those modules. +""" + +import sys +import os +import importlib.abc, importlib.util +import gzip +import base64 + + +def unpack(data): + return gzip.decompress(base64.b64decode(data)) + + +class StringLoader(importlib.abc.SourceLoader): + """ + Allow inspection of "built-in" modules that are embedded as strings. + """ + + def __init__(self, data): + self.data = data + + def get_source(self, fullname): + try: + return unpack(self.data[fullname]).decode("utf-8") + except KeyError: + raise ImportError() + + def get_data(self, path): + path = path.partition("/")[-1][:-3] + try: + return unpack(self.data[path]) + except KeyError: + raise ImportError() + + def get_filename(self, fullname): + return "/" + fullname + ".py" + + +# MODULES go here + +loader = StringLoader(MODULES) + +for module_name in MODULES: + spec = importlib.util.spec_from_loader(module_name, loader, origin=module_name) + # spec.has_location = True + module = importlib.util.module_from_spec(spec) + if module_name != "_init": + sys.modules[module_name] = module + spec.loader.exec_module(module) diff --git a/plugins/cffi/repl_trick.sh b/plugins/cffi/repl_trick.sh new file mode 100755 index 0000000000..21abbf008b --- /dev/null +++ b/plugins/cffi/repl_trick.sh @@ -0,0 +1,7 @@ +#!/bin/sh +# Run uwsgi as an interactive Python interpreter. +# ./uwsgi --socket : --plugin cffi --cffi-eval "import IPython; IPython.embed()" --honour-stdin --cffi-home $VIRTUAL_ENV +# repl trick: +# "import code; code.interact()" +# pypy repl trick. random socket port. +./uwsgi --socket : --plugin cffi --cffi-eval "import _pypy_interact; _pypy_interact.interactive_console()" --honour-stdin --cffi-home $VIRTUAL_ENV \ No newline at end of file diff --git a/plugins/cffi/todo.py b/plugins/cffi/todo.py new file mode 100755 index 0000000000..3512a9b5cd --- /dev/null +++ b/plugins/cffi/todo.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# Find functions in pymodule that are missing from uwsgi.py +# Compile both plugins, then run this script. 
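# A possible build-and-run sequence, using the "nolang" profile that cffitest.sh
# uses for the core and the cffi plugin (the python-plugin build line is an
# assumption, not taken from this patch):
#
#   python uwsgiconfig.py --build nolang
#   python uwsgiconfig.py -p plugins/python nolang
#   python uwsgiconfig.py -p plugins/cffi nolang
#   cd plugins/cffi && ./todo.py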
+ +import os.path +import json +import subprocess + +script = "import uwsgi, json; print(json.dumps(sorted(dir(uwsgi))))" +CFFI = ["./uwsgi", "--plugin", "cffi", "--socket", ":", "--cffi-eval", script] +PYMODULE = ["./uwsgi", "--plugin", "python", "--socket", ":", "--eval", script] + +cffi = [ + line + for line in subprocess.run( + CFFI, cwd="../..", capture_output=True, encoding="utf-8" + ).stdout.splitlines() + if line.startswith("[") +][0] +pymodule = [ + line + for line in subprocess.run( + PYMODULE, cwd="../..", capture_output=True, encoding="utf-8" + ).stderr.splitlines() + if line.startswith("[") +][0] + +cffi_api = set(json.loads(cffi)) +pymodule_api = set(x for x in json.loads(pymodule) if not "Importer" in x) + +print("Missing functions:") +print("\n".join(sorted(pymodule_api - cffi_api))) + +print("\nExtra functions:") +print("\n".join(sorted(cffi_api - pymodule_api))) diff --git a/plugins/cffi/triotest.sh b/plugins/cffi/triotest.sh new file mode 100755 index 0000000000..d1524667bd --- /dev/null +++ b/plugins/cffi/triotest.sh @@ -0,0 +1,25 @@ +#!/bin/sh +# Run from base uwsgi directory +set -e +export PYTHONPATH=$PWD:$PWD/plugins/cffi +export C_INCLUDE_PATH=/usr/local/opt/openssl\@1.1/include +python ./uwsgiconfig.py -p plugins/cffi nolang +authbind --deep ./uwsgi \ + --master \ + --enable-threads \ + --listen 64 \ + --plugin=cffi \ + --cffi-init=cffi_setup_trio \ + --socket-timeout=60 \ + --async=64 \ + --http=[::]:80 \ + --http-websockets \ + --manage-script-name \ + --mount=/=starlettetest:app \ + --mount=/wsgi=$PWD/examples/welcome3.py \ + --chdir=$VIRTUAL_ENV/bin \ + --touch-reload $PWD/starlettetest.py \ + --touch-reload $PWD/plugins/cffi/cffi_trio.py + + +# --cffi-home=$VIRTUAL_ENV \ diff --git a/plugins/cffi/types.h b/plugins/cffi/types.h new file mode 100644 index 0000000000..2206e1a312 --- /dev/null +++ b/plugins/cffi/types.h @@ -0,0 +1,14 @@ +// these may not be portable... +typedef int32_t pid_t; +typedef int32_t uid_t; +typedef int32_t gid_t; +typedef long time_t; +typedef unsigned int socklen_t; +typedef long off_t; +typedef uint64_t rlim_t; + +struct iovec { + void *iov_base; + size_t iov_len; + ...; +}; \ No newline at end of file diff --git a/plugins/cffi/uwsgi.py b/plugins/cffi/uwsgi.py new file mode 100644 index 0000000000..2ea76fc5f0 --- /dev/null +++ b/plugins/cffi/uwsgi.py @@ -0,0 +1,723 @@ +""" +Implement the uwsgi module. 
+ +Based on plugins/pypy/pypy_setup.py +""" + +import os +from _uwsgi import ffi, lib + +# this is a list holding object we do not want to be freed (like callback and handlers) +uwsgi_gc = [] + +applications = {} # TODO + +# latin1 might be better +hostname = ffi.string(lib.uwsgi.hostname, lib.uwsgi.hostname_len).decode("utf-8") +numproc = lib.uwsgi.numproc +buffer_size = lib.uwsgi.buffer_size +if lib.uwsgi.mode: + mode = ffi.string(lib.uwsgi.mode).decode("utf-8") +if lib.uwsgi.pidfile: + pidfile = ffi.string(lib.uwsgi.pidfile).decode("utf-8") + +# parse version from cflags +_version = {} +for _flag in ffi.string(lib.uwsgi_get_cflags()).decode("utf-8").split(): + if _flag.startswith("-DUWSGI_VERSION"): + _k, _v = _flag.split("=", 1) + _version[_k] = _v.replace('"', "").replace("\\", "") + +version = _version["-DUWSGI_VERSION"] +version_info = tuple( + int(_version["-DUWSGI_VERSION_" + _k]) + for _k in ("BASE", "MAJOR", "MINOR", "REVISION") +) + (_version["-DUWSGI_VERSION_CUSTOM"],) + + +def _encode_or_null(*args): + """ + Convert strings to utf-8 preserving ffi.NULL + """ + return (arg if arg == ffi.NULL else arg.encode("utf-8") for arg in args) + + +def _current_wsgi_req(): + wsgi_req = lib.uwsgi.current_wsgi_req() + if wsgi_req == ffi.NULL: + raise SystemError("you can call uwsgi api function only from the main callable") + return wsgi_req + + +def _print_exc(): + import traceback + + traceback.print_exc() + + +def request_id(): + wsgi_req = _current_wsgi_req() + return lib.uwsgi.workers[lib.uwsgi.mywid].cores[wsgi_req.async_id].requests + + +def async_id(): + wsgi_req = _current_wsgi_req() + return wsgi_req.async_id + + +def register_signal(signum, kind, handler): + cb = ffi.callback("void(int)", handler) + uwsgi_gc.append(cb) + if ( + lib.uwsgi_register_signal( + signum, + ffi.new("char[]", kind.encode("utf-8")), + cb, + lib.cffi_plugin.modifier1, + ) + < 0 + ): + raise Exception("unable to register signal %d" % signum) + + +class uwsgi_pypy_RPC(object): + def __init__(self, func): + self.func = func + + def __call__(self, argc, argv, argvs, buf): + pargs = [] + for i in range(0, argc): + pargs.append(ffi.buffer(argv[i], argvs[i])[:]) + response = self.func(*pargs) + if len(response) > 0: + buf[0] = lib.uwsgi_malloc(len(response)) + dst = ffi.buffer(buf[0], len(response)) + dst[: len(response)] = response + return len(response) + + +def register_rpc(name, func, argc=0): + rpc_func = uwsgi_pypy_RPC(func) + cb = ffi.callback("int(int, char*[], int[], char**)", rpc_func) + uwsgi_gc.append(cb) + if ( + lib.uwsgi_register_rpc( + ffi.new("char[]", name.encode("utf-8")), + ffi.addressof(lib.cffi_plugin), + argc, + cb, + ) + < 0 + ): + raise Exception("unable to register rpc func %s" % name) + + +def rpc(node, func, *args): + argc = 0 + argv = ffi.new("char*[256]") + argvs = ffi.new("uint16_t[256]") + rsize = ffi.new("uint64_t*") + + for arg in args: + if argc >= 255: + raise Exception("invalid number of rpc arguments") + if len(arg) >= 65535: + raise Exception("invalid rpc argument size (must be < 65535)") + argv[argc] = ffi.new("char[]", arg.encode("utf-8")) + argvs[argc] = len(arg) + argc += 1 + + if node: + c_node = ffi.new("char[]", node.encode("utf-8")) + else: + c_node = ffi.NULL + + response = lib.uwsgi_do_rpc( + c_node, ffi.new("char[]", func.encode("utf-8")), argc, argv, argvs, rsize + ) + if response: + ret = ffi.buffer(response, rsize[0])[:] + lib.free(response) + return ret + return None + + +def call(func, *args): + node = None + if "@" in func: + (func, node) = func.split("@") + 
return uwsgi_pypy_rpc(node, func, *args) + + +signal = lambda x: lib.uwsgi_signal_send(lib.uwsgi.signal_socket, x) + +metric_get = lambda x: lib.uwsgi_metric_get(x, ffi.NULL) +metric_set = lambda x, y: lib.uwsgi_metric_set(x, ffi.NULL, y) +metric_inc = lambda x, y=1: lib.uwsgi_metric_inc(x, ffi.NULL, y) +metric_dec = lambda x, y=1: lib.uwsgi_metric_dec(x, ffi.NULL, y) +metric_mul = lambda x, y=1: lib.uwsgi_metric_mul(x, ffi.NULL, y) +metric_div = lambda x, y=1: lib.uwsgi_metric_div(x, ffi.NULL, y) + + +def metric_set_max(name, value): + """ + only set the metric name if the give value is greater than the one currently stored + """ + (name,) = _encode_or_null(name) + if lib.uwsgi_metric_set_max(name, ffi.NULL, value): + return None + return True + + +def metric_set_min(name, value): + """ + only set the metric name if the give value is lower than the one currently stored + """ + (name,) = _encode_or_null(name) + if lib.uwsgi_metric_set_min(name, ffi.NULL, value): + return None + return True + + +def cache_clear(cache=ffi.NULL): + """ + The cache argument is the so called "magic identifier". Its syntax is cache[@node]. + """ + (cache,) = _encode_or_null(cache) + if not lib.uwsgi_cache_magic_clear(cache): + return True + return None + + +def cache_get(key, cache=ffi.NULL): + key, cache = _encode_or_null(key, cache) + vallen = ffi.new("uint64_t*") + value = lib.uwsgi_cache_magic_get(key, len(key), vallen, ffi.NULL, cache) + if value == ffi.NULL: + return None + ret = ffi.buffer(value, vallen[0])[:] + lib.free(value) + return ret + + +def cache_set(key, value, expires=0, cache=ffi.NULL): + key, cache = _encode_or_null(key, cache) + if ( + lib.uwsgi_cache_magic_set(key, len(key), value, len(value), expires, 0, cache) + < 0 + ): + raise Exception("unable to store item in the cache") + + +def cache_update(key, value, expires=0, cache=ffi.NULL): + key, cache = _encode_or_null(key, cache) + if ( + lib.uwsgi_cache_magic_set( + key, len(key), value, len(value), expires, 1 << 1, cache + ) + < 0 + ): + raise Exception("unable to store item in the cache") + + +def cache_del(key, cache=ffi.NULL): + key, cache = _encode_or_null(key, cache) + if lib.uwsgi_cache_magic_del(key, len(key), cache) < 0: + raise Exception("unable to delete item from the cache") + + +def cache_keys(cache=ffi.NULL): + (cache,) = _encode_or_null(cache) + uc = lib.uwsgi_cache_by_name(cache) + if uc == ffi.NULL: + raise Exception("no local uWSGI cache available") + l = [] + lib.uwsgi_cache_rlock(uc) + pos = ffi.new("uint64_t *") + uci = ffi.new("struct uwsgi_cache_item **") + while True: + uci[0] = lib.uwsgi_cache_keys(uc, pos, uci) + if uci[0] == ffi.NULL: + break + l.append(ffi.buffer(lib.uwsgi_cache_item_key(uci[0]), uci[0].keysize)[:]) + lib.uwsgi_cache_rwunlock(uc) + return l + + +def cache_exists(key, cache=ffi.NULL): + key, cache = _encode_or_null(key, cache) + if lib.uwsgi_cache_magic_exists(key, len(key), cache): + return True + return None + + +def add_timer(signum, secs): + if lib.uwsgi_add_timer(signum, secs) < 0: + raise Exception("unable to register timer") + + +def add_rb_timer(signum, secs): + if lib.uwsgi_signal_add_rb_timer(signum, secs, 0) < 0: + raise Exception("unable to register redblack timer") + + +def add_file_monitor(signum, filename): + if ( + lib.uwsgi_add_file_monitor(signum, ffi.new("char[]", filename.encode("utf-8"))) + < 0 + ): + raise Exception("unable to register file monitor") + + +def lock(num=0): + if lib.uwsgi_user_lock(num) < 0: + raise Exception("invalid lock") + + +def unlock(num=0): + if 
lib.uwsgi_user_unlock(num) < 0: + raise Exception("invalid lock") + + +def masterpid(): + if lib.uwsgi.master_process: + return lib.uwsgi.workers[0].pid + return 0 + + +def worker_id(): + return lib.uwsgi.mywid + + +def mule_id(): + return lib.uwsgi.muleid + + +def mule_msg_recv_size(): + return lib.uwsgi.mule_msg_recv_size + + +mule_msg_extra_hooks = [] + + +def install_mule_msg_hook(mule_msg_dispatcher): + mule_msg_extra_hooks.append(mule_msg_dispatcher) + + +# plugin callback defined outside cffi_init.py +@ffi.def_extern() +def uwsgi_cffi_mule_msg(message, len): + msg = ffi.string(message, len) + if not mule_msg_extra_hooks: + return 0 + for hook in mule_msg_extra_hooks: + try: + hook(msg) + except: + _print_exc() + + return 1 + + +def logsize(): + return lib.uwsgi.shared.logsize + + +def signal_registered(signum): + if lib.uwsgi_signal_registered(signum) > 0: + return True + return False + + +def alarm(alarm, msg): + lib.uwsgi_alarm_trigger( + ffi.new("char[]", alarm.encode("utf-8")), + ffi.new("char[]", msg.encode("utf-8")), + len(msg), + ) + + +setprocname = lambda name: lib.uwsgi_set_processname( + ffi.new("char[]", name.encode("utf-8")) +) + + +def add_cron(signum, minute, hour, day, month, week): + if lib.uwsgi_signal_add_cron(signum, minute, hour, day, month, week) < 0: + raise Exception("unable to register cron") + + +def suspend(): + """ + uwsgi.suspend() + """ + wsgi_req = _current_wsgi_req() + if lib.uwsgi.schedule_to_main: + lib.uwsgi.schedule_to_main(wsgi_req) + + +def workers(): + """ + uwsgi.workers() + """ + workers = [] + for i in range(1, lib.uwsgi.numproc + 1): + worker = {} + worker["id"] = lib.uwsgi.workers[i].id + worker["pid"] = lib.uwsgi.workers[i].pid + worker["requests"] = lib.uwsgi.workers[i].requests + worker["delta_requests"] = lib.uwsgi.workers[i].delta_requests + worker["signals"] = lib.uwsgi.workers[i].signals + worker["exceptions"] = lib.uwsgi_worker_exceptions(i) + worker["apps"] = [] + apps_cnt = lib.uwsgi.workers[i].apps_cnt + apps = lib.uwsgi.workers[i].apps + for j in range(0, apps_cnt): + app = apps[j] + worker["apps"].append( + { + "id": j, + "mountpoint": ffi.string(app.mountpoint, app.mountpoint_len).decode( + "utf-8" + ), + "startup_time": app.startup_time, + "requests": app.requests, + } + ) + if lib.uwsgi.workers[i].cheaped: + worker["status"] == "cheap" + elif lib.uwsgi.workers[i].suspended and not lib.uwsgi_worker_is_busy(i): + worker["status"] == "pause" + else: + if lib.uwsgi.workers[i].sig: + worker["status"] = "sig%d" % lib.uwsgi.workers[i].signum + elif lib.uwsgi_worker_is_busy(i): + worker["status"] = "busy" + else: + worker["status"] = "idle" + worker["running_time"] = lib.uwsgi.workers[i].running_time + worker["avg_rt"] = lib.uwsgi.workers[i].avg_response_time + worker["tx"] = lib.uwsgi.workers[i].tx + + workers.append(worker) + return workers + + +def async_sleep(timeout): + """ + uwsgi.async_sleep(timeout) + """ + if timeout > 0: + wsgi_req = _current_wsgi_req() + lib.async_add_timeout(wsgi_req, timeout) + + +def async_connect(addr): + """ + uwsgi.async_connect(addr) + """ + fd = lib.uwsgi_connect(ffi.new("char[]", addr.encode("utf-8")), 0, 1) + if fd < 0: + raise Exception("unable to connect to %s" % addr) + return fd + + +connection_fd = lambda: _current_wsgi_req().fd + + +def wait_fd_read(fd, timeout=0): + """ + uwsgi.wait_fd_read(fd, timeout=0) + """ + wsgi_req = _current_wsgi_req() + if lib.async_add_fd_read(wsgi_req, fd, timeout) < 0: + raise Exception("unable to add fd %d to the event queue" % fd) + + +def wait_fd_write(fd, 
timeout=0): + """ + uwsgi.wait_fd_write(fd, timeout=0) + """ + wsgi_req = _current_wsgi_req() + if lib.async_add_fd_write(wsgi_req, fd, timeout) < 0: + raise Exception("unable to add fd %d to the event queue" % fd) + + +def ready_fd(): + """ + uwsgi.ready_fd() + """ + wsgi_req = _current_wsgi_req() + return lib.uwsgi_ready_fd(wsgi_req) + + +def send(*args): + """ + uwsgi.send(fd=None,data) + """ + if len(args) == 0: + raise ValueError("uwsgi.send() takes at least 1 argument") + elif len(args) == 1: + wsgi_req = _current_wsgi_req() + fd = wsgi_req.fd + data = args[0] + else: + fd = args[0] + data = args[1] + rlen = lib.write(fd, data, len(data)) + if rlen <= 0: + raise IOError("unable to send data") + return rlen + + +def recv(*args): + """ + uwsgi.recv(fd=None,len) + """ + if len(args) == 0: + raise ValueError("uwsgi.recv() takes at least 1 argument") + elif len(args) == 1: + wsgi_req = _current_wsgi_req() + fd = wsgi_req.fd + l = args[0] + else: + fd = args[0] + l = args[1] + data = ffi.new("char[%d]" % l) + # why not os.read()? + rlen = lib.read(fd, data, l) + if rlen <= 0: + raise IOError("unable to receive data") + return ffi.string(data[0:rlen]) + + +""" +uwsgi.close(fd) +""" +close = lambda fd: lib.close(fd) + +""" +uwsgi.disconnect() +""" +disconnect = lambda: lib.uwsgi_disconnect(_current_wsgi_req()) + + +def route(router, args): + """ + Route request to another uWSGI instance. + Like proxy_pass, but with the uWSGI protocol or whatever the named router supports. + + See https://uwsgi-docs.readthedocs.io/en/latest/articles/OffloadingWebsocketsAndSSE.html#simplifying-things-using-the-uwsgi-api-uwsgi-2-0-3 + """ + wsgi_req = _current_wsgi_req() + router, args = _encode_or_null(router, args) + c_args = lib.uwsgi_concat2(args, b"") # freed by route_api_func + return lib.uwsgi_route_api_func(wsgi_req, router, c_args) + + +def sendfile(filename): + # TODO support all argument types from uwsgi_pymodule.c + + fd = os.open(filename, os.O_RDONLY) + pos = 0 + filesize = 0 + lib.uwsgi_response_sendfile_do(_current_wsgi_req(), fd, pos, filesize) + # uwsgi check write errors? + return [] + + +def websocket_recv(): + """ + uwsgi.websocket_recv() + """ + wsgi_req = lib.uwsgi.current_wsgi_req() + ub = lib.uwsgi_websocket_recv(wsgi_req) + if ub == ffi.NULL: + raise IOError("unable to receive websocket message") + ret = ffi.buffer(ub.buf, ub.pos)[:] + lib.uwsgi_buffer_destroy(ub) + return ret + + +def websocket_recv_nb(): + """ + uwsgi.websocket_recv_nb() + """ + wsgi_req = lib.uwsgi.current_wsgi_req() + ub = lib.uwsgi_websocket_recv_nb(wsgi_req) + if ub == ffi.NULL: + raise IOError("unable to receive websocket message") + ret = ffi.buffer(ub.buf, ub.pos)[:] + lib.uwsgi_buffer_destroy(ub) + return ret + + +def websocket_handshake(key=None, origin=None, proto=None): + """ + uwsgi.websocket_handshake(key, origin) + """ + wsgi_req = _current_wsgi_req() + len_key = 0 + len_origin = 0 + len_proto = 0 + c_key = ffi.NULL + c_origin = ffi.NULL + c_proto = ffi.NULL + if key: + len_key = len(key) + c_key = ffi.new("char[]", key.encode("latin1")) # correct encoding? 
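        # (Sec-WebSocket-Key is base64-encoded and therefore ASCII, so latin-1 encodes
        # it losslessly; the Origin and subprotocol header values below are ASCII too)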
+ if origin: + len_origin = len(origin) + c_origin = ffi.new("char[]", origin.encode("latin1")) + if proto: + len_proto = len(proto) + c_proto = ffi.new("char[]", proto.encode("latin1")) + if ( + lib.uwsgi_websocket_handshake( + wsgi_req, c_key, len_key, c_origin, len_origin, c_proto, len_proto + ) + < 0 + ): + raise IOError("unable to complete websocket handshake") + + +def websocket_send(msg): + """ + uwsgi.websocket_send(msg) + """ + wsgi_req = lib.uwsgi.current_wsgi_req() + if ( + lib.uwsgi_websocket_send( + wsgi_req, ffi.new("char[]", msg.encode("utf-8")), len(msg) + ) + < 0 + ): + raise IOError("unable to send websocket message") + + +def websocket_send_binary(msg): + """ + uwsgi.websocket_send_binary(msg) + """ + wsgi_req = lib.uwsgi.current_wsgi_req() + if ( + lib.uwsgi_websocket_send_binary( + wsgi_req, ffi.new("char[]", msg.encode("utf-8")), len(msg) + ) + < 0 + ): + raise IOError("unable to send binary websocket message") + + +def chunked_read(timeout=0): + """ + uwsgi.chunked_read(timeout=0) + """ + wsgi_req = _current_wsgi_req() + rlen = ffi.new("size_t*") + chunk = lib.uwsgi_chunked_read(wsgi_req, rlen, timeout, 0) + if chunk == ffi.NULL: + raise IOError("unable to receive chunked part") + return ffi.buffer(chunk, rlen[0])[:] + + +def chunked_read_nb(): + """ + uwsgi.chunked_read_nb() + """ + wsgi_req = _current_wsgi_req() + rlen = ffi.new("size_t*") + chunk = lib.uwsgi_chunked_read(wsgi_req, rlen, 0, 1) + if chunk == ffi.NULL: + if lib.uwsgi_is_again() > 0: + return None + raise IOError("unable to receive chunked part") + + return ffi.buffer(chunk, rlen[0])[:] + + +def set_warning_message(message): + message = message.encode("utf-8") + if len(message) > 80: + lib.uwsgi_log(b"- warning message must be max 80 chars, it will be truncated -") + message = message[:80] + lib.uwsgi.shared.warning_message[0 : len(message)] = message + b"\0" + return True + + +def set_user_harakiri(x): + """ + uwsgi.set_user_harakiri(sec) + """ + wsgi_req = _current_wsgi_req() + lib.set_user_harakiri(wsgi_req, x) + + +def get_logvar(key): + """ + uwsgi.get_logvar(key) + """ + wsgi_req = _current_wsgi_req() + c_key = ffi.new("char[]", key.encode("utf-8")) + lv = lib.uwsgi_logvar_get(wsgi_req, c_key, len(key)) + if lv: + return ffi.string(lv.val[0 : lv.vallen]) + return None + + +def set_logvar(key, val): + """ + uwsgi.set_logvar(key, value) + """ + wsgi_req = _current_wsgi_req() + c_key = ffi.new("char[]", key.encode("utf-8")) + c_val = ffi.new("char[]", val.encode("utf-8")) + lib.uwsgi_logvar_add(wsgi_req, c_key, len(key), c_val, len(val)) + + +SPOOL_OK = -2 +SPOOL_RETRY = -1 +SPOOL_IGNORE = 0 + + +def _init(): + """ + Create uwsgi module. + + This is a way to control what we export. 
+ """ + import sys + import types + + class UwsgiModule(types.ModuleType): + pass + + module = UwsgiModule("uwsgi") + module.__dict__.update( + (k, v) for (k, v) in globals().items() if not k.startswith("_") + ) + + """ + populate uwsgi.opt + """ + opt = {} + for i in range(0, lib.uwsgi.exported_opts_cnt): + uo = lib.uwsgi.exported_opts[i] + k = ffi.string(uo.key) + if uo.value == ffi.NULL: + v = True + else: + v = ffi.string(uo.value) + if k in opt: + if type(opt[k]) is list: + opt[k].append(v) + else: + opt[k] = [opt[k], v] + else: + opt[k] = v + module.opt = opt + + sys.modules["uwsgi"] = module + + +_init() diff --git a/plugins/cffi/uwsgiplugin.py b/plugins/cffi/uwsgiplugin.py new file mode 100644 index 0000000000..42c26fd2dc --- /dev/null +++ b/plugins/cffi/uwsgiplugin.py @@ -0,0 +1,137 @@ +NAME = "cffi" + +import os.path +import sys +from distutils import sysconfig +import subprocess + +# Our modified uwsgiconfig.py passes its config to us as a global +UWSGI_CFLAGS = UWSGICONFIG.cflags[:] + +# These flags expose features we don't want to wrap in the Python API. +# Our generated code will be complied with all the cflags. +avoid_flags = ["-DUWSGI_PCRE", "-DUWSGI_SSL", "-DUWSGI_ZLIB"] +UWSGI_CFLAGS = [flag for flag in UWSGI_CFLAGS if flag not in avoid_flags] + +command = [ + "gcc", + "./uwsgi.h", + "-E", + '-D"__attribute__(x)"=', +] + UWSGI_CFLAGS + +# cffi requires a preprocessed uwsgi.h +with open("plugins/cffi/_uwsgi_preprocessed.h", "w+") as output: + subprocess.run( + " ".join(command), encoding="utf-8", check=True, shell=True, stdout=output + ) + +# other build steps that don't require cflags +subprocess.check_call(["make", "PYTHON=%s" % sys.executable], cwd="plugins/cffi") + + +def get_python_version(): + version = sysconfig.get_config_var("VERSION") + try: + version = version + sys.abiflags + except Exception: + pass + return version + + +GCC_LIST = ["cffi_plugin"] + +if sys.implementation.name == "pypy": + # Link to the base libpypy3-c.so, not the copy in a virtualenv: + + CFLAGS = [ + "-pthread", + "-DNDEBUG", + f"-I{sys.base_exec_prefix}/include", + f"-I{sys.prefix}/include", + "-fvisibility=hidden", + ] + + if sys.platform == "linux": + LDFLAGS = [ + f"-L{sys.base_exec_prefix}/bin/", + f"-Wl,-rpath={sys.base_exec_prefix}/bin/", + "-lpypy3-c", + ] + else: + LDFLAGS = [f"-L{sys.base_exec_prefix}/bin/", "-lpypy3-c"] + LIBS = [] + + def post_build(config): + # How to detect embedded or shared object? 
+ # find pypy3-c on osx + if sys.platform == "darwin": + rpath = os.path.join(sys.base_exec_prefix, "bin") + subprocess.check_call( + ["install_name_tool", "-add_rpath", rpath, "cffi_plugin.so"] + ) + + +else: + # copied from plugins/python + + CFLAGS = [ + "-I" + sysconfig.get_python_inc(), + "-I" + sysconfig.get_python_inc(plat_specific=True), + ] + LDFLAGS = [] + + if "UWSGI_PYTHON_NOLIB" not in os.environ: + LIBS = ( + sysconfig.get_config_var("LIBS").split() + + sysconfig.get_config_var("SYSLIBS").split() + ) + # check if it is a non-shared build (but please, add --enable-shared to your python's ./configure script) + if not sysconfig.get_config_var("Py_ENABLE_SHARED"): + libdir = sysconfig.get_config_var("LIBPL") + # libdir does not exists, try to get it from the venv + version = get_python_version() + if not os.path.exists(libdir): + libdir = "%s/lib/python%s/config" % (sys.prefix, version) + # try skipping abiflag + if not os.path.exists(libdir) and version.endswith("m"): + version = version[:-1] + libdir = "%s/lib/python%s/config" % (sys.prefix, version) + # try 3.x style config dir + if not os.path.exists(libdir): + libdir = "%s/lib/python%s/config-%s" % ( + sys.prefix, + version, + get_python_version(), + ) + + # get cpu type + uname = os.uname() + if uname[4].startswith("arm"): + libpath = "%s/%s" % (libdir, sysconfig.get_config_var("LIBRARY")) + if not os.path.exists(libpath): + libpath = "%s/%s" % (libdir, sysconfig.get_config_var("LDLIBRARY")) + else: + libpath = "%s/%s" % (libdir, sysconfig.get_config_var("LDLIBRARY")) + if not os.path.exists(libpath): + libpath = "%s/%s" % (libdir, sysconfig.get_config_var("LIBRARY")) + if not os.path.exists(libpath): + libpath = "%s/libpython%s.a" % (libdir, version) + LIBS.append(libpath) + # hack for messy linkers/compilers + if "-lutil" in LIBS: + LIBS.append("-lutil") + else: + try: + libdir = sysconfig.get_config_var("LIBDIR") + except Exception: + libdir = "%s/lib" % sysconfig.PREFIX + + LDFLAGS.append("-L%s" % libdir) + LDFLAGS.append("-Wl,-rpath,%s" % libdir) + + os.environ["LD_RUN_PATH"] = "%s" % libdir + + LIBS.append("-lpython%s" % get_python_version()) + else: + LIBS = [] diff --git a/starlettetest.html b/starlettetest.html new file mode 100644 index 0000000000..d6ca1a7bc2 --- /dev/null +++ b/starlettetest.html @@ -0,0 +1,84 @@ + + + + + + + + + + + + +
[starlettetest.html body omitted (markup not recoverable); page heading: "WebSocket Chat (asgi + $async_library)"]
+ + + + + + + \ No newline at end of file diff --git a/starlettetest.py b/starlettetest.py new file mode 100644 index 0000000000..fd6af73c27 --- /dev/null +++ b/starlettetest.py @@ -0,0 +1,165 @@ +from starlette.applications import Starlette +from starlette.responses import JSONResponse, HTMLResponse +from starlette.websockets import WebSocket +from starlette.routing import Route, Mount, WebSocketRoute + +import string + +import os +import asyncio +import asyncio_redis +import sniffio + +import inspect +import signal + +# pypy 7.3.3-beta0 doesn't have warn_on_full_buffer +try: + argspec = inspect.getfullargspec(signal.set_wakeup_fd) + if not "warn_on_full_buffer" in argspec.kwonlyargs: + import trio._core._wakeup_socketpair + + trio._core._wakeup_socketpair.HAVE_WARN_ON_FULL_BUFFER = False +except TypeError: # CPython + pass + + +def get_template(): + with open(os.path.join(os.path.dirname(__file__), "starlettetest.html")) as tf: + template = tf.read() + return string.Template(template) + + +REDIS_CHANNEL = "foobar" + + +async def homepage(request): + ws_scheme = "wss" + if request["scheme"] == "http": + ws_scheme = "ws" + return HTMLResponse( + get_template().substitute( + protocol=ws_scheme, + path=request.headers["host"], + async_library=ASYNC_LIBRARY + ) + ) + + +async def chat_endpoint(websocket: WebSocket): + """ + Relay websocket messages to redis channel and vice versa. + """ + await websocket.accept() + + connection = await redis_connect() + subscriber = await redis_subscribe() + await subscriber.subscribe([REDIS_CHANNEL]) + + async def left(): + while True: + event = await websocket.receive() + if event["type"] != "websocket.receive": # disconnect? + print("done receiving", event) + break + msg = event["text"] + await connection.publish(REDIS_CHANNEL, msg) + raise StopIteration() # to cancel the wait + + async def right(): + try: + while True: + print("getting a redis message") + msg = await subscriber.next_published() + print("redis msg", msg) + if msg: + await websocket.send_text(msg.value) + if msg.value == "8k": + # maybe switch contexts? + await websocket.send_text("k" * 65536) + except: + import traceback + + traceback.print_exc() + raise + + loop = asyncio.get_event_loop() + + try: + # no default Starlette exception printing? 
+ tasks = loop.create_task(right()), loop.create_task(left()) + print("tasking", tasks) + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + except: + import traceback + + traceback.print_exc() + finally: + for task in tasks: + print("cancel", task) + task.cancel() + + +async def redis_connect(): + return await asyncio_redis.Connection.create(host="localhost", port=6379) + + +async def redis_subscribe(): + connection = await redis_connect() + subscriber = await connection.start_subscribe() + return subscriber + + +app_to_wrap = Starlette( + debug=True, + routes=[Route("/", homepage), WebSocketRoute("/foobar/", chat_endpoint)], +) + + +async def app(scope, receive, send): + global ASYNC_LIBRARY + ASYNC_LIBRARY = sniffio.current_async_library() + if sniffio.current_async_library() == "trio": + import trio_asyncio + + async with trio_asyncio.open_loop() as loop: + # async part of your main program here + receive_ = trio_asyncio.trio_as_aio(receive) + send_ = trio_asyncio.trio_as_aio(send) + await trio_asyncio.aio_as_trio(app_to_wrap)(scope, receive_, send_) + else: + await app_to_wrap(scope, receive, send) + + +import sys + + +UNINTERESTING_FUNCS = set(("asgi_scope_http",)) + + +def trace_calls_and_returns(frame, event, arg): + co = frame.f_code + func_name = co.co_name + if func_name == "write": + # Ignore write() calls from print statements + return + line_no = frame.f_lineno + filename = co.co_filename + if not ( + "starlette" in filename + or filename.startswith("/Users/daniel/prog/uwsgi/plugins/cffi/") + ): + return + if event == "call": + print("Call to %s on line %s of %s" % (func_name, line_no, filename)) + return trace_calls_and_returns + elif event == "return": + print("%s => %s" % (func_name, arg)) + elif event == "line" and func_name not in UNINTERESTING_FUNCS: + print("%s:%s of %s" % (func_name, line_no, filename)) + # else: + # print(event) + return + + +# sys.settrace(trace_calls_and_returns) diff --git a/tests/asgitest.py b/tests/asgitest.py new file mode 100644 index 0000000000..9dd142b43b --- /dev/null +++ b/tests/asgitest.py @@ -0,0 +1,134 @@ + + +async def asgifoo(scope, receive, send): + try: + pprint.pprint("in asgifoo") + event = await receive() + pprint.pprint("asgifoo got") + pprint.pprint(event) + await send( + { + "type": "http.response.start", + "status": 200, + "headers": [ + (b"X-Header", b"Amazing Value"), + (b"Content-Type", b"text/plain; charset=UTF-8"), + ], + } + ) + for i in range(32): + await send( + { + "type": "http.response.body", + "body": b"Hello from asgifoo %d\n" % i, + "more_body": True, + } + ) + await asyncio.sleep(1) + await send({"type": "http.response.body", "body": b"Goodbye from asgifoo"}) + except: + # critical debug info + import traceback + + traceback.print_exc() + + +import sys + + +def trace_calls_and_returns(frame, event, arg): + co = frame.f_code + func_name = co.co_name + if func_name == "write": + # Ignore write() calls from print statements + return + line_no = frame.f_lineno + filename = co.co_filename + if event == "call": + print("Call to %s on line %s of %s" % (func_name, line_no, filename)) + return trace_calls_and_returns + elif event == "return": + print("%s => %s" % (func_name, arg)) + # else: + # print(event) + return + + +from starlettetest import app + + +def application(env, sr): + """ + Adapt uwsgi + greenlets to asgi + asyncio + """ + + # skip extra request + if env["PATH_INFO"] == "/favicon.ico": + sr("200 OK", [("Content-Type", "image/x-icon")]) + return [b""] + + import _uwsgi + from _uwsgi import 
ffi, lib + + wsgi_req = _uwsgi.lib.uwsgi.current_wsgi_req() + + environ = {} + headers = [] + iov = wsgi_req.hvec + for i in range(0, wsgi_req.var_cnt, 2): + key, value = ( + ffi.string(ffi.cast("char*", iov[i].iov_base), iov[i].iov_len), + ffi.string(ffi.cast("char*", iov[i + 1].iov_base), iov[i + 1].iov_len), + ) + if key.startswith(b"HTTP_"): + headers.append((key[5:].lower(), value)) + else: + environ[key.decode("ascii")] = value + + scope = { + "type": "http", + "asgi": {"spec_version": "2.1"}, + "http_version": environ["SERVER_PROTOCOL"][len("HTTP/") :].decode("utf-8"), + "method": environ["REQUEST_METHOD"].decode("utf-8"), + "scheme": environ.get("UWSGI_SCHEME", "http"), + "path": environ["PATH_INFO"].decode("utf-8"), + "raw_path": environ["REQUEST_URI"], + "query_string": environ["QUERY_STRING"], + "root_path": environ["SCRIPT_NAME"].decode("utf-8"), + "headers": headers, + # some references to REMOTE_PORT but not always in environ + "client": (environ["REMOTE_ADDR"].decode("utf-8"), 0), + "server": (environ["SERVER_NAME"].decode("utf-8"), environ["SERVER_PORT"]), + } + + gc = greenlet.getcurrent() + + async def send(event): + gc.switch(event) + # do we need to do anything else to avoid pausing asyncio? or to give + # backpressure to the asgi app by waiting a bit longer? + # uwsgi probably switches to main greenlet / event loop for us. + + async def receive(): + print("can we rx something?") + return {"type": "http.request"} + + asyncio.get_event_loop().create_task(app(scope, receive, send)) + + while True: + event = gc.parent.switch() + print("got", event, "in adapter") + if event["type"] == "http.response.start": + # raw uwsgi function accepts bytes + sr( + str(event["status"]), + [ + [key.decode("latin1"), value.decode("latin1")] + for (key, value) in event["headers"] + ], + ) + elif event["type"] == "http.response.body": + yield event["body"] + if not event.get("more_body"): + break + diff --git a/tests/testgevent.py b/tests/testgevent.py index 53849abff3..3b8fc89655 100644 --- a/tests/testgevent.py +++ b/tests/testgevent.py @@ -6,21 +6,22 @@ def microtask(wid): - print "i am a gevent task" + print("i am a gevent task") gevent.sleep(10) - print "10 seconds elapsed in worker id %d" % wid + print("10 seconds elapsed in worker id %d" % wid) def athread(): while True: time.sleep(1) - print "i am the thread 1" + print("i am the thread 1") def athread2(): while True: time.sleep(1) - print "i am the thread 2" + print("i am the thread 2") + t1 = Thread(target=athread) t1.daemon = True @@ -34,7 +35,7 @@ def athread2(): def application(environ, start_response): gevent.sleep() - start_response('200 OK', [('Content-Type', 'text/html')]) + start_response("200 OK", [("Content-Type", "text/html")]) yield "sleeping for 3 seconds...
" gevent.sleep(3) yield "done
" diff --git a/tests/websockets_chat_async.py b/tests/websockets_chat_async.py index bc05473567..ece8b9084f 100644 --- a/tests/websockets_chat_async.py +++ b/tests/websockets_chat_async.py @@ -10,12 +10,12 @@ def application(env, sr): - ws_scheme = 'ws' - if 'HTTPS' in env or env['wsgi.url_scheme'] == 'https': - ws_scheme = 'wss' + ws_scheme = "ws" + if "HTTPS" in env or env["wsgi.url_scheme"] == "https": + ws_scheme = "wss" - if env['PATH_INFO'] == '/': - sr('200 OK', [('Content-Type', 'text/html')]) + if env["PATH_INFO"] == "/": + sr("200 OK", [("Content-Type", "text/html")]) output = """ @@ -47,24 +47,31 @@ def application(env, sr):

[HTML template lines omitted (markup not recoverable); heading: "WebSocket"]
- """ % (ws_scheme, env['HTTP_HOST']) + """ % ( + ws_scheme, + env["HTTP_HOST"], + ) if sys.version_info[0] > 2: - return output.encode('latin1') - return output - elif env['PATH_INFO'] == '/favicon.ico': - return "" - elif env['PATH_INFO'] == '/foobar/': - uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', '')) + return [output.encode("latin1")] + return [output] + elif env["PATH_INFO"] == "/favicon.ico": + return [""] + elif env["PATH_INFO"] == "/foobar/": + uwsgi.websocket_handshake( + env["HTTP_SEC_WEBSOCKET_KEY"], env.get("HTTP_ORIGIN", "") + ) print("websockets...") - r = redis.StrictRedis(host='localhost', port=6379, db=0) + r = redis.StrictRedis(host="localhost", port=6379, db=0) channel = r.pubsub() - channel.subscribe('foobar') + channel.subscribe("foobar") websocket_fd = uwsgi.connection_fd() redis_fd = channel.connection._sock.fileno() @@ -78,18 +85,23 @@ def application(env, sr): if fd == websocket_fd: msg = uwsgi.websocket_recv_nb() if msg: - r.publish('foobar', msg) + r.publish("foobar", msg.decode("utf-8")) elif fd == redis_fd: msg = channel.parse_response() print(msg) # only interested in user messages - t = 'message' + t = "message" if sys.version_info[0] > 2: - t = b'message' + t = b"message" if msg[0] == t: - uwsgi.websocket_send("[%s] %s" % (time.time(), msg)) + uwsgi.websocket_send( + ( + "[%s] %s" + % (time.time(), [m.decode("utf-8") for m in msg]) + ).encode("utf-8") + ) else: # on timeout call websocket_recv_nb again to manage ping/pong msg = uwsgi.websocket_recv_nb() if msg: - r.publish('foobar', msg) + r.publish("foobar", msg.decode("utf-8")) diff --git a/tests/websockets_chat_asyncio.py b/tests/websockets_chat_asyncio.py index 849399e34d..ecee11d567 100644 --- a/tests/websockets_chat_asyncio.py +++ b/tests/websockets_chat_asyncio.py @@ -4,6 +4,13 @@ import asyncio_redis import time import greenlet +import os, sys +import pprint + +try: + import cffi_setup_asyncio +except ImportError: + pass class GreenFuture(asyncio.Future): @@ -21,16 +28,16 @@ def result(self): @asyncio.coroutine def redis_open(f): - connection = yield from asyncio_redis.Connection.create(host='localhost', port=6379) + connection = yield from asyncio_redis.Connection.create(host="localhost", port=6379) f.set_result(connection) f.greenlet.switch() @asyncio.coroutine def redis_subscribe(f): - connection = yield from asyncio_redis.Connection.create(host='localhost', port=6379) + connection = yield from asyncio_redis.Connection.create(host="localhost", port=6379) subscriber = yield from connection.start_subscribe() - yield from subscriber.subscribe(['foobar']) + yield from subscriber.subscribe(["foobar"]) f.set_result(subscriber) f.greenlet.switch() @@ -49,18 +56,34 @@ def redis_wait(subscriber, f): @asyncio.coroutine def redis_publish(connection, msg): - yield from connection.publish('foobar', msg.decode('utf-8')) + yield from connection.publish("foobar", msg) -def application(env, sr): +async def async_wait(delay, gl): + await asyncio.sleep(delay) + gl.switch() + + +def trickle(env, sr): + sr("200 OK", [("Content-Type", "text/html; charset=UTF-8")]) + + gl = greenlet.getcurrent() - ws_scheme = 'ws' - if 'HTTPS' in env or env['wsgi.url_scheme'] == 'https': - ws_scheme = 'wss' + for i in range(32): + yield f"

Hello {i}

\n".encode("utf-8") + # or asyncio.get_event_loop().create_task() + asyncio.Task(async_wait(1, gl)) + gl.parent.switch() - if env['PATH_INFO'] == '/': - sr('200 OK', [('Content-Type', 'text/html')]) - return (""" + +def application(env, sr): + ws_scheme = "ws" + if "HTTPS" in env or env["wsgi.url_scheme"] == "https": + ws_scheme = "wss" + + if env["PATH_INFO"] == "/": + sr("200 OK", [("Content-Type", "text/html; charset=UTF-8")]) + output = """ -

[HTML template lines omitted (markup not recoverable); heading changed from "WebSocket" to "WebSocket (asyncio)"]
- """ % (ws_scheme, env['HTTP_HOST'])).encode() - elif env['PATH_INFO'] == '/favicon.ico': - return b"" - elif env['PATH_INFO'] == '/foobar/': - uwsgi.websocket_handshake() + """ % ( + ws_scheme, + env["HTTP_HOST"], + ) + + return [output.encode("utf-8")] + elif env["PATH_INFO"] == "/favicon.ico": + sr("200 OK", [("Content-Type", "image/x-icon")]) + return [b""] + + elif env["PATH_INFO"] == "/trickle/": + return trickle(env, sr) + + elif env["PATH_INFO"] == "/foobar/": + uwsgi.websocket_handshake( + env["HTTP_SEC_WEBSOCKET_KEY"], env.get("HTTP_ORIGIN", "") + ) print("websockets...") # a future for waiting for redis connection f = GreenFuture() @@ -117,7 +154,8 @@ def application(env, sr): myself = greenlet.getcurrent() myself.has_ws_msg = False # start monitoring websocket events - asyncio.get_event_loop().add_reader(uwsgi.connection_fd(), ws_recv_msg, myself) + fd = uwsgi.connection_fd() + asyncio.get_event_loop().add_reader(fd, ws_recv_msg, myself) # add a 4 seconds timer to manage ping/pong asyncio.get_event_loop().call_later(4, ws_recv_msg, myself) @@ -133,7 +171,7 @@ def application(env, sr): # any redis message in the queue ? if f.done(): msg = f.result() - uwsgi.websocket_send("[%s] %s" % (time.time(), msg)) + uwsgi.websocket_send(("[%s] %s" % (time.time(), msg)).encode("utf-8")) # restart coroutine f = GreenFuture() asyncio.Task(redis_wait(subscriber, f)) @@ -141,6 +179,27 @@ def application(env, sr): myself.has_ws_msg = False msg = uwsgi.websocket_recv_nb() if msg: - asyncio.Task(redis_publish(connection, msg)) + asyncio.Task(redis_publish(connection, msg.decode("utf-8"))) # switch again f.greenlet.parent.switch() + + +def trace_calls_and_returns(frame, event, arg): + co = frame.f_code + func_name = co.co_name + if func_name == "write": + # Ignore write() calls from print statements + return + line_no = frame.f_lineno + filename = co.co_filename + if event == "call": + print("Call to %s on line %s of %s" % (func_name, line_no, filename)) + return trace_calls_and_returns + elif event == "return": + print("%s => %s" % (func_name, arg)) + # else: + # print(event) + return + + +# sys.settrace(trace_calls_and_returns) diff --git a/uwsgiconfig.py b/uwsgiconfig.py index 1712e54516..74efb4b456 100644 --- a/uwsgiconfig.py +++ b/uwsgiconfig.py @@ -460,7 +460,7 @@ def build_uwsgi(uc, print_only=False, gcll=None): continue path = path.rstrip('/') - path, up = get_plugin_up(path) + path, up = get_plugin_up(path, uc) p_cflags = cflags[:] p_cflags += up['CFLAGS'] @@ -1400,8 +1400,8 @@ def execfile(path, up): exec(code, up) -def get_plugin_up(path): - up = {} +def get_plugin_up(path, uc): + up = {'UWSGICONFIG':uc} if os.path.isfile(path): bname = os.path.basename(path) # override path @@ -1440,7 +1440,7 @@ def build_plugin(path, uc, cflags, ldflags, libs, name=None): git_dir = get_remote_plugin(path) path = os.path.abspath(git_dir) - path, up = get_plugin_up(path) + path, up = get_plugin_up(path, uc) p_cflags = cflags[:] p_cflags += up['CFLAGS']