Some thoughts on AsyncBufferedReader #1720
grant-allan-ctct
started this conversation in
Ideas
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I'm pretty new to this library and a learner when it comes to Python and especially asyncio, so there may be not much virtue in these ideas, but y'all seem like a friendly bunch so I'll put pen to paper anyway and see what y'all think.
Background info:
The
AsyncBufferedReader
holds aself.buffer
that's anasyncio.Queue
, as can be see here.Such a queue has an internal counter
_unfinished_tasks
that's incremented each time thatput_nowait
is called. Is also has an event-lock thingy called_finished
which has itsset()
method called at queue construction and itsclear()
method whenput_nowait
is called. Looking at the other places where_unfinished_tasks
and_finished
used, we can see how theasyncio.Queue
achieves the functionality that's documented for itsjoin()
method, viz. waiting until all queued items have been "processed" and then closing shop (see here).Wonderings:
Allow
async for
loops to end?The implementation of
__anext__
on theAsyncBufferedReader
class is written in such a way that iterating over the collection will never end, and further, the iterator seems willing to wait forever for the next message to arrive. To clarify, I'll mention the following example code snippet is taken from the docstring forAyncBufferedReader
here:If we write something like that in our code though, what happens that any lines of code that come afterwards are effectively unreachable. I realize that it's possible that this is exactly what we want, but there could be some other options:
_unfinished_tasks
that the queue is tracking, and have__anext__
raise aStopAsyncIteration
if it gets to 0. This will mark theasync for
loop as being complete, and program execution can then continue. In order to do this, we would need to make a call toself.buffer.task_done()
every time we successfully removed an item from the queue usingself.buffer.get()
, as described here.await asyncio.wait_for(self.buffer.get(), timeout)
inside__anext__
and check for occurrence of a timeout; if one occurs, then either:raise StopAsyncIteration
, orNone
and allow the universe to decide how to respond, e.g. maybe, or maybe not,break
out of the loopProvide an optional timeout for
get_message
?Speaking of timeouts, it might be nice for the
AsyncBufferedReader
to have a common default timeout that it can share between the__anext__
implementation and theget_message
implementation (here). And, ifget_message
were given an optionaltimeout
argument then__anext__
could just callreturn await get_message()
(depending on outcome of prior questions).Remove a smidge of DRY violation ?
Actually, seems like having
__anext__
just callreturn await get_message()
could happen already 😎 .Beta Was this translation helpful? Give feedback.
All reactions