Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 68 additions & 37 deletions src/tlo/methods/healthsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,66 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f

return appt_footprint_times

def get_total_minutes_of_this_officer_in_this_district(self, current_capabilities, _officer):
"""Returns the minutes of current capabilities for the officer identified (this officer type in this
facility_id)."""
return current_capabilities.get(_officer)

def get_total_minutes_of_this_officer_in_all_district(self, current_capabilities, _officer):
"""Returns the minutes of current capabilities for the officer identified in all districts (this officer
type in this all facilities of the same level in all districts)."""

def split_officer_compound_string(cs) -> Tuple[int, str]:
"""Returns (facility_id, officer_type) for the officer identified in the string of the form:
'FacilityID_{facility_id}_Officer_{officer_type}'."""
_, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_")
return int(_facility_id), _officer_type

def _match(_this_officer, facility_ids: List[int], officer_type: str):
"""Returns True if the officer identified is of the identified officer_type and is in one of the
facility_ids."""
this_facility_id, this_officer_type = split_officer_compound_string(_this_officer)
return (this_officer_type == officer_type) and (this_facility_id in facility_ids)

facility_id, officer_type = split_officer_compound_string(_officer)
facility_level = self._facility_by_facility_id[int(facility_id)].level
facilities_of_same_level_in_all_district = [
_fac.id for _fac in self._facilities_for_each_district[facility_level].values()
]

officers_in_the_same_level_in_all_districts = [
_officer for _officer in current_capabilities.keys() if
_match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type)
]

return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts)


def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time_requests)-> bool:
"""Check if all officers required by the appt footprint are available to perform the HSI"""

ok_to_run = True

for officer in expected_time_requests.keys():
if self.compute_squeeze_factor_to_district_level:
availability = self.get_total_minutes_of_this_officer_in_this_district(self.capabilities_today, officer)
else:
availability = self.get_total_minutes_of_this_officer_in_all_district(self.capabilities_today, officer)

# If officer does not exist in the relevant facility, log warning and proceed as if availability = 0
if availability is None:
logger.warning(
key="message",
data=(f"Requested officer {officer} is not contemplated by health system. ")
)
availability = 0.0

if availability == 0.0:
ok_to_run = False

return ok_to_run


