Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 44 additions & 234 deletions src/tlo/methods/healthsystem.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime
import heapq as hp
import itertools
import math
import re
import warnings
from collections import Counter, defaultdict
Expand Down Expand Up @@ -362,7 +361,6 @@ def __init__(
use_funded_or_actual_staffing: Optional[str] = None,
disable: bool = False,
disable_and_reject_all: bool = False,
compute_squeeze_factor_to_district_level: bool = True,
hsi_event_count_log_period: Optional[str] = "month",
):
"""
Expand Down Expand Up @@ -396,8 +394,6 @@ def __init__(
logging) and every HSI event runs.
:param disable_and_reject_all: If ``True``, disable health system and no HSI
events run
:param compute_squeeze_factor_to_district_level: Whether to compute squeeze_factors to the district level, or
the national level (which effectively pools the resources across all districts).
:param hsi_event_count_log_period: Period over which to accumulate counts of HSI
events that have run before logging and reseting counters. Should be on of
strings ``'day'``, ``'month'``, ``'year'``. ``'simulation'`` to log at the
Expand Down Expand Up @@ -480,12 +476,6 @@ def __init__(
assert equip_availability in (None, 'default', 'all', 'none')
self.arg_equip_availability = equip_availability

# `compute_squeeze_factor_to_district_level` is a Boolean indicating whether the computation of squeeze_factors
# should be specific to each district (when `True`), or if the computation of squeeze_factors should be on the
# basis that resources from all districts can be effectively "pooled" (when `False).
assert isinstance(compute_squeeze_factor_to_district_level, bool)
self.compute_squeeze_factor_to_district_level = compute_squeeze_factor_to_district_level

# Create the Diagnostic Test Manager to store and manage all Diagnostic Test
self.dx_manager = DxManager(self)

Expand All @@ -504,10 +494,6 @@ def __init__(
# Create counter for the running total of footprint of all the HSIs being run today
self.running_total_footprint: Counter = Counter()

# A reusable store for holding squeeze factors in get_squeeze_factors()
self._get_squeeze_factors_store_grow = 500
self._get_squeeze_factors_store = np.zeros(self._get_squeeze_factors_store_grow)

self._hsi_event_count_log_period = hsi_event_count_log_period
if hsi_event_count_log_period in {"day", "month", "year", "simulation"}:
# Counters for binning HSI events run (by unique integer keys) over
Expand Down Expand Up @@ -1606,125 +1592,7 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f

return appt_footprint_times

def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities,
compute_squeeze_factor_to_district_level: bool
):
"""
This will compute the squeeze factors for each HSI event from the list of all
the calls on health system resources for the day.
The squeeze factor is defined as (call/available - 1). ie. the highest
fractional over-demand among any type of officer that is called-for in the
appt_footprint of an HSI event.
A value of 0.0 signifies that there is no squeezing (sufficient resources for
the EXPECTED_APPT_FOOTPRINT).

