
Enterprise Form Report Iterators #35253

Closed
wants to merge 5 commits

Conversation

@mjriley (Contributor) commented Oct 23, 2024

Product Description

Technical Summary

Feature Flag

Safety Assurance

Safety story

Automated test coverage

QA Plan

Migrations

  • The migrations in this code can be safely applied first independently of the code

Rollback instructions

  • This PR can be reverted after deploy with no further considerations

Labels & Review

  • Risk label is set correctly
  • The set of people pinged as reviewers is appropriate for the level of risk of the change



class ResumableIteratorWrapperTests(SimpleTestCase):
    def test_can_iterate_through_a_wrapped_iterator(self):
Contributor:

Suggested change
def test_can_iterate_through_a_wrapped_iterator(self):
def test_can_iterate_through_an_iterator(self):

}

if limit:
    next_params = self.objects.get_next_query_params()
Contributor:

What will list.get_next_query_params() return?

Comment on lines +28 to +43
domain_index = domains.index(last_domain) if last_domain else 0

def _get_domain_iterator(last_time=None, last_id=None):
    if domain_index >= len(domains):
        return None
    domain = domains[domain_index]
    return domain_form_generator(domain, start_date, end_date, last_time, last_id)

current_iterator = _get_domain_iterator(last_time, last_id)

while current_iterator:
    yield from current_iterator
    domain_index += 1
    if domain_index >= len(domains):
        break
    current_iterator = _get_domain_iterator()
Contributor:

The while loop with domain_index was hard to follow. Here's a suggestion of how itertools.dropwhile could be used with a for loop.

Suggested change
domain_index = domains.index(last_domain) if last_domain else 0

def _get_domain_iterator(last_time=None, last_id=None):
    if domain_index >= len(domains):
        return None
    domain = domains[domain_index]
    return domain_form_generator(domain, start_date, end_date, last_time, last_id)

current_iterator = _get_domain_iterator(last_time, last_id)

while current_iterator:
    yield from current_iterator
    domain_index += 1
    if domain_index >= len(domains):
        break
    current_iterator = _get_domain_iterator()

from itertools import dropwhile  # TODO move to top of module
last_plus_remains = dropwhile(lambda d: d != last_domain, domains)
remaining_domains = dropwhile(lambda d: d == last_domain, last_plus_remains)
for domain in remaining_domains:
    yield from domain_form_generator(domain, start_date, end_date, last_time, last_id)
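For readers unfamiliar with itertools.dropwhile, here is a toy, self-contained sketch (the domain names are made up) of how the two chained calls skip past last_domain. Note it assumes last_domain actually appears in domains; if it did not, the first dropwhile would consume the entire list.

```python
from itertools import dropwhile

domains = ['alpha', 'beta', 'gamma', 'delta']
last_domain = 'beta'

# First call drops everything before last_domain; second call drops
# last_domain itself, leaving only the domains still to be processed.
last_plus_remains = dropwhile(lambda d: d != last_domain, domains)
remaining = list(dropwhile(lambda d: d == last_domain, last_plus_remains))
print(remaining)  # ['gamma', 'delta']
```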


def domain_form_generator(domain, start_date, end_date, last_time=None, last_id=None):
    if not last_time:
        last_time = datetime.now()
Contributor:

I think this should use utcnow (or whatever the equivalent non-deprecated version of that is).

If last_time is passed in it will be a string because it was retrieved from the request. Does it matter to be mixing types like that?
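For reference, the non-deprecated equivalent the comment alludes to is datetime.now(timezone.utc); datetime.utcnow() is deprecated as of Python 3.12. A minimal sketch:

```python
from datetime import datetime, timezone

# Timezone-aware "now" in UTC; utcnow() returned a naive datetime
# and is deprecated since Python 3.12.
last_time = datetime.now(timezone.utc)
print(last_time.tzinfo)  # timezone.utc
```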

Comment on lines +53 to +61
for form in results.hits:
    last_form_fetched = form
    yield last_form_fetched

if len(results.hits) >= results.total:
    break
else:
    last_time = last_form_fetched['received_on']
    last_id = last_form_fetched['_id']
Contributor:

Will line 60 raise NameError or use the previous value of last_form_fetched (which is probably wrong and I think will result in an infinite loop) if results.hits is empty?

Suggested change
for form in results.hits:
    last_form_fetched = form
    yield last_form_fetched

if len(results.hits) >= results.total:
    break
else:
    last_time = last_form_fetched['received_on']
    last_id = last_form_fetched['_id']

yield from results.hits
if not results.hits or len(results.hits) >= results.total:
    break
last_form_fetched = results.hits[-1]
last_time = last_form_fetched['received_on']
last_id = last_form_fetched['_id']

The >= operation implies that it is possible for len(results.hits) to be greater than results.total. Out of curiosity, what would that mean? It sounds like ES would have returned more results than it thought were available, which seems like a contradiction.
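To illustrate the concern, here is a minimal sketch of the suggested loop driven by a stubbed results object (FakeResults, iterate_pages, and the page sequence are invented for the demo): an empty page yields nothing and breaks out cleanly, instead of re-reading a stale last_form_fetched and looping forever.

```python
class FakeResults:
    def __init__(self, hits, total):
        self.hits = hits
        self.total = total

def iterate_pages(pages):
    # Mirrors the suggested change: yield the page, then stop on an
    # empty or final page before ever touching hits[-1].
    for results in pages:
        yield from results.hits
        if not results.hits or len(results.hits) >= results.total:
            break
        last_form_fetched = results.hits[-1]
        # last_time / last_id for the next query would be taken from here
        last_time = last_form_fetched['received_on']
        last_id = last_form_fetched['_id']

pages = [
    FakeResults([{'_id': 'a', 'received_on': 't1'}], 2),
    FakeResults([], 2),  # empty page: no NameError, the loop just ends
]
print(list(iterate_pages(pages)))  # [{'_id': 'a', 'received_on': 't1'}]
```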

last_id = last_form_fetched['_id']


def create_domain_query(domain, start_date, end_date, last_time, last_id):
Contributor:

This name made me think it was building a query that returned domains.

Suggested change
def create_domain_query(domain, start_date, end_date, last_time, last_id):
def create_domain_forms_query(domain, start_date, end_date, last_time, last_id):

corehq/apps/enterprise/iterators.py (resolved thread)
include_form_id=True,
)

return get_enterprise_form_iterator(account, start_date, end_date, last_domain, last_time, last_id)
Contributor:

Could the page size limit be passed in here? Might be able to retrieve that with something like

limit = self.paginator.get_limit()


query.es_query['sort'] = [
    {'received_on': {'order': 'desc'}},
    {'form.meta.instanceID': 'asc'}
Contributor:

I'm not sure how it works in ES, but in SQL this could result in an inefficient query if there is no index that can be used with this sort criteria. Have you looked into the performance of ES with sorting large result sets?

Contributor (Author):

@esoergel mentioned that Elasticsearch essentially indexes all fields. I believe this follows how we retrieve cases.

Contributor:

Yep, that's right; this is the same approach we use there. ES stores everything you send it, and it makes everything in the mapping file available for querying. Its indexing and filter-caching mechanisms are kinda black-boxy as best as I can tell, but this is for sure in line with how querying and sorting are expected to work.

Comment on lines +26 to +29
if self.is_complete:
    return None
if not self.iteration_started:
    return {}
Contributor:

Is there a meaningful difference between these return values on the client? If not, would it work to return an empty dict if the iteration is complete? It would simplify the return type (always returns a dict).

Contributor (Author):

The goal was to be able to use this value to determine whether the iterator was complete or not, but as you recommended, I've removed this class now.

    """
    limit = self.get_limit()

    objects = list(islice(self.objects, limit if limit else None))
Contributor:

What does it mean if limit evaluates to false? I assume that would mean no limit. Should that not be impossible, given that this class has a max_limit parameter?

Suggested change
objects = list(islice(self.objects, limit if limit else None))
assert limit, f"page limit is required (got {limit!r})"
objects = list(islice(self.objects, limit))
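For context on the limit if limit else None expression: itertools.islice treats a stop of None as unbounded, so a falsy limit means "take everything". A quick sketch:

```python
from itertools import islice

# stop=None means "no limit"; an integer stop caps the page size.
everything = list(islice(iter(range(5)), None))
first_two = list(islice(iter(range(5)), 2))
print(everything)  # [0, 1, 2, 3, 4]
print(first_two)   # [0, 1]
```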

Contributor (Author):

This code is trying to mimic what Tastypie does. You can see in its Paginator code that it allows the possibility of no limit. It mentions that here

Comment on lines +79 to +80
next_params = self.objects.get_next_query_params()
if next_params:
Contributor:

Is this assuming that self.objects is an instance of ResumableIteratorWrapper? Could ResumableIteratorWrapper be eliminated or dramatically simplified (remove all iteration logic) since we are doing the iteration above in this method (list(islice(self.objects, ...)))?

Suggested change
next_params = self.objects.get_next_query_params()
if next_params:
if objects:
    next_params = self.objects.get_element_properties_fn(objects[-1])

Put another way, does anything other than this method iterate over self.objects? What happens if the limit is not applied as it is here?

Contributor (Author):

Working on this. It does seem to simplify the logic if the paginator is responsible for resolving next parameters.

Comment on lines +76 to +77
{'received_on': {'order': 'desc'}},
{'form.meta.instanceID': 'asc'}
@esoergel (Contributor) commented Oct 29, 2024:

Here I'd actually recommend inserted_at rather than received_on. received_on is the timestamp of when we receive the form, and inserted_at is a timestamp applied just before it gets sent to ES. The risk with using received_on is that delays in processing mean you might see documents with an earlier received_on date appearing in ES after others with a later date. If you're paginating over the data as it's being amended, this could cause some of the most recent submissions to get skipped. To be sure, that's a bit of an edge case, but there's an easy enough mitigation.

You can also use doc_id instead of form.meta.instanceID - not sure if it's any faster, but less nesting seems nice.
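If that recommendation is adopted, the sort criteria from the diff above would become something like the following (a sketch only; the field names are those suggested in the comment, and the surrounding query object is assumed to be unchanged):

```python
# inserted_at avoids skipping forms whose processing was delayed;
# doc_id is a top-level tiebreaker in place of form.meta.instanceID.
sort_criteria = [
    {'inserted_at': {'order': 'desc'}},
    {'doc_id': 'asc'},
]
print(sort_criteria)
```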

@mjriley mjriley closed this Nov 12, 2024
mjriley commented Nov 12, 2024

Closed this PR with the intention of moving the 'active' PR to #35295

4 participants