Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Task methods to make _asyncio more similar to cpython #8647

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
asyncio: Add Task methods to get tasks closer to CPython.
Adds methods that are in CPython, such as `exception`, `result`,
`get_coro`, `cancelled`, `add_done_callback`, and
`remove_done_callback`.

Also adds support for the unary hash so tasks may be collected in
a python set.
imnotjames committed Nov 22, 2023
commit adbb02d9e32fb3fba423fa523211643a68afe6fc
144 changes: 143 additions & 1 deletion extmod/modasyncio.c
Original file line number Diff line number Diff line change
@@ -46,6 +46,12 @@
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)

#define IS_CANCELLED_ERROR(error) ( \
mp_obj_is_subclass_fast( \
MP_OBJ_FROM_PTR(mp_obj_get_type(error)), \
mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError)) \
))

typedef struct _mp_obj_task_t {
mp_pairheap_t pairheap;
mp_obj_t coro;
@@ -202,6 +208,114 @@ STATIC mp_obj_t task_done(mp_obj_t self_in) {
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);

STATIC mp_obj_t task_add_done_callback(mp_obj_t self_in, mp_obj_t callback) {
assert(mp_obj_is_callable(callback));
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (TASK_IS_DONE(self)) {
// In CPython the callbacks are not immediately called and are instead
// called by the event loop. However, MicroPython's event loop doesn't
// support `call_soon` to handle callback processing.
//
// Because of this, it's close enough to call the callback immediately.

mp_call_function_2(callback, self_in, self->data);
return mp_const_none;
}

if (self->state != mp_const_true) {
// Tasks SHOULD support more than one callback per CPython but to reduce
// the surface area of this change tasks can currently only support one.
mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT(">1 callback unsupported"));
}

self->state = callback;
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_add_done_callback_obj, task_add_done_callback);

STATIC mp_obj_t task_remove_done_callback(mp_obj_t self_in, mp_obj_t callback) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (callback != self->state) {
// If the callback isn't a match we can count this as removing 0 callbacks
return mp_obj_new_int(0);
}

self->state = mp_const_true;
return mp_obj_new_int(1);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_remove_done_callback_obj, task_remove_done_callback);

STATIC mp_obj_t task_get_coro(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
return MP_OBJ_FROM_PTR(self->coro);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_get_coro_obj, task_get_coro);

STATIC mp_obj_t task_exception(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If the exception is a cancelled error then we should raise it
if (IS_CANCELLED_ERROR(self->data)) {
nlr_raise(mp_make_raise_obj(self->data));
}

// If it's a StopIteration we should should return none
if (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

if (!mp_obj_is_exception_instance(self->data)) {
return mp_const_none;
}

return self->data;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_exception_obj, task_exception);

STATIC mp_obj_t task_result(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If `exception()` returns anything we raise that
mp_obj_t exception_obj = task_exception(self_in);
if (exception_obj != mp_const_none) {
nlr_raise(mp_make_raise_obj(exception_obj));
}

// If not StopIteration, bail early
if (!mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

return mp_obj_exception_get_value(self->data);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_result_obj, task_result);

STATIC mp_obj_t task_cancelled(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
// If task isn't done it can't possibly be cancelled, and would instead
// be considered "cancelling" even if a cancel was requested until it
// has fully completed.
return mp_obj_new_bool(false);
}

return mp_obj_new_bool(IS_CANCELLED_ERROR(self->data));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancelled_obj, task_cancelled);

STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
// Check if task is already finished.
@@ -276,6 +390,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
} else if (attr == MP_QSTR___await__) {
dest[0] = MP_OBJ_FROM_PTR(&task_await_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_add_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_add_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_remove_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_remove_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_get_coro) {
dest[0] = MP_OBJ_FROM_PTR(&task_get_coro_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_result) {
dest[0] = MP_OBJ_FROM_PTR(&task_result_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_exception) {
dest[0] = MP_OBJ_FROM_PTR(&task_exception_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_cancelled) {
dest[0] = MP_OBJ_FROM_PTR(&task_cancelled_obj);
dest[1] = self_in;
}
} else if (dest[1] != MP_OBJ_NULL) {
// Store
@@ -289,6 +421,15 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
}
}

STATIC mp_obj_t task_unary_op(mp_unary_op_t op, mp_obj_t o_in) {
switch (op) {
case MP_UNARY_OP_HASH:
return MP_OBJ_NEW_SMALL_INT((mp_uint_t)o_in);
default:
return MP_OBJ_NULL; // op not supported
}
}

STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
(void)iter_buf;
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
@@ -337,7 +478,8 @@ STATIC MP_DEFINE_CONST_OBJ_TYPE(
MP_TYPE_FLAG_ITER_IS_CUSTOM,
make_new, task_make_new,
attr, task_attr,
iter, &task_getiter_iternext
iter, &task_getiter_iternext,
unary_op, task_unary_op
);

/******************************************************************************/