Enterprise Form Report Iterators #35253

Closed · wants to merge 5 commits
2 changes: 1 addition & 1 deletion corehq/apps/accounting/models.py
@@ -529,7 +529,7 @@ def autopay_card(self):

     def get_domains(self):
         return list(Subscription.visible_objects.filter(account_id=self.id, is_active=True).values_list(
-            'subscriber__domain', flat=True))
+            'subscriber__domain', flat=True).order_by('subscriber__domain'))

def has_enterprise_admin(self, email):
lower_emails = [e.lower() for e in self.enterprise_admin_emails]
86 changes: 86 additions & 0 deletions corehq/apps/enterprise/api/keyset_paginator.py
@@ -0,0 +1,86 @@
from itertools import islice
from django.http.request import QueryDict
from urllib.parse import urlencode
from tastypie.paginator import Paginator


class KeysetPaginator(Paginator):
def __init__(self, request_data, objects,
resource_uri=None, limit=None, max_limit=1000, collection_name='objects'):
'objects is expected to be an iterator'
super().__init__(
request_data,
objects,
resource_uri=resource_uri,
limit=limit,
max_limit=max_limit,
collection_name=collection_name
)

def get_offset(self):
raise NotImplementedError()

def get_slice(self, limit, offset):
raise NotImplementedError()

def get_count(self):
raise NotImplementedError()

def get_previous(self, limit, offset):
raise NotImplementedError()

def get_next(self, limit, **next_params):
return self._generate_uri(limit, **next_params)

def _generate_uri(self, limit, **next_params):
if self.resource_uri is None:
return None

if isinstance(self.request_data, QueryDict):
# Because QueryDict allows multiple values for the same key, we need to remove existing values
# prior to updating
request_params = self.request_data.copy()
if 'limit' in request_params:
del request_params['limit']
for key in next_params:
if key in request_params:
del request_params[key]

request_params.update({'limit': str(limit), **next_params})
encoded_params = request_params.urlencode()
else:
request_params = {}
for k, v in self.request_data.items():
if isinstance(v, str):
request_params[k] = v.encode('utf-8')
else:
request_params[k] = v

request_params.update({'limit': limit, **next_params})
encoded_params = urlencode(request_params)

return '%s?%s' % (
self.resource_uri,
encoded_params
)

def page(self):
"""
Generates all pertinent data about the requested page.
"""
limit = self.get_limit()

objects = list(islice(self.objects, limit if limit else None))
Contributor:
What does it mean if limit evaluates to false? I assume that would mean no limit. Should that not be impossible, given that this class has a max_limit parameter?

Suggested change:
-        objects = list(islice(self.objects, limit if limit else None))
+        assert limit, f"page limit is required (got {limit!r})"
+        objects = list(islice(self.objects, limit))

Contributor (author):
This code is trying to mimic what Tastypie does. You can see in its Paginator code that it allows the possibility of no limit. It mentions that here.
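
For context, Tastypie's limit resolution looks roughly like the following (a paraphrase of tastypie.paginator.Paginator.get_limit from memory, not a verbatim quote); the relevant point is that a limit of 0 means "no limit" and is only clamped when max_limit is truthy:

    # Paraphrased sketch of Tastypie's Paginator.get_limit (not verbatim):
    def get_limit(self):
        limit = self.request_data.get('limit', self.limit)
        if limit is None:
            limit = getattr(settings, 'API_LIMIT_PER_PAGE', 20)
        limit = int(limit)
        if self.max_limit and (not limit or limit > self.max_limit):
            # limit == 0 ("no limit") is clamped only when max_limit is set
            return self.max_limit
        return limit

So with the default max_limit=1000 here, a falsy limit would be clamped before page() runs; an unlimited page is only reachable when max_limit itself is falsy.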

meta = {
'limit': limit,
}

if limit:
next_params = self.objects.get_next_query_params()
Contributor:
What will list.get_next_query_params() return?

if next_params:
Comment on lines +79 to +80
Contributor:
Is this assuming that self.objects is an instance of ResumableIteratorWrapper? Could ResumableIteratorWrapper be eliminated or dramatically simplified (remove all iteration logic) since we are doing the iteration above in this method (list(islice(self.objects, ...)))?

Suggested change:
-            next_params = self.objects.get_next_query_params()
-            if next_params:
+            if objects:
+                next_params = self.objects.get_element_properties_fn(objects[-1])

Put another way, does anything other than this method iterate over self.objects? What happens if the limit is not applied as it is here?

Contributor (author):
Working on this. It does seem to simplify the logic if the paginator is responsible for resolving next parameters.

meta['next'] = self.get_next(limit, **next_params)

return {
self.collection_name: objects,
'meta': meta,
}
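
To make the paging contract concrete, here is a minimal usage sketch (hypothetical resource URI; the iterator wrapper is the one added elsewhere in this PR, and the expected output mirrors the tests below):

    # Usage sketch: one page of three results, plus a keyset link to the next page.
    from django.http import QueryDict

    objects = ResumableIteratorWrapper(range(5), lambda ele: {'last_value': ele})
    paginator = KeysetPaginator(QueryDict(), objects, resource_uri='/api/forms/', limit=3)

    page = paginator.page()
    page['objects']        # -> [0, 1, 2]
    page['meta']['limit']  # -> 3
    page['meta']['next']   # -> '/api/forms/?limit=3&last_value=2'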
40 changes: 20 additions & 20 deletions corehq/apps/enterprise/api/resources.py
@@ -19,9 +19,9 @@
 from corehq.apps.api.resources import HqBaseResource
 from corehq.apps.api.resources.auth import ODataAuthentication
 from corehq.apps.api.resources.meta import get_hq_throttle
-from corehq.apps.enterprise.enterprise import (
-    EnterpriseReport,
-)
+from corehq.apps.enterprise.api.keyset_paginator import KeysetPaginator
+from corehq.apps.enterprise.enterprise import EnterpriseReport
+from corehq.apps.enterprise.iterators import get_enterprise_form_iterator

 from corehq.apps.enterprise.tasks import generate_enterprise_report, ReportTaskProgress

@@ -351,6 +351,7 @@ def get_primary_keys(self):

 class FormSubmissionResource(ODataEnterpriseReportResource):
     class Meta(ODataEnterpriseReportResource.Meta):
+        paginator_class = KeysetPaginator
         limit = 10000
         max_limit = 20000

@@ -363,26 +364,25 @@ class Meta(ODataEnterpriseReportResource.Meta):

REPORT_SLUG = EnterpriseReport.FORM_SUBMISSIONS

-    def get_report_task(self, request):
-        enddate = datetime.strptime(request.GET['enddate'], '%Y-%m-%d') if 'enddate' in request.GET else None
-        startdate = datetime.strptime(request.GET['startdate'], '%Y-%m-%d') if 'startdate' in request.GET else None
+    def get_object_list(self, request):
+        # TODO: logic to handle when the start/end date are null or last 30 days
+        start_date = request.GET.get('start_date', None)
+        end_date = request.GET.get('end_date', None)
+        last_domain = request.GET.get('last_domain', None)
+        last_time = request.GET.get('last_time', None)
+        last_id = request.GET.get('last_id', None)

         account = BillingAccount.get_account_by_domain(request.domain)
-        return generate_enterprise_report.s(
-            self.REPORT_SLUG,
-            account.id,
-            request.couch_user.username,
-            start_date=startdate,
-            end_date=enddate,
-            include_form_id=True,
-        )
+        return get_enterprise_form_iterator(account, start_date, end_date, last_domain, last_time, last_id)
Contributor:
Could the page size limit be passed in here? Might be able to retrieve that with something like:

    limit = self.paginator.get_limit()


     def dehydrate(self, bundle):
-        bundle.data['form_id'] = bundle.obj[0]
-        bundle.data['form_name'] = bundle.obj[1]
-        bundle.data['submitted'] = self.convert_datetime(bundle.obj[2])
-        bundle.data['app_name'] = bundle.obj[3]
-        bundle.data['mobile_user'] = bundle.obj[4]
-        bundle.data['domain'] = bundle.obj[6]
+        bundle.data['form_id'] = bundle.obj['form']['meta']['instanceID']
+        bundle.data['form_name'] = bundle.obj['@name']
+        bundle.data['submitted'] = self.convert_datetime(bundle.obj['received_on'])
+        bundle.data['app_name'] = 'App Name'  # TODO bundle.obj[3]
+        bundle.data['mobile_user'] = bundle.obj['form']['meta']['username']
+        bundle.data['domain'] = bundle.obj['domain']

         return bundle
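
For reference, a sketch of the field mapping the new dehydrate performs, using an assumed minimal shape for one ES form hit (illustrative only; the real document carries many more fields):

    # Illustrative only: an assumed minimal bundle.obj and the row it yields.
    form_hit = {
        '@name': 'Registration',
        'received_on': '2024-10-29T12:00:00.000000Z',
        'domain': 'example-domain',
        'form': {'meta': {'instanceID': 'abc123', 'username': 'worker1'}},
    }
    # Resulting bundle.data (app_name is still hardcoded pending the TODO):
    # {'form_id': 'abc123', 'form_name': 'Registration',
    #  'submitted': <converted datetime>, 'app_name': 'App Name',
    #  'mobile_user': 'worker1', 'domain': 'example-domain'}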

84 changes: 84 additions & 0 deletions corehq/apps/enterprise/iterators.py
@@ -1,7 +1,91 @@
from datetime import datetime
from corehq.apps.es import filters
from corehq.apps.es.forms import FormES
from corehq.apps.enterprise.resumable_iterator_wrapper import ResumableIteratorWrapper


def raise_after_max_elements(it, max_elements, exception=None):
for total_yielded, ele in enumerate(it):
if total_yielded >= max_elements:
exception = exception or Exception('Too Many Elements')
raise exception

yield ele
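
A quick illustration of the guard above: it yields up to max_elements elements, then raises if the underlying iterator would exceed that (illustrative values):

    # Yields 0, 1, 2; pulling a fourth element raises Exception('Too Many Elements').
    guarded = raise_after_max_elements(iter(range(10)), 3)
    assert [next(guarded) for _ in range(3)] == [0, 1, 2]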


def get_enterprise_form_iterator(account, start_date, end_date, last_domain=None, last_time=None, last_id=None):
domains = account.get_domains()

it = multi_domain_form_generator(domains, start_date, end_date, last_domain, last_time, last_id)
return ResumableIteratorWrapper(it, lambda ele: {
'domain': ele['domain'],
'received_on': ele['received_on'],
'id': ele['form']['meta']['instanceID']
})


def multi_domain_form_generator(domains, start_date, end_date, last_domain=None, last_time=None, last_id=None):
domain_index = domains.index(last_domain) if last_domain else 0

def _get_domain_iterator(last_time=None, last_id=None):
if domain_index >= len(domains):
return None
domain = domains[domain_index]
return domain_form_generator(domain, start_date, end_date, last_time, last_id)

current_iterator = _get_domain_iterator(last_time, last_id)

while current_iterator:
yield from current_iterator
domain_index += 1
if domain_index >= len(domains):
break
current_iterator = _get_domain_iterator()
Comment on lines +28 to +43
Contributor:
The while loop with domain_index was hard to follow. Here's a suggestion of how itertools.dropwhile could be used with a for loop.

Suggested change:
-    domain_index = domains.index(last_domain) if last_domain else 0
-
-    def _get_domain_iterator(last_time=None, last_id=None):
-        if domain_index >= len(domains):
-            return None
-        domain = domains[domain_index]
-        return domain_form_generator(domain, start_date, end_date, last_time, last_id)
-
-    current_iterator = _get_domain_iterator(last_time, last_id)
-
-    while current_iterator:
-        yield from current_iterator
-        domain_index += 1
-        if domain_index >= len(domains):
-            break
-        current_iterator = _get_domain_iterator()
+    from itertools import dropwhile  # TODO move to top of module
+    last_plus_remains = dropwhile(lambda d: d != last_domain, domains)
+    remaining_domains = dropwhile(lambda d: d == last_domain, last_plus_remains)
+    for domain in remaining_domains:
+        yield from domain_form_generator(domain, start_date, end_date, last_time, last_id)
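
A self-contained illustration of the two chained dropwhile calls in that suggestion; note that if last_domain is None, the first dropwhile would skip every domain, so the suggestion assumes last_domain is present in the list:

    from itertools import dropwhile

    domains = ['alpha', 'beta', 'gamma', 'delta']
    last_domain = 'beta'

    # Drop everything up to and including last_domain, then resume from the next one.
    last_plus_remains = dropwhile(lambda d: d != last_domain, domains)
    remaining = dropwhile(lambda d: d == last_domain, last_plus_remains)
    assert list(remaining) == ['gamma', 'delta']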



def domain_form_generator(domain, start_date, end_date, last_time=None, last_id=None):
if not last_time:
last_time = datetime.now()
Contributor:
I think this should use utcnow (or whatever the equivalent non-deprecated version of that is).

If last_time is passed in it will be a string because it was retrieved from the request. Does it matter to be mixing types like that?
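
A minimal sketch of one way to address both points (an assumption, not part of this PR): use an aware UTC timestamp, and normalize the request-supplied string to a datetime up front so last_time stays a single type:

    from datetime import datetime, timezone

    # Assumed normalization, not from the PR:
    if last_time is None:
        last_time = datetime.now(timezone.utc)  # non-deprecated replacement for utcnow()
    elif isinstance(last_time, str):
        last_time = datetime.fromisoformat(last_time)  # assumes an ISO-8601 request value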


while True:
query = create_domain_query(domain, start_date, end_date, last_time, last_id)
results = query.run()
for form in results.hits:
last_form_fetched = form
yield last_form_fetched

if len(results.hits) >= results.total:
break
else:
last_time = last_form_fetched['received_on']
last_id = last_form_fetched['_id']
Comment on lines +53 to +61
Contributor:
Will line 60 raise NameError or use the previous value of last_form_fetched (which is probably wrong and I think will result in an infinite loop) if results.hits is empty?

Suggested change:
-        for form in results.hits:
-            last_form_fetched = form
-            yield last_form_fetched
-
-        if len(results.hits) >= results.total:
-            break
-        else:
-            last_time = last_form_fetched['received_on']
-            last_id = last_form_fetched['_id']
+        yield from results.hits
+        if not results.hits or len(results.hits) >= results.total:
+            break
+        last_form_fetched = results.hits[-1]
+        last_time = last_form_fetched['received_on']
+        last_id = last_form_fetched['_id']

The >= operation implies that it is possible for len(results.hits) to be greater than results.total. Out of curiosity, what would that mean? Sounds like ES would have returned more results than it thought were available, which seems like a contradiction.



def create_domain_query(domain, start_date, end_date, last_time, last_id):
Contributor:
This name made me think it was building a query that returned domains.

Suggested change:
-def create_domain_query(domain, start_date, end_date, last_time, last_id):
+def create_domain_forms_query(domain, start_date, end_date, last_time, last_id):

CURSOR_SIZE = 5

query = (
FormES()
.domain(domain)
.user_type('mobile')
.submitted(gte=start_date, lte=end_date)
.size(CURSOR_SIZE)
)

query.es_query['sort'] = [
{'received_on': {'order': 'desc'}},
{'form.meta.instanceID': 'asc'}
Contributor:
I'm not sure how it works in ES, but in SQL this could result in an inefficient query if there is no index that can be used with this sort criteria. Have you looked into the performance of ES with sorting large result sets?

Contributor (author):
@esoergel mentioned that Elasticsearch essentially indexes all fields. I believe this follows how we retrieve cases.

Contributor:
Yep that's right, this is the same approach we use there. ES stores everything you send it, and it makes everything in the mapping file available for querying. Its indexing and filter caching mechanisms are kinda black-boxy as best as I can tell, but this is for sure in line with how querying and sorting are expected to work.

Comment on lines +76 to +77
Contributor (@esoergel, Oct 29, 2024):
Here I'd actually recommend inserted_at rather than received_on. received_on is the timestamp of when we receive the form, and inserted_at is a timestamp applied just before it gets sent to ES. The risk with using received_on is that delays in processing mean you might see documents with an earlier received_on date appearing in ES after others with a later date. If you're paginating over the data as it's being amended, this could cause some of the most recent submissions to get skipped. To be sure, that's a bit of an edge case, but there's an easy enough mitigation.

You can also use doc_id instead of form.meta.instanceID - not sure if it's any faster, but less nesting seems nice.

]
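
As an aside, a sketch of the alternative sort suggested in the comment above (assuming inserted_at and doc_id are available on the form mapping, as the comment indicates; not part of this PR):

    # Assumed alternative sort, per the review comment above:
    query.es_query['sort'] = [
        {'inserted_at': {'order': 'desc'}},
        {'doc_id': 'asc'},
    ]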

if last_id:
query = query.filter(filters.OR(
filters.AND(
filters.term('received_on', last_time),
filters.range_filter('form.meta.instanceID', gt=last_id)
),
filters.range_filter('received_on', lt=last_time)
))
else:
query = query.submitted(lte=last_time)

return query
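
For intuition, the OR/AND filter above encodes the standard keyset-pagination seek condition over the (received_on DESC, form.meta.instanceID ASC) sort key. Restated as a plain Python predicate (illustrative only, not used by the PR):

    def comes_after_cursor(form, last_time, last_id):
        received_on = form['received_on']
        instance_id = form['form']['meta']['instanceID']
        # Strictly older on the descending primary key, or tied on it and
        # strictly later on the ascending tie-breaker.
        return received_on < last_time or (
            received_on == last_time and instance_id > last_id
        )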
31 changes: 31 additions & 0 deletions corehq/apps/enterprise/resumable_iterator_wrapper.py
@@ -0,0 +1,31 @@
class ResumableIteratorWrapper:
def __init__(self, sequence, get_element_properties_fn=None):
self.it = iter(sequence)
self.prev_element = None
self.iteration_started = False
self.is_complete = False

self.get_element_properties_fn = get_element_properties_fn
if not self.get_element_properties_fn:
self.get_element_properties_fn = lambda ele: {'value': ele}

def __iter__(self):
return self

def __next__(self):
self.iteration_started = True
try:
self.prev_element = next(self.it)
except StopIteration:
self.is_complete = True
raise

return self.prev_element

def get_next_query_params(self):
if self.is_complete:
return None
if not self.iteration_started:
return {}
Comment on lines +26 to +29
Contributor:
Is there a meaningful difference between these return values on the client? If not, would it work to return an empty dict if the iteration is complete? It would simplify the return type (always returns a dict).

Contributor (author):
The goal was to be able to use this value to determine whether the iterator was complete or not, but as you recommended, I've removed this class now.


return self.get_element_properties_fn(self.prev_element)
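
A minimal behavior sketch of the wrapper as written above; the three get_next_query_params return values correspond to not-started, mid-iteration, and exhausted:

    it = ResumableIteratorWrapper(range(3), lambda ele: {'last': ele})

    assert it.get_next_query_params() == {}           # iteration not started
    assert next(it) == 0
    assert it.get_next_query_params() == {'last': 0}  # resume after element 0

    assert list(it) == [1, 2]                         # exhausts the iterator
    assert it.get_next_query_params() is None         # complete: no next page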
66 changes: 66 additions & 0 deletions corehq/apps/enterprise/tests/api/keyset_paginator_tests.py
@@ -0,0 +1,66 @@
from django.test import SimpleTestCase
from django.http import QueryDict
from corehq.apps.enterprise.resumable_iterator_wrapper import ResumableIteratorWrapper
from corehq.apps.enterprise.api.keyset_paginator import KeysetPaginator


class KeysetPaginatorTests(SimpleTestCase):
def test_page_fetches_all_results_below_limit(self):
objects = ResumableIteratorWrapper(range(5))
paginator = KeysetPaginator(QueryDict(), objects, limit=10)
page = paginator.page()
self.assertEqual(page['objects'], [0, 1, 2, 3, 4])
self.assertEqual(page['meta'], {'limit': 10})

def test_page_includes_next_information_when_more_results_are_available(self):
objects = ResumableIteratorWrapper(range(5), lambda ele: {'next': ele})
paginator = KeysetPaginator(QueryDict(), objects, resource_uri='http://test.com/', limit=3)
page = paginator.page()
self.assertEqual(page['objects'], [0, 1, 2])
self.assertEqual(page['meta'], {'limit': 3, 'next': 'http://test.com/?limit=3&next=2'})

def test_does_not_include_duplicate_limits(self):
request_data = QueryDict(mutable=True)
request_data['limit'] = 3
objects = ResumableIteratorWrapper(range(5), lambda ele: {'next': ele})
paginator = KeysetPaginator(request_data, objects, resource_uri='http://test.com/')
page = paginator.page()
self.assertEqual(page['meta']['next'], 'http://test.com/?limit=3&next=2')

def test_supports_dict_request_data(self):
request_data = {
'limit': 3,
'some_param': 'yes'
}
objects = ResumableIteratorWrapper(range(5), lambda ele: {'next': ele})
paginator = KeysetPaginator(request_data, objects, resource_uri='http://test.com/')
page = paginator.page()
self.assertEqual(page['meta']['next'], 'http://test.com/?limit=3&some_param=yes&next=2')

def test_get_offset_not_implemented(self):
objects = ResumableIteratorWrapper(range(5))
paginator = KeysetPaginator(QueryDict(), objects)

with self.assertRaises(NotImplementedError):
paginator.get_offset()

def test_get_slice_not_implemented(self):
objects = ResumableIteratorWrapper(range(5))
paginator = KeysetPaginator(QueryDict(), objects)

with self.assertRaises(NotImplementedError):
paginator.get_slice(limit=10, offset=20)

def test_get_count_not_implemented(self):
objects = ResumableIteratorWrapper(range(5))
paginator = KeysetPaginator(QueryDict(), objects)

with self.assertRaises(NotImplementedError):
paginator.get_count()

def test_get_previous_not_implemented(self):
objects = ResumableIteratorWrapper(range(5))
paginator = KeysetPaginator(QueryDict(), objects)

with self.assertRaises(NotImplementedError):
paginator.get_previous(limit=10, offset=20)