Skip to content
This repository was archived by the owner on Jul 1, 2021. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions trinity/sync/common/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
NamedTuple,
)

from async_service import Service, background_asyncio_service
import async_service
from async_service import Service, background_asyncio_service, external_asyncio_api

from eth.abc import BlockHeaderAPI

Expand Down Expand Up @@ -117,9 +118,18 @@ def __init__(self,
self._fetched_headers = asyncio.Queue(max_pending_headers)

async def next_skeleton_segment(self) -> AsyncIterator[Tuple[BlockHeader, ...]]:
# @external_asyncio_api can not be used on `next_skeleton_segment` directly because
# it returns a generator. Maybe that could be fixed though?
@external_asyncio_api
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can not use @external_asyncio_api directly on async def next_skeleton_segment then this feels like abusing the decorator I guess

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agreed. It doesn't take too much work in async-service to expose the new API though, AFAICT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async def _cancellable_fetch_headers_get(_: Service) -> Tuple[BlockHeader, ...]:
return await self._fetched_headers.get()

while self.manager.is_running:
yield await self._fetched_headers.get()
self._fetched_headers.task_done()
try:
yield await _cancellable_fetch_headers_get(self)
self._fetched_headers.task_done()
except async_service.exceptions.LifecycleError:
break

async def run(self) -> None:
self.manager.run_daemon_task(self._display_stats)
Expand Down Expand Up @@ -976,6 +986,7 @@ async def _full_skeleton_sync(self, skeleton_syncer: SkeletonSyncer[TChainPeer])

previous_segment = first_segment
async for segment in skeleton_generator:

self._stitcher.register_tasks(segment, ignore_duplicates=True)

gap_length = segment[0].block_number - previous_segment[-1].block_number - 1
Expand Down