Skip to content

Commit

Permalink
Merge pull request #192 from plotly/andrew/tweaks
Browse files Browse the repository at this point in the history
Andrew/tweaks
  • Loading branch information
ayjayt authored Feb 6, 2025
2 parents 1f06ee0 + b26a8ba commit 9c3abee
Show file tree
Hide file tree
Showing 40 changed files with 1,254 additions and 237 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish_testpypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- run: uv python pin ${{ matrix.python_v }}
if: ${{ matrix.python_v != '' }}
# don't modify sync file! messes up version!
- run: uv sync --all-extras --frozen # does order matter?
- run: uv sync --no-sources --all-extras --frozen # does order matter?
- run: uv build
- run: uv run --no-sync choreo_get_chrome -v --i ${{ matrix.chrome_v }}
- name: Reinstall from wheel
Expand Down Expand Up @@ -75,7 +75,7 @@ jobs:
with:
python-version-file: "pyproject.toml"
- run: git checkout ${{ github.ref_name }}
- run: uv sync --frozen --all-extras
- run: uv sync --no-sources --frozen --all-extras
- run: uv build
- name: Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
Expand Down
14 changes: 7 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@ jobs:
timeout-minutes: 1

- name: Install choreographer
run: uv sync --all-extras
run: uv sync --no-sources --all-extras
- name: Install google-chrome-for-testing
run: uv run choreo_get_chrome
run: uv run --no-sources choreo_get_chrome
- name: Diagnose
run: uv run choreo_diagnose --no-run
run: uv run --no-sources choreo_diagnose --no-run
timeout-minutes: 1

- name: Test
if: ${{ ! runner.debug && matrix.os != 'ubuntu-latest' }}
run: uv run poe test
run: uv run --no-sources poe test
timeout-minutes: 7

- name: Test (Linux)
if: ${{ ! runner.debug && matrix.os == 'ubuntu-latest' }}
run: xvfb-run uv run poe test
run: xvfb-run uv run --no-sources poe test
timeout-minutes: 7

- name: Test (Debug)
if: runner.debug
run: uv run poe debug-test
run: uv run --no-sources poe debug-test

- name: Test (Debug, Linux)
if: ${{ runner.debug && matrix.os == 'ubuntu-latest' }}
run: xvfb-run uv run poe debug-test
run: xvfb-run uv run --no-sources poe debug-test
timeout-minutes: 7
4 changes: 4 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
v1.0.0
- Increase wait for checking regular close
- Decrease freeze for manual bad-close cleanup
- Squash race condition
- Improve parallelization by disabling site-per-process
- Add option for whole new window with create_tab
- General logging improvements
205 changes: 115 additions & 90 deletions choreographer/_brokers/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ def __init__(self, browser: Browser, channel: ChannelInterface) -> None:
self.futures = {}
self._subscriptions_futures = {}

self._write_lock = asyncio.Lock()

def new_subscription_future(
self,
session_id: str,
subscription: str,
) -> asyncio.Future[Any]:
_logger.debug(
f"Session {session_id} is subscribing to {subscription} one time.",
)
if session_id not in self._subscriptions_futures:
self._subscriptions_futures[session_id] = {}
if subscription not in self._subscriptions_futures[session_id]:
Expand All @@ -84,132 +89,149 @@ def clean(self) -> None:
_logger.debug("Cancelling message futures")
for future in self.futures.values():
if not future.done():
_logger.debug(f"Cancelling {future}")
_logger.debug2(f"Cancelling {future}")
future.cancel()
_logger.debug("Cancelling read task")
if self._current_read_task and not self._current_read_task.done():
_logger.debug(f"Cancelling read: {self._current_read_task}")
_logger.debug2(f"Cancelling read: {self._current_read_task}")
self._current_read_task.cancel()
_logger.debug("Cancelling subscription-futures")
for session in self._subscriptions_futures.values():
for query in session.values():
for future in query:
if not future.done():
_logger.debug(f"Cancelling {future}")
_logger.debug2(f"Cancelling {future}")
future.cancel()
_logger.debug("Cancelling background tasks")
for task in self._background_tasks_cancellable:
if not task.done():
_logger.debug(f"Cancelling {task}")
_logger.debug2(f"Cancelling {task}")
task.cancel()

def run_read_loop(self) -> None: # noqa: C901, PLR0915 complexity
def check_error(result: asyncio.Future[Any]) -> None:
try:
e = result.exception()
if e:
self._background_tasks.add(
asyncio.create_task(self._browser.close()),
)
if not isinstance(e, asyncio.CancelledError):
_logger.error(f"Error in run_read_loop: {e!s}")
raise e
except asyncio.CancelledError:
self._background_tasks.add(asyncio.create_task(self._browser.close()))
def check_read_loop_error(result: asyncio.Future[Any]) -> None:
e = result.exception()
if e:
_logger.debug("Error in readloop. Will post a close() task.")
self._background_tasks.add(
asyncio.create_task(self._browser.close()),
)
if isinstance(e, channels.ChannelClosedError):
_logger.debug("PipeClosedError caught")
_logger.debug2("Full Error:", exc_info=e)
elif isinstance(e, asyncio.CancelledError):
_logger.debug("CancelledError caught.")
_logger.debug2("Full Error:", exc_info=e)
else:
_logger.error("Error in run_read_loop.", exc_info=e)
raise e

