-
-
Notifications
You must be signed in to change notification settings - Fork 217
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #35265 from dimagi/es/module-badges
Module Badges Fixture
- Loading branch information
Showing
12 changed files
with
319 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from lxml.builder import E | ||
|
||
from casexml.apps.phone.fixtures import FixtureProvider | ||
|
||
from corehq import extensions | ||
from corehq.apps.case_search.exceptions import CaseFilterError | ||
from corehq.apps.case_search.filter_dsl import build_filter_from_xpath | ||
from corehq.apps.es.case_search import CaseSearchES | ||
from corehq.messaging.templating import ( | ||
MessagingTemplateRenderer, | ||
NestedDictTemplateParam, | ||
) | ||
from corehq.toggles import MODULE_BADGES | ||
|
||
from .models import CSQLFixtureExpression | ||
|
||
|
||
def _get_user_template_info(restore_user): | ||
return { | ||
"username": restore_user.username, | ||
"uuid": restore_user.user_id, | ||
"user_data": restore_user.user_session_data, | ||
"location_ids": restore_user.get_location_ids(restore_user.domain), | ||
} | ||
|
||
|
||
def _get_template_renderer(restore_user): | ||
renderer = MessagingTemplateRenderer() | ||
renderer.set_context_param('user', NestedDictTemplateParam(_get_user_template_info(restore_user))) | ||
for name, param in custom_csql_fixture_context(restore_user.domain, restore_user): | ||
renderer.set_context_param(name, param) | ||
return renderer | ||
|
||
|
||
@extensions.extension_point | ||
def custom_csql_fixture_context(domain, restore_user): | ||
'''Register custom template params to be available in CSQL templates''' | ||
|
||
|
||
def _run_query(domain, csql): | ||
try: | ||
filter_ = build_filter_from_xpath(csql, domain=domain) | ||
except CaseFilterError: | ||
return "ERROR" | ||
return str(CaseSearchES() | ||
.domain(domain) | ||
.filter(filter_) | ||
.count()) | ||
|
||
|
||
def _get_indicator_nodes(restore_state, indicators): | ||
with restore_state.timing_context('_get_template_renderer'): | ||
renderer = _get_template_renderer(restore_state.restore_user) | ||
for name, csql_template in indicators: | ||
with restore_state.timing_context(name): | ||
value = _run_query(restore_state.domain, renderer.render(csql_template)) | ||
yield E.value(value, name=name) | ||
|
||
|
||
class CaseSearchFixtureProvider(FixtureProvider): | ||
id = 'case-search-fixture' | ||
|
||
def __call__(self, restore_state): | ||
if not MODULE_BADGES.enabled(restore_state.domain): | ||
return | ||
indicators = _get_indicators(restore_state.domain) | ||
if indicators: | ||
nodes = _get_indicator_nodes(restore_state, indicators) | ||
yield E.fixture(E.values(*nodes), id=self.id) | ||
|
||
|
||
def _get_indicators(domain): | ||
return list(CSQLFixtureExpression.by_domain(domain).values_list('name', 'csql')) | ||
|
||
|
||
case_search_fixture_generator = CaseSearchFixtureProvider() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
import uuid | ||
from unittest.mock import patch | ||
|
||
from django.test import TestCase | ||
|
||
from lxml import etree | ||
|
||
from casexml.apps.case.mock import CaseBlock | ||
from casexml.apps.phone.tests.utils import call_fixture_generator | ||
|
||
from corehq.apps.domain.shortcuts import create_domain | ||
from corehq.apps.es.case_search import case_search_adapter | ||
from corehq.apps.es.tests.utils import case_search_es_setup, es_test | ||
from corehq.apps.users.models import WebUser | ||
from corehq.tests.util.xml import assert_xml_equal, assert_xml_partial_equal | ||
from corehq.util.test_utils import flag_enabled | ||
|
||
from ..fixtures import _get_template_renderer, case_search_fixture_generator | ||
|
||
|
||
@flag_enabled('MODULE_BADGES') | ||
@es_test(requires=[case_search_adapter], setup_class=True) | ||
class TestCaseSearchFixtures(TestCase): | ||
domain_name = 'test-case-search-fixtures' | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
super().setUpClass() | ||
cls.domain_obj = create_domain(cls.domain_name) | ||
cls.user = WebUser.create(cls.domain_name, '[email protected]', 'secret', None, None) | ||
cls.restore_user = cls.user.to_ota_restore_user(cls.domain_name) | ||
case_search_es_setup(cls.domain_name, cls._get_case_blocks(cls.user.user_id)) | ||
cls.addClassCleanup(cls.domain_obj.delete) | ||
|
||
@staticmethod | ||
def _get_case_blocks(owner_id): | ||
def case_block(case_type, name, owner_id): | ||
return CaseBlock( | ||
case_id=str(uuid.uuid4()), | ||
case_type=case_type, | ||
case_name=name, | ||
owner_id=owner_id, | ||
create=True, | ||
) | ||
|
||
return [ | ||
case_block('client', 'Kleo', owner_id), | ||
case_block('client', 'Sven', owner_id), | ||
case_block('client', 'Thilo', '---'), | ||
case_block('place', 'Berlin', owner_id), | ||
case_block('place', 'Sirius B', '---'), | ||
] | ||
|
||
def render(self, template_string): | ||
return _get_template_renderer(self.restore_user).render(template_string) | ||
|
||
def generate_fixture(self): | ||
res = call_fixture_generator(case_search_fixture_generator, self.restore_user, self.domain_obj) | ||
return etree.tostring(next(res), encoding='utf-8') | ||
|
||
def test_no_interpolation(self): | ||
res = self.render("dob < '2020-01-01'") | ||
self.assertEqual(res, "dob < '2020-01-01'") | ||
|
||
def test_user_id(self): | ||
res = self.render("@owner_id = '{user.uuid}'") | ||
self.assertEqual(res, f"@owner_id = '{self.user.user_id}'") | ||
|
||
@patch('custom.bha.commcare_extensions.get_user_clinic_ids') | ||
def test_bha_custom_csql_fixture_context(self, get_user_clinic_ids): | ||
self.restore_user.domain = 'co-carecoordination' | ||
|
||
def reset_domain(): | ||
self.restore_user.domain = self.domain_name | ||
self.addCleanup(reset_domain) | ||
|
||
get_user_clinic_ids.return_value = "abc123 def456" | ||
res = self.render("selected(@owner_id, '{bha.user_clinic_ids}')") | ||
self.assertEqual(res, "selected(@owner_id, 'abc123 def456')") | ||
|
||
@patch('corehq.apps.case_search.fixtures._get_indicators') | ||
@patch('corehq.apps.case_search.fixtures._run_query') | ||
def test_fixture_generator(self, run_query, get_indicators): | ||
run_query.return_value = "42" | ||
get_indicators.return_value = [ | ||
('pre_pandemic_births', "dob < '2020-01-01'"), | ||
('owned_by_user', "@owner_id = '{user.uuid}'"), | ||
] | ||
|
||
expected = """ | ||
<fixture id="case-search-fixture"> | ||
<values> | ||
<value name="pre_pandemic_births">42</value> | ||
<value name="owned_by_user">42</value> | ||
</values> | ||
</fixture>""" | ||
assert_xml_equal(expected, self.generate_fixture()) | ||
|
||
@patch('corehq.apps.case_search.fixtures._get_indicators') | ||
def test_full_query(self, get_indicators): | ||
indicators = [ | ||
# (name, csql_template, expected_count) | ||
('owned_by_user', "@owner_id = '{user.uuid}'", 3), | ||
('total_clients', "@case_type = 'client'", 3), | ||
('own_clients', "@case_type = 'client' and @owner_id = '{user.uuid}'", 2), | ||
('bad_query', "this is not a valid query", "ERROR"), | ||
] | ||
get_indicators.return_value = [(name, csql_template) for name, csql_template, _ in indicators] | ||
|
||
res = self.generate_fixture() | ||
for name, _, expected in indicators: | ||
expected_xml = f'<partial><value name="{name}">{expected}</value></partial>' | ||
assert_xml_partial_equal(expected_xml, res, f'./values/value[@name="{name}"]') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from django.conf import settings | ||
|
||
from corehq.apps.case_search.fixtures import custom_csql_fixture_context | ||
from corehq.messaging.templating import SimpleDictTemplateParam | ||
|
||
from .util import get_user_clinic_ids, get_user_facility_ids | ||
|
||
BHA_DOMAINS = settings.CUSTOM_DOMAINS_BY_MODULE['custom.bha'] | ||
|
||
|
||
@custom_csql_fixture_context.extend(domains=BHA_DOMAINS) | ||
def bha_csql_fixture_context(domain, restore_user): | ||
facility_ids = get_user_facility_ids(domain, restore_user) | ||
return ('bha', SimpleDictTemplateParam({ | ||
'facility_ids': facility_ids, | ||
'user_clinic_ids': get_user_clinic_ids(domain, restore_user, facility_ids), | ||
})) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from corehq.apps.locations.tests.util import LocationHierarchyTestCase | ||
from corehq.apps.users.models import WebUser | ||
|
||
from ..util import get_user_facility_ids | ||
|
||
|
||
class UserFacilityTests(LocationHierarchyTestCase): | ||
domain = 'bha-user-facility-tests' | ||
location_type_names = ['state', 'registry', 'organization', 'facility', 'facility_data'] | ||
location_structure = [ | ||
('state-1', [ | ||
('registry-1', [ | ||
('organization-1', [ | ||
('facility-1a', []), | ||
('facility-1b', [ | ||
('facility_data-1b', []), | ||
]), | ||
]), | ||
('organization-2', [ | ||
('facility-2a', []), | ||
('facility-2b', []), | ||
]), | ||
]), | ||
]) | ||
] | ||
|
||
def test_get_user_facility_ids(self): | ||
user = WebUser.create(self.domain, '[email protected]', 'secret', None, None) | ||
user.add_to_assigned_locations(self.domain, self.locations['organization-1']) | ||
user.add_to_assigned_locations(self.domain, self.locations['facility-2a']) | ||
restore_user = user.to_ota_restore_user(self.domain) | ||
res = get_user_facility_ids(self.domain, restore_user) | ||
self.assertItemsEqual(res, [ | ||
self.locations[loc_name].location_id for loc_name in [ | ||
'facility-1a', | ||
'facility-1b', | ||
'facility-2a', | ||
] | ||
]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from corehq.apps.locations.models import SQLLocation | ||
from corehq.apps.es.case_search import CaseSearchES | ||
|
||
|
||
def get_user_facility_ids(domain, restore_user): | ||
# Facility locations the user is either directly assigned to, or which are | ||
# children of organizations the user is assigned to | ||
owned_locs = (restore_user.get_sql_locations(domain) | ||
.filter(location_type__code__in=['organization', 'facility'])) | ||
return list(SQLLocation.objects | ||
.get_queryset_descendants(owned_locs, include_self=True) | ||
.filter(location_type__code='facility') | ||
.location_ids()) | ||
|
||
|
||
def get_user_clinic_ids(domain, restore_user, facility_ids): | ||
return " ".join( | ||
CaseSearchES() | ||
.domain(domain) | ||
.case_type('clinic') | ||
.is_closed(False) | ||
.owner(facility_ids) | ||
.get_ids() | ||
) |
Oops, something went wrong.