def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities,
compute_squeeze_factor_to_district_level: bool
):
Expand Down Expand Up @@ -1636,48 +1696,16 @@ def get_squeeze_factors(self, footprints_per_event, total_footprint, current_cap
(position in array matches that in the all_call_today list).
"""

def get_total_minutes_of_this_officer_in_this_district(_officer):
"""Returns the minutes of current capabilities for the officer identified (this officer type in this
facility_id)."""
return current_capabilities.get(_officer)

def get_total_minutes_of_this_officer_in_all_district(_officer):
"""Returns the minutes of current capabilities for the officer identified in all districts (this officer
type in this all facilities of the same level in all districts)."""

def split_officer_compound_string(cs) -> Tuple[int, str]:
"""Returns (facility_id, officer_type) for the officer identified in the string of the form:
'FacilityID_{facility_id}_Officer_{officer_type}'."""
_, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_")
return int(_facility_id), _officer_type

def _match(_this_officer, facility_ids: List[int], officer_type: str):
"""Returns True if the officer identified is of the identified officer_type and is in one of the
facility_ids."""
this_facility_id, this_officer_type = split_officer_compound_string(_this_officer)
return (this_officer_type == officer_type) and (this_facility_id in facility_ids)

facility_id, officer_type = split_officer_compound_string(_officer)
facility_level = self._facility_by_facility_id[int(facility_id)].level
facilities_of_same_level_in_all_district = [
_fac.id for _fac in self._facilities_for_each_district[facility_level].values()
]

officers_in_the_same_level_in_all_districts = [
_officer for _officer in current_capabilities.keys() if
_match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type)
]

return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts)

# 1) Compute the load factors for each officer type at each facility that is
# called-upon in this list of HSIs
load_factor = {}
for officer, call in total_footprint.items():
if compute_squeeze_factor_to_district_level:
availability = get_total_minutes_of_this_officer_in_this_district(officer)
availability = self.get_total_minutes_of_this_officer_in_this_district(current_capabilities, officer)
else:
availability = get_total_minutes_of_this_officer_in_all_district(officer)
availability = self.get_total_minutes_of_this_officer_in_all_district(current_capabilities, officer)

# If officer does not exist in the relevant facility, log warning and proceed as if availability = 0
if availability is None:
Expand Down Expand Up @@ -2066,11 +2094,14 @@ def run_individual_level_events_in_mode_0_or_1(self,
_appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT

# Mode 0: All HSI Event run, with no squeeze
# Mode 1: All HSI Events run with squeeze provided latter is not inf
# Mode 1: All HSI Events run provided all required officers have non-zero capabilities
ok_to_run = True

if self.mode_appt_constraints == 1 and squeeze_factor == float('inf'):
ok_to_run = False
if self.mode_appt_constraints == 1:
if event.expected_time_requests:
ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities(
event.expected_time_requests)


if ok_to_run:

Expand Down
119 changes: 29 additions & 90 deletions tests/test_healthsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def test_run_in_mode_1_with_capacity(tmpdir, seed):


@pytest.mark.slow
def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed):
def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed):
# Capabilities should increase when a HealthSystem that has low capabilities changes mode with
# the option `scale_to_effective_capabilities` set to `True`.

Expand All @@ -404,6 +404,7 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed):
"directory": tmpdir,
"custom_levels": {
"tlo.methods.healthsystem": logging.DEBUG,
"tlo.methods.healthsystem.summary": logging.INFO
}
}, resourcefilepath=resourcefilepath
)
Expand Down Expand Up @@ -438,99 +439,37 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed):
hs_params['scale_to_effective_capabilities'] = True

# Run the simulation
sim.make_initial_population(n=popsize)
sim.simulate(end_date=end_date)
check_dtypes(sim)

# read the results
output = parse_log_file(sim.log_filepath, level=logging.DEBUG)

# Do the checks
assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0
hsi_events = output['tlo.methods.healthsystem']['HSI_Event']
hsi_events['date'] = pd.to_datetime(hsi_events['date']).dt.year

# Check that all squeeze factors were high in 2010, but not all were high in 2011
# thanks to rescaling of capabilities
assert (
hsi_events.loc[
(hsi_events['Person_ID'] >= 0) &
(hsi_events['Number_By_Appt_Type_Code'] != {}) &
(hsi_events['date'] == 2010),
'Squeeze_Factor'
] >= 100.0
).all() # All the events that had a non-blank footprint experienced high squeezing.
assert not (
hsi_events.loc[
(hsi_events['Person_ID'] >= 0) &
(hsi_events['Number_By_Appt_Type_Code'] != {}) &
(hsi_events['date'] == 2011),
'Squeeze_Factor'
] >= 100.0
).all() # All the events that had a non-blank footprint experienced high squeezing.


@pytest.mark.slow
def test_run_in_mode_1_with_almost_no_capacity(tmpdir, seed):
# Events should run but (for those with non-blank footprints) with high squeeze factors
# (Mode 1 -> elastic constraints)

# Establish the simulation object
sim = Simulation(
start_date=start_date,
seed=seed,
log_config={
"filename": "log",
"directory": tmpdir,
"custom_levels": {
"tlo.methods.healthsystem": logging.DEBUG,
}
}, resourcefilepath=resourcefilepath
)

# Define the service availability
service_availability = ['*']

# Register the core modules
sim.register(demography.Demography(),
simplified_births.SimplifiedBirths(),
enhanced_lifestyle.Lifestyle(),
healthsystem.HealthSystem(service_availability=service_availability,
capabilities_coefficient=0.0000001, # This will mean that capabilities are
# very close to 0 everywhere.
# (If the value was 0, then it would
# be interpreted as the officers NEVER
# being available at a facility,
# which would mean the HSIs should not
# run (as opposed to running with
# a very high squeeze factor)).
mode_appt_constraints=1),
symptommanager.SymptomManager(),
healthseekingbehaviour.HealthSeekingBehaviour(),
mockitis.Mockitis(),
chronicsyndrome.ChronicSyndrome()
)

# Run the simulation
sim.make_initial_population(n=popsize)
sim.make_initial_population(n=1000)
sim.simulate(end_date=end_date)
check_dtypes(sim)

# read the results
output = parse_log_file(sim.log_filepath, level=logging.DEBUG)

# Do the checks
assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0
hsi_events = output['tlo.methods.healthsystem']['HSI_Event']
# assert hsi_events['did_run'].all()
assert (
hsi_events.loc[(hsi_events['Person_ID'] >= 0) & (hsi_events['Number_By_Appt_Type_Code'] != {}),
'Squeeze_Factor'] >= 100.0
).all() # All the events that had a non-blank footprint experienced high squeezing.
assert (hsi_events.loc[hsi_events['Person_ID'] < 0, 'Squeeze_Factor'] == 0.0).all()

# Check that some Mockitis cures occurred (though health system)
assert any(sim.population.props['mi_status'] == 'P')
output = parse_log_file(sim.log_filepath, level=logging.INFO)
pd.set_option('display.max_columns', None)
summary = output['tlo.methods.healthsystem.summary']
capacity_by_officer_and_level = summary['Capacity_By_OfficerType_And_FacilityLevel']

# Filter rows for the two years
row_2010 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2010-12-31"].squeeze()
row_2011 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2011-12-31"].squeeze()

# Dictionary to store results
results = {}

# Check that load has significantly reduced in second year, thanks to the significant
# rescaling of capabilities.
# (There is some degeneracy here, in that load could also be reduced due to declining demand.
# However it is extremely unlikely that demand for care would have dropped by a factor of 100
# in second year, hence this is a fair test).
for col in capacity_by_officer_and_level.columns:
if col == "date":
continue # skip the date column
if not (capacity_by_officer_and_level[col] == 0).any(): # check column is not all zeros
ratio = row_2010[col] / row_2011[col]

results[col] = ratio > 100

assert all(results.values())


@pytest.mark.slow
Expand Down
Loading