diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index 0c81fe4026..11b6e0a1fe 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1606,6 +1606,66 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f return appt_footprint_times + def get_total_minutes_of_this_officer_in_this_district(self, current_capabilities, _officer): + """Returns the minutes of current capabilities for the officer identified (this officer type in this + facility_id).""" + return current_capabilities.get(_officer) + + def get_total_minutes_of_this_officer_in_all_district(self, current_capabilities, _officer): + """Returns the minutes of current capabilities for the officer identified in all districts (this officer + type in this all facilities of the same level in all districts).""" + + def split_officer_compound_string(cs) -> Tuple[int, str]: + """Returns (facility_id, officer_type) for the officer identified in the string of the form: + 'FacilityID_{facility_id}_Officer_{officer_type}'.""" + _, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_") + return int(_facility_id), _officer_type + + def _match(_this_officer, facility_ids: List[int], officer_type: str): + """Returns True if the officer identified is of the identified officer_type and is in one of the + facility_ids.""" + this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) + return (this_officer_type == officer_type) and (this_facility_id in facility_ids) + + facility_id, officer_type = split_officer_compound_string(_officer) + facility_level = self._facility_by_facility_id[int(facility_id)].level + facilities_of_same_level_in_all_district = [ + _fac.id for _fac in self._facilities_for_each_district[facility_level].values() + ] + + officers_in_the_same_level_in_all_districts = [ + _officer for _officer in current_capabilities.keys() if + _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) + ] + + return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) + + + def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time_requests)-> bool: + """Check if all officers required by the appt footprint are available to perform the HSI""" + + ok_to_run = True + + for officer in expected_time_requests.keys(): + if self.compute_squeeze_factor_to_district_level: + availability = self.get_total_minutes_of_this_officer_in_this_district(self.capabilities_today, officer) + else: + availability = self.get_total_minutes_of_this_officer_in_all_district(self.capabilities_today, officer) + + # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 + if availability is None: + logger.warning( + key="message", + data=(f"Requested officer {officer} is not contemplated by health system. ") + ) + availability = 0.0 + + if availability == 0.0: + ok_to_run = False + + return ok_to_run + + def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities, compute_squeeze_factor_to_district_level: bool ): @@ -1636,48 +1696,16 @@ def get_squeeze_factors(self, footprints_per_event, total_footprint, current_cap (position in array matches that in the all_call_today list). """ - def get_total_minutes_of_this_officer_in_this_district(_officer): - """Returns the minutes of current capabilities for the officer identified (this officer type in this - facility_id).""" - return current_capabilities.get(_officer) - - def get_total_minutes_of_this_officer_in_all_district(_officer): - """Returns the minutes of current capabilities for the officer identified in all districts (this officer - type in this all facilities of the same level in all districts).""" - - def split_officer_compound_string(cs) -> Tuple[int, str]: - """Returns (facility_id, officer_type) for the officer identified in the string of the form: - 'FacilityID_{facility_id}_Officer_{officer_type}'.""" - _, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_") - return int(_facility_id), _officer_type - - def _match(_this_officer, facility_ids: List[int], officer_type: str): - """Returns True if the officer identified is of the identified officer_type and is in one of the - facility_ids.""" - this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) - return (this_officer_type == officer_type) and (this_facility_id in facility_ids) - - facility_id, officer_type = split_officer_compound_string(_officer) - facility_level = self._facility_by_facility_id[int(facility_id)].level - facilities_of_same_level_in_all_district = [ - _fac.id for _fac in self._facilities_for_each_district[facility_level].values() - ] - - officers_in_the_same_level_in_all_districts = [ - _officer for _officer in current_capabilities.keys() if - _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) - ] - return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) # 1) Compute the load factors for each officer type at each facility that is # called-upon in this list of HSIs load_factor = {} for officer, call in total_footprint.items(): if compute_squeeze_factor_to_district_level: - availability = get_total_minutes_of_this_officer_in_this_district(officer) + availability = self.get_total_minutes_of_this_officer_in_this_district(current_capabilities, officer) else: - availability = get_total_minutes_of_this_officer_in_all_district(officer) + availability = self.get_total_minutes_of_this_officer_in_all_district(current_capabilities, officer) # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 if availability is None: @@ -2066,11 +2094,14 @@ def run_individual_level_events_in_mode_0_or_1(self, _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT # Mode 0: All HSI Event run, with no squeeze - # Mode 1: All HSI Events run with squeeze provided latter is not inf + # Mode 1: All HSI Events run provided all required officers have non-zero capabilities ok_to_run = True - if self.mode_appt_constraints == 1 and squeeze_factor == float('inf'): - ok_to_run = False + if self.mode_appt_constraints == 1: + if event.expected_time_requests: + ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities( + event.expected_time_requests) + if ok_to_run: diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index 62c6970196..8e2fb907f7 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -391,7 +391,7 @@ def test_run_in_mode_1_with_capacity(tmpdir, seed): @pytest.mark.slow -def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): +def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): # Capabilities should increase when a HealthSystem that has low capabilities changes mode with # the option `scale_to_effective_capabilities` set to `True`. @@ -404,6 +404,7 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): "directory": tmpdir, "custom_levels": { "tlo.methods.healthsystem": logging.DEBUG, + "tlo.methods.healthsystem.summary": logging.INFO } }, resourcefilepath=resourcefilepath ) @@ -438,99 +439,37 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): hs_params['scale_to_effective_capabilities'] = True # Run the simulation - sim.make_initial_population(n=popsize) - sim.simulate(end_date=end_date) - check_dtypes(sim) - - # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) - - # Do the checks - assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0 - hsi_events = output['tlo.methods.healthsystem']['HSI_Event'] - hsi_events['date'] = pd.to_datetime(hsi_events['date']).dt.year - - # Check that all squeeze factors were high in 2010, but not all were high in 2011 - # thanks to rescaling of capabilities - assert ( - hsi_events.loc[ - (hsi_events['Person_ID'] >= 0) & - (hsi_events['Number_By_Appt_Type_Code'] != {}) & - (hsi_events['date'] == 2010), - 'Squeeze_Factor' - ] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - assert not ( - hsi_events.loc[ - (hsi_events['Person_ID'] >= 0) & - (hsi_events['Number_By_Appt_Type_Code'] != {}) & - (hsi_events['date'] == 2011), - 'Squeeze_Factor' - ] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - - -@pytest.mark.slow -def test_run_in_mode_1_with_almost_no_capacity(tmpdir, seed): - # Events should run but (for those with non-blank footprints) with high squeeze factors - # (Mode 1 -> elastic constraints) - - # Establish the simulation object - sim = Simulation( - start_date=start_date, - seed=seed, - log_config={ - "filename": "log", - "directory": tmpdir, - "custom_levels": { - "tlo.methods.healthsystem": logging.DEBUG, - } - }, resourcefilepath=resourcefilepath - ) - - # Define the service availability - service_availability = ['*'] - - # Register the core modules - sim.register(demography.Demography(), - simplified_births.SimplifiedBirths(), - enhanced_lifestyle.Lifestyle(), - healthsystem.HealthSystem(service_availability=service_availability, - capabilities_coefficient=0.0000001, # This will mean that capabilities are - # very close to 0 everywhere. - # (If the value was 0, then it would - # be interpreted as the officers NEVER - # being available at a facility, - # which would mean the HSIs should not - # run (as opposed to running with - # a very high squeeze factor)). - mode_appt_constraints=1), - symptommanager.SymptomManager(), - healthseekingbehaviour.HealthSeekingBehaviour(), - mockitis.Mockitis(), - chronicsyndrome.ChronicSyndrome() - ) - - # Run the simulation - sim.make_initial_population(n=popsize) + sim.make_initial_population(n=1000) sim.simulate(end_date=end_date) check_dtypes(sim) # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) - - # Do the checks - assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0 - hsi_events = output['tlo.methods.healthsystem']['HSI_Event'] - # assert hsi_events['did_run'].all() - assert ( - hsi_events.loc[(hsi_events['Person_ID'] >= 0) & (hsi_events['Number_By_Appt_Type_Code'] != {}), - 'Squeeze_Factor'] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - assert (hsi_events.loc[hsi_events['Person_ID'] < 0, 'Squeeze_Factor'] == 0.0).all() - - # Check that some Mockitis cures occurred (though health system) - assert any(sim.population.props['mi_status'] == 'P') + output = parse_log_file(sim.log_filepath, level=logging.INFO) + pd.set_option('display.max_columns', None) + summary = output['tlo.methods.healthsystem.summary'] + capacity_by_officer_and_level = summary['Capacity_By_OfficerType_And_FacilityLevel'] + + # Filter rows for the two years + row_2010 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2010-12-31"].squeeze() + row_2011 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2011-12-31"].squeeze() + + # Dictionary to store results + results = {} + + # Check that load has significantly reduced in second year, thanks to the significant + # rescaling of capabilities. + # (There is some degeneracy here, in that load could also be reduced due to declining demand. + # However it is extremely unlikely that demand for care would have dropped by a factor of 100 + # in second year, hence this is a fair test). + for col in capacity_by_officer_and_level.columns: + if col == "date": + continue # skip the date column + if not (capacity_by_officer_and_level[col] == 0).any(): # check column is not all zeros + ratio = row_2010[col] / row_2011[col] + + results[col] = ratio > 100 + + assert all(results.values()) @pytest.mark.slow