Simplify HDFDocumentComposer #858
Given I did not change any of the tests, we still pass in |
Thanks, looks almost there but I spotted a few simplifications
async def collect_stream_docs(
    self, indices_written: int
) -> AsyncIterator[StreamAsset]:
Ok, having seen the code it does no I/O, so we can put it back to a plain iterator
- async def collect_stream_docs(
-     self, indices_written: int
- ) -> AsyncIterator[StreamAsset]:
+ def collect_stream_docs(
+     self, indices_written: int
+ ) -> Iterator[StreamAsset]:
async for doc in self._composer.collect_stream_docs(indices_written):
    yield doc
Which simplifies to
- async for doc in self._composer.collect_stream_docs(indices_written):
-     yield doc
+ yield from self._composer.collect_stream_docs(indices_written)
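To illustrate the two suggestions above, here is a minimal sketch of a writer delegating to a composer through a plain generator. `FakeDoc`, `FakeComposer` and `FakeWriter` are hypothetical stand-ins, not the project's real classes. Note that `yield from` is a syntax error inside an `async def`, so this form only works once the method is a plain `def`.

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass
class FakeDoc:
    """Hypothetical stand-in for a StreamAsset document."""

    kind: str
    index: int


class FakeComposer:
    """Hypothetical composer; it does no I/O, so a plain generator suffices."""

    def collect_stream_docs(self, indices_written: int) -> Iterator[FakeDoc]:
        for i in range(indices_written):
            yield FakeDoc("stream_datum", i)


class FakeWriter:
    """Hypothetical writer that only forwards to its composer."""

    def __init__(self) -> None:
        self._composer = FakeComposer()

    def collect_stream_docs(self, indices_written: int) -> Iterator[FakeDoc]:
        # Delegate the whole iteration to the composer's generator.
        yield from self._composer.collect_stream_docs(indices_written)


docs = list(FakeWriter().collect_stream_docs(3))
```

Callers iterate the result with an ordinary `for` loop rather than `async for`.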
if not self._composer:
    self._composer = HDFDocumentComposer(
        Path(await self.panda_data_block.hdf_directory.get_value())
        / Path(await self.panda_data_block.hdf_file_name.get_value()),
        self._datasets,
    )
What was the reasoning for putting it here rather than in open()?
yield "stream_resource", doc
for doc in self.composer.stream_data(indices_written):
    yield "stream_datum", doc
if self.composer:
I think we should raise if self.composer is None
- if self.composer:
+ if self.composer is None:
+     msg = f"open() not called on {self}"
+     raise RuntimeError(msg)
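A minimal sketch of the guard suggested above, using a hypothetical `FakeWriter` whose "composer" is just a list of placeholder documents. Two details are worth noting: the message needs an `f` prefix for `{self}` to be interpolated, and because the method is a generator, the error is raised when the caller first iterates it, not when the method is called.

```python
from collections.abc import Iterator
from typing import Optional


class FakeWriter:
    """Hypothetical writer; the real composer is replaced by a list."""

    def __init__(self) -> None:
        self.composer: Optional[list[str]] = None  # populated by open()

    def open(self) -> None:
        # Stand-in for constructing the real composer.
        self.composer = ["doc-a", "doc-b"]

    def collect_stream_docs(self) -> Iterator[str]:
        if self.composer is None:
            # f-string so that {self} is actually formatted into the message.
            msg = f"open() not called on {self}"
            raise RuntimeError(msg)
        yield from self.composer


writer = FakeWriter()
try:
    # The guard runs on the first next(), since this is a generator.
    next(writer.collect_stream_docs())
    error_message = ""
except RuntimeError as exc:
    error_message = str(exc)

writer.open()
collected = list(writer.collect_stream_docs())
```

Failing loudly here surfaces a missing `open()` call immediately, instead of silently yielding no documents.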
@shihab-dls please could we rebase this so we can merge it?
Fixes #795
This PR should remove most of the duplicate code in writers that perform the same collect_stream_docs operation. As #795 requested that the tests remain unchanged, writers still have their collect_stream_docs methods, but each one simply calls the composer's method.

With this, composers can now be created in open(); however, as panda's writer creates its dataset in _update_datasets(), I create the composer in that method instead.

EDIT: I've moved this back to collect_stream_docs (for the panda writer), as I think it's clearer here.