async def read_loop() -> None: # noqa: PLR0912, C901
try:
responses = await asyncio.to_thread(
self._channel.read_jsons,
blocking=True,
)
for response in responses:
error = protocol.get_error_from_result(response)
key = protocol.calculate_message_key(response)
if not key and error:
raise protocol.DevtoolsProtocolError(response)
self._check_for_closed_session(response)
# surrounding lines overlap in idea
if protocol.is_event(response):
event_session_id = response.get(
"sessionId",
"",
)
x = self._get_target_session_by_session_id(
event_session_id,
)
if not x:
continue
_, event_session = x
if not event_session:
_logger.error("Found an event that returned no session.")
continue

session_futures = self._subscriptions_futures.get(
event_session_id,
)
if session_futures:
for query in session_futures:
match = (
query.endswith("*")
and response["method"].startswith(query[:-1])
) or (response["method"] == query)
if match:
for future in session_futures[query]:
if not future.done():
future.set_result(response)
session_futures[query] = []

for query in list(event_session.subscriptions):
responses = await asyncio.to_thread(
self._channel.read_jsons,
blocking=True,
)
_logger.debug(f"Channel read found {len(responses)} json objects.")
for response in responses:
error = protocol.get_error_from_result(response)
key = protocol.calculate_message_key(response)
if not key and error:
raise protocol.DevtoolsProtocolError(response)

# looks for event that we should handle internally
self._check_for_closed_session(response)
# surrounding lines overlap in idea
if protocol.is_event(response):
event_session_id = response.get(
"sessionId",
"",
)
_logger.debug2(f"Is event for {event_session_id}")
x = self._get_target_session_by_session_id(
event_session_id,
)
if not x:
continue
_, event_session = x
if not event_session:
_logger.error("Found an event that returned no session.")
continue
_logger.debug(
f"Received event {response['method']} for "
f"{event_session_id} targeting {event_session}.",
)

session_futures = self._subscriptions_futures.get(
event_session_id,
)
_logger.debug2(
"Checking for event subscription future.",
)
if session_futures:
for query in session_futures:
match = (
query.endswith("*")
and response["method"].startswith(query[:-1])
) or (response["method"] == query)
_logger.debug2(
f"Checking subscription key: {query} "
f"against event method {response['method']}",
)
if match:
t: asyncio.Task[Any] = asyncio.create_task(
event_session.subscriptions[query][0](response),
_logger.debug2(
"Found event subscription future.",
)
self._background_tasks_cancellable.add(t)
if not event_session.subscriptions[query][1]:
event_session.unsubscribe(query)

elif key:
if key in self.futures:
_logger.debug(f"run_read_loop() found future for key {key}")
future = self.futures.pop(key)
elif "error" in response:
raise protocol.DevtoolsProtocolError(response)
else:
raise RuntimeError(f"Couldn't find a future for key: {key}")
future.set_result(response)
else:
warnings.warn( # noqa: B028
f"Unhandled message type:{response!s}",
UnhandledMessageWarning,
for future in session_futures[query]:
if not future.done():
future.set_result(response)
session_futures[query] = []

_logger.debug2(
"Checking for event subscription callback.",
)
for query in list(event_session.subscriptions):
match = (
query.endswith("*")
and response["method"].startswith(query[:-1])
) or (response["method"] == query)
_logger.debug2(
"Found event subscription callback.",
)
except channels.ChannelClosedError:
_logger.debug("PipeClosedError caught")
self._background_tasks.add(asyncio.create_task(self._browser.close()))
return
if match:
t: asyncio.Task[Any] = asyncio.create_task(
event_session.subscriptions[query][0](response),
)
self._background_tasks_cancellable.add(t)
if not event_session.subscriptions[query][1]:
event_session.unsubscribe(query)

elif key:
_logger.debug(f"Have a response with key {key}")
if key in self.futures:
_logger.debug(f"Found future for key {key}")
future = self.futures.pop(key)
elif "error" in response:
raise protocol.DevtoolsProtocolError(response)
else:
raise RuntimeError(f"Couldn't find a future for key: {key}")
future.set_result(response)
else:
warnings.warn(
f"Unhandled message type:{response!s}",
UnhandledMessageWarning,
stacklevel=1,
)
read_task = asyncio.create_task(read_loop())
read_task.add_done_callback(check_error)
read_task.add_done_callback(check_read_loop_error)
self._current_read_task = read_task

read_task = asyncio.create_task(read_loop())
read_task.add_done_callback(check_error)
read_task.add_done_callback(check_read_loop_error)
self._current_read_task = read_task

async def write_json(
self,
obj: protocol.BrowserCommand,
) -> protocol.BrowserResponse:
_logger.debug2(f"In broker.write_json for {obj}")
protocol.verify_params(obj)
key = protocol.calculate_message_key(obj)
_logger.debug1(f"Broker writing {obj['method']} with key {key}")
if not key:
raise RuntimeError(
"Message strangely formatted and "
Expand All @@ -220,7 +242,8 @@ async def write_json(
self.futures[key] = future
_logger.debug(f"Created future: {key} {future}")
try:
await asyncio.to_thread(self._channel.write_json, obj)
async with self._write_lock:
await asyncio.to_thread(self._channel.write_json, obj)
except BaseException as e: # noqa: BLE001
future.set_exception(e)
del self.futures[key]
Expand All @@ -247,6 +270,7 @@ def _check_for_closed_session(self, response: protocol.BrowserResponse) -> bool:
"",
)
if session_closed == "":
_logger.debug2("Found closed session through events.")
return True

x = self._get_target_session_by_session_id(session_closed)
Expand All @@ -262,6 +286,7 @@ def _check_for_closed_session(self, response: protocol.BrowserResponse) -> bool:
"'Target.detachedFromTarget'. "
f"Session {session_closed} was closed.",
)
_logger.debug2("Found closed session through events.")
return True
return False
else:
Expand Down
Loading

0 comments on commit 9c3abee

Please sign in to comment.