From cb3ab6bcfdd1accbd6e4d64d652890f9ed1ca1b2 Mon Sep 17 00:00:00 2001 From: steovd <126480839+steovd@users.noreply.github.com> Date: Sat, 8 Feb 2025 12:41:24 +0100 Subject: [PATCH 1/2] add failing test --- tests/test_io.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_io.py b/tests/test_io.py index e3ff28159..e5cbb8c4e 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -118,6 +118,25 @@ async def test_outstream(anyio_backend, iopub_thread): assert stream.writable() +async def test_outstream_hooks(anyio_backend, iopub_thread): + session = Session() + + stream = OutStream(session, iopub_thread, "stdout") + + with stream: + hook_called = False + + def hook(msg): + nonlocal hook_called + hook_called = True + return msg + + stream.register_hook(hook) + stream.write("hi") + stream.flush() + assert hook_called + + @pytest.mark.anyio() async def test_event_pipe_gc(iopub_thread): session = Session(key=b"abc") From 97d329226c86dc9566abe20a38b20b15d27ddddc Mon Sep 17 00:00:00 2001 From: steovd <126480839+steovd@users.noreply.github.com> Date: Sat, 8 Feb 2025 12:44:00 +0100 Subject: [PATCH 2/2] fix OutStream hooks not being called --- ipykernel/iostream.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index cb4fc0525..08f3e9fe7 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -7,6 +7,7 @@ import atexit import contextvars +import functools import io import os import sys @@ -620,10 +621,19 @@ def flush(self): else: self._flush() + @property def _flush(self): + """Prepare _flush_impl partial to be scheduled on the IO thread. + + This indirection is necessary to ensure _flush_impl calls hooks + registered from the current thread (as they are thread-local). + """ + return functools.partial(self._flush_impl, self._hooks) + + def _flush_impl(self, hooks=()): """This is where the actual send happens. - _flush should generally be called in the IO thread, + _flush_impl should generally be called in the IO thread, unless the thread has been destroyed (e.g. forked subprocess). """ self._flush_pending = False @@ -648,7 +658,7 @@ def _flush(self): # Each transform either returns a new # message or None. If None is returned, # the message has been 'used' and we return. - for hook in self._hooks: + for hook in hooks: msg = hook(msg) if msg is None: return