:param footprints_per_event: List, one entry per HSI event, containing the
minutes required from each health officer in each health facility as a
Counter (using the standard index)
:param total_footprint: Counter, containing the total minutes required from
each health officer in each health facility when non-zero, (using the
standard index)
:param current_capabilities: Series giving the amount of time available for
each health officer in each health facility (using the standard index)
:param compute_squeeze_factor_to_district_level: Boolean indicating whether
the computation of squeeze_factors should be specific to each district
(when `True`), or if the computation of squeeze_factors should be on
the basis that resources from all districts can be effectively "pooled"
(when `False).

:return: squeeze_factors: an array of the squeeze factors for each HSI event
(position in array matches that in the all_call_today list).
"""

def get_total_minutes_of_this_officer_in_this_district(_officer):
"""Returns the minutes of current capabilities for the officer identified (this officer type in this
facility_id)."""
return current_capabilities.get(_officer)

def get_total_minutes_of_this_officer_in_all_district(_officer):
"""Returns the minutes of current capabilities for the officer identified in all districts (this officer
type in this all facilities of the same level in all districts)."""

def split_officer_compound_string(cs) -> Tuple[int, str]:
"""Returns (facility_id, officer_type) for the officer identified in the string of the form:
'FacilityID_{facility_id}_Officer_{officer_type}'."""
_, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_")
return int(_facility_id), _officer_type

def _match(_this_officer, facility_ids: List[int], officer_type: str):
"""Returns True if the officer identified is of the identified officer_type and is in one of the
facility_ids."""
this_facility_id, this_officer_type = split_officer_compound_string(_this_officer)
return (this_officer_type == officer_type) and (this_facility_id in facility_ids)

facility_id, officer_type = split_officer_compound_string(_officer)
facility_level = self._facility_by_facility_id[int(facility_id)].level
facilities_of_same_level_in_all_district = [
_fac.id for _fac in self._facilities_for_each_district[facility_level].values()
]

officers_in_the_same_level_in_all_districts = [
_officer for _officer in current_capabilities.keys() if
_match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type)
]

return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts)

# 1) Compute the load factors for each officer type at each facility that is
# called-upon in this list of HSIs
load_factor = {}
for officer, call in total_footprint.items():
if compute_squeeze_factor_to_district_level:
availability = get_total_minutes_of_this_officer_in_this_district(officer)
else:
availability = get_total_minutes_of_this_officer_in_all_district(officer)

# If officer does not exist in the relevant facility, log warning and proceed as if availability = 0
if availability is None:
logger.warning(
key="message",
data=(f"Requested officer {officer} is not contemplated by health system. ")
)
availability = 0

if availability == 0:
load_factor[officer] = float('inf')
else:
load_factor[officer] = max(call / availability - 1, 0.0)

# 2) Convert these load-factors into an overall 'squeeze' signal for each HSI,
# based on the load-factor of the officer with the largest time requirement for that
# event (or zero if event has an empty footprint)

# Instead of repeatedly creating lists for squeeze factors, we reuse a numpy array
# If the current store is too small, replace it
if len(footprints_per_event) > len(self._get_squeeze_factors_store):
# The new array size is a multiple of `grow`
new_size = math.ceil(
len(footprints_per_event) / self._get_squeeze_factors_store_grow
) * self._get_squeeze_factors_store_grow
self._get_squeeze_factors_store = np.zeros(new_size)

for i, footprint in enumerate(footprints_per_event):
if footprint:
# If any of the required officers are not available at the facility, set overall squeeze to inf
require_missing_officer = False
for officer in footprint:
if load_factor[officer] == float('inf'):
require_missing_officer = True
# No need to check the rest
break

if require_missing_officer:
self._get_squeeze_factors_store[i] = np.inf
else:
self._get_squeeze_factors_store[i] = max(load_factor[footprint.most_common()[0][0]], 0.)
else:
self._get_squeeze_factors_store[i] = 0.0

return self._get_squeeze_factors_store

def record_hsi_event(self, hsi_event, actual_appt_footprint=None, squeeze_factor=None, did_run=True, priority=None):
def record_hsi_event(self, hsi_event, actual_appt_footprint=None, squeeze_factor=0.0, did_run=True, priority=None):
"""
Record the processing of an HSI event.
It will also record the actual appointment footprint.
Expand Down Expand Up @@ -2043,115 +1911,57 @@ def run_individual_level_events_in_mode_0_or_1(self,
# from argument to Counter object called from
self.running_total_footprint.update(footprint)

# Estimate Squeeze-Factors for today
if self.mode_appt_constraints == 0:
# For Mode 0 (no Constraints), the squeeze factors are all zero.
squeeze_factor_per_hsi_event = np.zeros(
len(footprints_of_all_individual_level_hsi_event))
else:
# For Other Modes, the squeeze factors must be computed
squeeze_factor_per_hsi_event = self.get_squeeze_factors(
footprints_per_event=footprints_of_all_individual_level_hsi_event,
total_footprint=self.running_total_footprint,
current_capabilities=self.capabilities_today,
compute_squeeze_factor_to_district_level=self.compute_squeeze_factor_to_district_level,
)

for ev_num, event in enumerate(_list_of_individual_hsi_event_tuples):
_priority = event.priority
event = event.hsi_event
squeeze_factor = squeeze_factor_per_hsi_event[ev_num] # todo use zip here!

# store appt_footprint before running
_appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT

# Mode 0: All HSI Event run, with no squeeze
# Mode 1: All HSI Events run with squeeze provided latter is not inf
ok_to_run = True

if self.mode_appt_constraints == 1 and squeeze_factor == float('inf'):
ok_to_run = False

if ok_to_run:

# Compute the bed days that are allocated to this HSI and provide this information to the HSI
if sum(event.BEDDAYS_FOOTPRINT.values()):
event._received_info_about_bed_days = \
self.bed_days.issue_bed_days_according_to_availability(
facility_id=self.bed_days.get_facility_id_for_beds(persons_id=event.target),
footprint=event.BEDDAYS_FOOTPRINT
)

# Check that a facility has been assigned to this HSI
assert event.facility_info is not None, \
f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined."
# Compute the bed days that are allocated to this HSI and provide this information to the HSI
if sum(event.BEDDAYS_FOOTPRINT.values()):
event._received_info_about_bed_days = \
self.bed_days.issue_bed_days_according_to_availability(
facility_id=self.bed_days.get_facility_id_for_beds(persons_id=event.target),
footprint=event.BEDDAYS_FOOTPRINT
)

# Run the HSI event (allowing it to return an updated appt_footprint)
actual_appt_footprint = event.run(squeeze_factor=squeeze_factor)
# Check that a facility has been assigned to this HSI
assert event.facility_info is not None, \
f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined."

# Check if the HSI event returned updated appt_footprint
if actual_appt_footprint is not None:
# The returned footprint is different to the expected footprint: so must update load factors
# Run the HSI event (allowing it to return an updated appt_footprint)
actual_appt_footprint = event.run(squeeze_factor=0.0)

# check its formatting:
assert self.appt_footprint_is_valid(actual_appt_footprint)
# Check if the HSI event returned updated appt_footprint
if actual_appt_footprint is not None:
# The returned footprint is different to the expected footprint: so must update load factors

# Update load factors:
updated_call = self.get_appt_footprint_as_time_request(
facility_info=event.facility_info,
appt_footprint=actual_appt_footprint
)
original_call = footprints_of_all_individual_level_hsi_event[ev_num]
footprints_of_all_individual_level_hsi_event[ev_num] = updated_call
self.running_total_footprint -= original_call
self.running_total_footprint += updated_call

# Don't recompute for mode=0
if self.mode_appt_constraints != 0:
squeeze_factor_per_hsi_event = self.get_squeeze_factors(
footprints_per_event=footprints_of_all_individual_level_hsi_event,
total_footprint=self.running_total_footprint,
current_capabilities=self.capabilities_today,
compute_squeeze_factor_to_district_level=self.
compute_squeeze_factor_to_district_level,
)
# check its formatting:
assert self.appt_footprint_is_valid(actual_appt_footprint)

else:
# no actual footprint is returned so take the expected initial declaration as the actual,
# as recorded before the HSI event run
actual_appt_footprint = _appt_footprint_before_running

# Write to the log
self.record_hsi_event(
hsi_event=event,
actual_appt_footprint=actual_appt_footprint,
squeeze_factor=squeeze_factor,
did_run=True,
priority=_priority
# Update load factors:
updated_call = self.get_appt_footprint_as_time_request(
facility_info=event.facility_info,
appt_footprint=actual_appt_footprint
)
original_call = footprints_of_all_individual_level_hsi_event[ev_num]
footprints_of_all_individual_level_hsi_event[ev_num] = updated_call
self.running_total_footprint -= original_call
self.running_total_footprint += updated_call

# if not ok_to_run
else:
# Do not run,
# Call did_not_run for the hsi_event
rtn_from_did_not_run = event.did_not_run()

# If received no response from the call to did_not_run, or a True signal, then
# add to the hold-over queue.
# Otherwise (disease module returns "FALSE") the event is not rescheduled and will not run.

if rtn_from_did_not_run is not False:
# reschedule event
hp.heappush(_to_be_held_over, _list_of_individual_hsi_event_tuples[ev_num])

# Log that the event did not run
self.record_hsi_event(
hsi_event=event,
actual_appt_footprint=event.EXPECTED_APPT_FOOTPRINT,
squeeze_factor=squeeze_factor,
did_run=False,
priority=_priority
)
# no actual footprint is returned so take the expected initial declaration as the actual,
# as recorded before the HSI event run
actual_appt_footprint = _appt_footprint_before_running

# Write to the log
self.record_hsi_event(
hsi_event=event,
actual_appt_footprint=actual_appt_footprint,
did_run=True,
priority=_priority
)

return _to_be_held_over

Expand Down Expand Up @@ -2277,7 +2087,11 @@ def _get_events_due_today(self) -> List:

return due_today

def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> None:
def process_events_mode_0_and_1(self) -> None:
# Run all events due today, repeating the check for due events until none are due
# (this allows for HSI that are added to the queue in the course of other HSI
# for this today to be run this day).

while True:
# Get the events that are due today:
list_of_individual_hsi_event_tuples_due_today = self._get_events_due_today()
Expand All @@ -2296,10 +2110,9 @@ def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> Non
list_of_individual_hsi_event_tuples_due_today_that_have_essential_equipment.append(item)

# Try to run the list of individual-level events that have their essential equipment
_to_be_held_over = self.module.run_individual_level_events_in_mode_0_or_1(
self.module.run_individual_level_events_in_mode_0_or_1(
list_of_individual_hsi_event_tuples_due_today_that_have_essential_equipment,
)
hold_over.extend(_to_be_held_over)

def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None:

Expand Down Expand Up @@ -2604,10 +2417,7 @@ def apply(self, population):
hold_over = list()

if self.module.mode_appt_constraints in (0, 1):
# Run all events due today, repeating the check for due events until none are due
# (this allows for HSI that are added to the queue in the course of other HSI
# for this today to be run this day).
self.process_events_mode_0_and_1(hold_over)
self.process_events_mode_0_and_1()

elif self.module.mode_appt_constraints == 2:
self.process_events_mode_2(hold_over)
Expand Down
Loading