31 commits
aed9c4e  initial commit, experimenting with pickling non-libE_field fields (x …  (jlnav, Jun 26, 2025)
00bb9c6  Merge branch 'develop' into feature/shelve_sims  (jlnav, Aug 28, 2025)
b33dd7b  additional poking around and experimenting with history saving cache,…  (jlnav, Aug 28, 2025)
a020a58  better making of .npy database, use History attributes created upon t…  (jlnav, Aug 29, 2025)
fe4bc82  little note...?  (jlnav, Aug 29, 2025)
60bd6f5  comments  (jlnav, Aug 29, 2025)
8c20870  Merge branch 'develop' into feature/shelve_sims  (jlnav, Sep 11, 2025)
a5d33cc  experimenting with having caching being a step of the alloc, once we'…  (jlnav, Sep 11, 2025)
e8816a8  set those H entries to sim_started?  (jlnav, Sep 11, 2025)
2894523  grab update-able indexes, then call update_history_x_out and update_h…  (jlnav, Sep 12, 2025)
df29125  moving cache logic into manager, into handle_msg_from_worker that ove…  (jlnav, Sep 17, 2025)
0a70303  grow the manager's internal record of cache hits, instead of overwrit…  (jlnav, Sep 18, 2025)
593b522  save presumptive workerID for the worker that would've been given cac…  (jlnav, Sep 18, 2025)
e55e691  prevent redundant insertions into local cache retrieval. fix a bug in…  (jlnav, Sep 19, 2025)
9575d76  refactor, and remove first draft of code that was in alloc_f  (jlnav, Sep 19, 2025)
c01dcd3  for now, enable cache for sims that lasted longer than a second  (jlnav, Sep 19, 2025)
6e90b5c  experimenting with making disk cache name match calling script plus e…  (jlnav, Sep 19, 2025)
c7a7f52  fix redundant send of work if rows send to gen. tiny test fix  (jlnav, Sep 19, 2025)
4af459d  Merge branch 'develop' into feature/shelve_sims  (jlnav, Sep 24, 2025)
343cc9b  add libE_specs.cache_long_sims, plus more/better docstrings  (jlnav, Sep 24, 2025)
ca7fe00  manager builds libE_stats messages corresponding to cache retrievals.…  (jlnav, Sep 25, 2025)
1f55a61  user can specify database name; trying to figure out occasionally-mal…  (jlnav, Sep 26, 2025)
a21fb44  Merge branch 'develop' into feature/shelve_sims  (jlnav, Oct 22, 2025)
9a3e0c4  fix syntax error uncaught by black and other tools?  (jlnav, Oct 22, 2025)
702c8d9  cache_name is only string. path not needed in specs.py  (jlnav, Oct 22, 2025)
08c1790  param fix  (jlnav, Oct 22, 2025)
9270b93  still want to send Work on persis_stop if we're doing final_gen_send  (jlnav, Oct 22, 2025)
d92b3c3  don't necessarily need cache collisions for these executor tests - si…  (jlnav, Oct 22, 2025)
10946bc  fix iterating over blank template cache entries as though they're val…  (jlnav, Oct 22, 2025)
11694e7  add functionality test for cache_sims  (jlnav, Oct 22, 2025)
019b82e  non-existing cache already dealt with earlier?  (jlnav, Oct 22, 2025)
37 changes: 37 additions & 0 deletions libensemble/history.py
@@ -1,5 +1,6 @@
import logging
import time
from pathlib import Path

import numpy as np
import numpy.typing as npt
@@ -93,6 +94,7 @@ def __init__(
self.index = len(H0)
self.grow_count = 0
self.safe_mode = False
self.use_cache = False

self.sim_started_count = np.sum(H["sim_started"])
self.sim_ended_count = np.sum(H["sim_ended"])
@@ -106,6 +108,15 @@ def __init__(
self.last_started = -1
self.last_ended = -1

def init_cache(self, cache_name: str) -> None:
self.cache_dir = Path.home() / ".libE"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache = self.cache_dir / Path(cache_name + ".npy")
if not self.cache.exists():
self.cache.touch()
self.use_cache = True
self.cache_set = False

def _append_new_fields(self, H_f: npt.NDArray) -> None:
dtype_new = np.dtype(list(set(self.H.dtype.descr + H_f.dtype.descr)))
H_new = np.zeros(len(self.H), dtype=dtype_new)
@@ -114,6 +125,30 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None:
H_new[field][: len(self.H)] = self.H[field]
self.H = H_new

def _shelf_longrunning_sims(self, index):
"""Cache any f values that ran for more than a second."""
if self.H[index]["sim_ended_time"] - self.H[index]["sim_started_time"] > 1:
# ('f', 'x') and ('x', 'f') are not equivalent structured dtypes, so sort field names into a canonical order.
self.cache_keys = sorted(
[i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]]
) # ('f', 'x') keys only
self.cache_dtype = sorted(
[(name, self.H.dtype.fields[name][0]) for name in self.cache_keys]
) # only needed to init cache
try:
in_cache = np.load(self.cache, allow_pickle=True)
except EOFError:
in_cache = np.zeros(1, dtype=self.cache_dtype)
entry = self.H[index][self.cache_keys]
if entry not in in_cache:
in_cache = np.append(in_cache, entry)
in_cache = np.unique(in_cache, axis=0) # attempt to remove duplicates
np.save(self.cache, in_cache, allow_pickle=True)
self.cache_set = True

def get_shelved_sims(self) -> npt.NDArray:
return np.load(self.cache, allow_pickle=True)

def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
"""
Updates the history after points have been evaluated
@@ -147,6 +182,8 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
self.H["sim_ended"][ind] = True
self.H["sim_ended_time"][ind] = time.time()
self.sim_ended_count += 1
if self.use_cache:
self._shelf_longrunning_sims(ind)

if kill_canceled_sims:
for j in range(self.last_ended + 1, np.max(new_inds) + 1):
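Note: as a minimal standalone sketch (not the PR's code) of the `.npy` round-trip that `_shelf_longrunning_sims` and `get_shelved_sims` perform, the following appends a structured record to an on-disk cache and skips duplicates. The field names (`f`, `x`) and the cache filename are illustrative assumptions.

```python
from pathlib import Path

import numpy as np

# ("f", "x") and ("x", "f") are distinct structured dtypes,
# so sorting the field names gives a canonical layout.
cache_dtype = np.dtype(sorted([("x", float), ("f", float)]))
cache_path = Path.home() / ".libE" / "sketch_cache.npy"
cache_path.parent.mkdir(parents=True, exist_ok=True)

def shelve(entry: np.ndarray) -> None:
    """Append a record to the on-disk cache unless it is already present."""
    try:
        stored = np.load(cache_path, allow_pickle=True)
    except (EOFError, FileNotFoundError):
        stored = np.zeros(0, dtype=cache_dtype)  # empty or missing cache
    if entry not in stored:  # row-wise membership test on structured arrays
        stored = np.append(stored, entry)
    np.save(cache_path, stored, allow_pickle=True)

record = np.array([(3.0, 1.5)], dtype=cache_dtype)  # f=3.0, x=1.5
shelve(record)
shelve(record)  # no-op: the duplicate check catches it
print(np.load(cache_path, allow_pickle=True))  # one stored record
```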
173 changes: 158 additions & 15 deletions libensemble/manager.py
@@ -19,8 +19,10 @@
from numpy.lib.recfunctions import repack_fields

from libensemble.comms.comms import CommFinishedException, QCommThread
from libensemble.comms.logs import LogConfig
from libensemble.executors.executor import Executor
from libensemble.message_numbers import (
CACHE_RETRIEVE,
EVAL_GEN_TAG,
EVAL_SIM_TAG,
FINISHED_PERSISTENT_GEN_TAG,
@@ -29,6 +31,7 @@
MAN_SIGNAL_KILL,
PERSIS_STOP,
STOP_TAG,
calc_status_strings,
calc_type_strings,
)
from libensemble.resources.resources import Resources
@@ -37,7 +40,7 @@
from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges
from libensemble.utils.output_directory import EnsembleDirectory
from libensemble.utils.timer import Timer
from libensemble.worker import WorkerErrMsg, worker_main
from libensemble.worker import Worker, WorkerErrMsg, worker_main

logger = logging.getLogger(__name__)
# For debug messages - uncomment
@@ -204,6 +207,7 @@ def __init__(
timer.start()
self.date_start = timer.date_start.replace(" ", "_")
self.safe_mode = libE_specs.get("safe_mode")
self.use_cache = libE_specs.get("cache_long_sims")
self.kill_canceled_sims = libE_specs.get("kill_canceled_sims")
self.hist = hist
self.hist.safe_mode = self.safe_mode
@@ -217,6 +221,11 @@ def __init__(
self.WorkerExc = False
self.persis_pending = []
self.live_data = libE_specs.get("live_data")
if self.use_cache:
self.hist.init_cache(self.libE_specs.get("cache_name"))
self.from_cache = []
self.cache_index = 0
self.cache_hit = False

dyn_keys = ("resource_sets", "num_procs", "num_gpus")
dyn_keys_in_H = any(k in self.hist.H.dtype.names for k in dyn_keys)
@@ -410,27 +419,110 @@ def _freeup_resources(self, w: int) -> None:
if self.resources:
self.resources.resource_manager.free_rsets(w)

def _refresh_from_cache(
self, cache: npt.NDArray, dtype_with_idx: np.dtype, cache_row: npt.NDArray, work_row: int, w: int
) -> None:
"""Add a cache entry, workerID, and H_row to the local record array.

Later, when we iterate over the cache for entries that could've been sent to a worker (but weren't),
we'll process each such entry as though it came from this worker, with these H_rows.
"""
self.cache_hit = True
from_cache_entry = np.empty(1, dtype=dtype_with_idx)
from_cache_entry["H_row"] = work_row
from_cache_entry["worker_id"] = w
for remaining_field in cache.dtype.names:
from_cache_entry[remaining_field] = cache_row[remaining_field]
self.from_cache[self.cache_index] = from_cache_entry
self.cache_index += 1

def _cache_scan(
self, cache: npt.NDArray, Work: dict, w: int, dtype_with_idx: np.dtype, new_dtype: np.dtype
) -> None:
"""
Check whether any work rows are already in the cache and, if so, call _refresh_from_cache above
to update the local `from_cache` record.
"""

self.cache_timer = Timer()
with self.cache_timer:
for field in np.dtype(new_dtype).names:
if field in cache.dtype.names:
for work_row in Work["libE_info"]["H_rows"]:
for cache_row in cache:
if (
np.allclose(cache_row[field], self.hist.H[field][work_row])
and work_row not in self.from_cache["H_row"]
): # we found outbound work in cache, that's not already in the local record
self._refresh_from_cache(cache, dtype_with_idx, cache_row, work_row, w)

def _update_state_from_cache(self, Work: dict, work_rows: npt.NDArray, w: int, new_dtype: np.dtype) -> None:
"""Retrieve saved cache from history, create local record-array of matching cache entries.

The `from_cache` local record contains cache entries and the workerID and H_rows they are associated with, had
they been sent to a worker.

Cache entries *must* be associated with the preempted outbound worker and H_rows because those values
are always associated with actual inbound results. Later on, when we iterate over the cache for entries that
could've been sent to a worker (but weren't), we'll process that entry as though it came from that worker,
with those H_rows.
"""

cache = self.hist.get_shelved_sims()

# our local record resembles the cache, but additionally with the worker_id and H_row from the alloc_f
dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr)

# initialize or grow the local record, then call _cache_scan to fill it
if not len(self.from_cache):
self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx)
else:
self.from_cache = np.append(self.from_cache, np.zeros(len(work_rows), dtype=dtype_with_idx))

# populates the local record
self._cache_scan(cache, Work, w, dtype_with_idx, new_dtype)

def _send_work_order(self, Work: dict, w: int) -> None:
"""Sends an allocation function order to a worker"""
logger.debug(f"Manager sending work unit to worker {w}")

work_rows = Work["libE_info"]["H_rows"]
new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]]

if self.use_cache and Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set:
self._update_state_from_cache(Work, work_rows, w, new_dtype)

if self.resources:
self._set_resources(Work, w)

self.wcomms[w].send(Work["tag"], Work)

if Work["tag"] == EVAL_GEN_TAG:
self.W[w]["gen_started_time"] = time.time()
self.wcomms[w].send(Work["tag"], Work)

work_rows = Work["libE_info"]["H_rows"]
work_name = calc_type_strings[Work["tag"]]
logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}")
if self.cache_hit:
logger.debug(
f"Manager retrieved {work_name} work for worker {w} from cache. Rows {extract_H_ranges(Work) or None}"
)
else:
logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}")

if len(work_rows):
new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]]

if self.cache_hit:
work_rows = [row for row in work_rows if row not in self.from_cache["H_row"]]
if (
all([i in self.from_cache["H_row"] for i in work_rows]) and Work["tag"] == EVAL_SIM_TAG
): # all original work rows were found in the cache; none remain to send
logger.debug("Manager skipping sending *all* work to worker %s due to cache", w)
return

H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype)
for i, row in enumerate(work_rows):
H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row])

if Work["tag"] in [EVAL_SIM_TAG, PERSIS_STOP]: # inclusion of PERSIS_STOP for final_gen_send
self.wcomms[w].send(Work["tag"], Work)
self.wcomms[w].send(0, H_to_be_sent)

def _update_state_on_alloc(self, Work: dict, w: int):
@@ -452,13 +544,35 @@ def _update_state_on_alloc(self, Work: dict, w: int):

# --- Handle incoming messages from workers

def _receive_from_workers(self, persis_info: dict) -> dict:
"""Receives calculation output from workers. Loops over all
def _receive_from_workers_or_cache(self, persis_info: dict) -> dict:
"""
Two-stage process that handles either:
1. Messages that could've been sent to a worker but are already in the cache.
2. Messages that have been sent by a worker.

1.
If the cache is not empty, it is scanned for messages that could've been sent.
Each match is processed as though it came from its corresponding worker. The local
record of the cache is then cleared to prevent duplicate processing.

2.
Receives calculation output from workers. Loops over all
active workers and probes to see if a worker is ready to
communicate. If any output is received, all other workers are
looped back over.
"""
time.sleep(0.0001) # Critical for multiprocessing performance

# Process messages from the cache
if self.cache_hit:
self.cache_hit = False
for w in self.from_cache["worker_id"]:
if w > 0: # actual cache entry - not blank. assuming w0 gets no sim work
self._handle_msg_from_worker(persis_info, w, process_cache=True)
self.from_cache = []
self.cache_index = 0

# Process messages from workers
new_stuff = True
while new_stuff:
new_stuff = False
@@ -516,11 +630,34 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
if D_recv.get("persis_info"):
persis_info.setdefault(int(w), {}).update(D_recv["persis_info"])

def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None:
"""Handles a message from worker w"""
def _create_simulated_D_recv(self, w: int) -> dict:
"""Create a simulated worker message containing the cache entry instead of a message from a worker."""

cache_entry_by_worker = self.from_cache[self.from_cache["worker_id"] == w]
D_recv = {
"calc_out": cache_entry_by_worker[[name[0] for name in self.sim_specs["out"]]],
"libE_info": {
"H_rows": cache_entry_by_worker["H_row"],
"workerID": w,
},
"calc_status": CACHE_RETRIEVE,
"calc_type": 1,
}
return D_recv

def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool = False) -> None:
"""Handles a message from worker w.

If processing from the cache, create a simulated worker message containing
the cache entry.
"""
try:
msg = self.wcomms[w].recv()
tag, D_recv = msg
if process_cache:
D_recv = self._create_simulated_D_recv(w)
enum_desc, calc_id = Worker._extract_debug_data(1, D_recv)
else:
msg = self.wcomms[w].recv()
tag, D_recv = msg
except CommFinishedException:
logger.debug(f"Finalizing message from Worker {w}")
return
@@ -535,7 +672,13 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None:
logger.vdebug(f"Manager received a log message from worker {w}")
logging.getLogger(D_recv.name).handle(D_recv)
else:
logger.debug(f"Manager received data message from worker {w}")
if process_cache:
logger.debug(f"Manager retrieved cached message redirected from worker {w}")
calc_msg = f"""{enum_desc} {calc_id}: {"sim"} {self.cache_timer}"""
calc_msg += f" Status: {calc_status_strings[CACHE_RETRIEVE]}"
logging.getLogger(LogConfig.config.stats_name).info(calc_msg) # libE_stats
else:
logger.debug(f"Manager received data message from worker {w}")
self._update_state_on_worker_msg(persis_info, D_recv, w)

def _kill_cancelled_sims(self) -> None:
@@ -596,7 +739,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):

exit_flag = 0
while (any(self.W["active"]) or any(self.W["persis_state"])) and exit_flag == 0:
persis_info = self._receive_from_workers(persis_info)
persis_info = self._receive_from_workers_or_cache(persis_info)
if self.term_test(logged=False) == 2:
# Elapsed Wallclock has expired
if not any(self.W["persis_state"]):
@@ -685,7 +828,7 @@ def run(self, persis_info: dict) -> (dict, int, int):
try:
while not self.term_test():
self._kill_cancelled_sims()
persis_info = self._receive_from_workers(persis_info)
persis_info = self._receive_from_workers_or_cache(persis_info)
Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info)
if flag:
break
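To make the flow above concrete, here is a condensed, self-contained sketch of the matching step in `_cache_scan` and the simulated-message construction in `_create_simulated_D_recv`. The miniature history, the single cached record, and the presumptive worker ID are all hypothetical.

```python
import numpy as np

# Hypothetical miniature history: "x" is the matched input field, "f" the result
H = np.zeros(3, dtype=[("x", float), ("f", float)])
H["x"] = [0.1, 0.2, 0.3]
cache = np.array([(0.2, 4.0)], dtype=[("x", float), ("f", float)])

work_rows = [0, 1, 2]  # rows the alloc_f wants evaluated by worker 1
dtype_with_idx = np.dtype(cache.dtype.descr + [("H_row", int), ("worker_id", int)])

# Scan: record every cached row matching an outbound work row
from_cache = np.zeros(0, dtype=dtype_with_idx)
for work_row in work_rows:
    for cache_row in cache:
        if np.allclose(cache_row["x"], H["x"][work_row]) and work_row not in from_cache["H_row"]:
            hit = np.array([(cache_row["x"], cache_row["f"], work_row, 1)], dtype=dtype_with_idx)
            from_cache = np.append(from_cache, hit)

# Rows with hits are never sent to the worker...
remaining = [r for r in work_rows if r not in from_cache["H_row"]]
print(remaining)  # [0, 2]

# ...instead, each hit is wrapped as a simulated worker message
for hit in from_cache:
    D_recv = {
        "calc_out": hit[["f"]],
        "libE_info": {"H_rows": [int(hit["H_row"])], "workerID": int(hit["worker_id"])},
    }
    print(D_recv)
```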
3 changes: 3 additions & 0 deletions libensemble/message_numbers.py
@@ -41,6 +41,8 @@
WORKER_DONE = 35 # Calculation was successful
# last_calc_status_rst_tag
CALC_EXCEPTION = 36 # Reserved: Automatically used if user_f raised an exception
CACHE_RETRIEVE = 40 # Manager retrieved sim from cache


MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL]

@@ -57,6 +59,7 @@
TASK_FAILED_TO_START: "Task Failed to start",
WORKER_DONE: "Completed",
CALC_EXCEPTION: "Exception occurred",
CACHE_RETRIEVE: "Retrieved from cache",
None: "Unknown Status",
}
# last_calc_status_string_rst_tag
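As a sketch of how the new status surfaces in libE_stats (cf. the `calc_msg` construction in `_handle_msg_from_worker` above); the descriptor, ID, and timer text are illustrative values, and the import assumes this branch:

```python
from libensemble.message_numbers import CACHE_RETRIEVE, calc_status_strings

enum_desc, calc_id, timer_text = "sim_id", 3, "Time: 0.001"  # illustrative values
calc_msg = f"{enum_desc} {calc_id}: sim {timer_text} Status: {calc_status_strings[CACHE_RETRIEVE]}"
print(calc_msg)  # sim_id 3: sim Time: 0.001 Status: Retrieved from cache
```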
19 changes: 19 additions & 0 deletions libensemble/specs.py
@@ -1,4 +1,5 @@
import random
import sys
import warnings
from pathlib import Path

@@ -306,6 +307,24 @@ class LibeSpecs(BaseModel):
Forms the base of a generator directory.
"""

cache_long_sims: bool | None = False
"""
Cache simulation results with runtimes >1s to disk. Subsequent runs of the same
base script with the same command-line arguments will access this cache.

If the generator creates points that are already in the cache, those points are not
sent out for evaluation; the corresponding cached results are retrieved and returned
to the generator instead.

The cache is saved in $HOME/.libE, and by default is named after the joined command-line arguments.
"""

cache_name: str | None = "".join(sys.argv)
"""
The name of the cache file, stored in $HOME/.libE. By default, this is the joined
command-line arguments of the calling script.
"""

calc_dir_id_width: int | None = 4
"""
The width of the numerical ID component of a calculation directory name. Leading
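Finally, a minimal calling-script sketch of enabling the cache, assuming this branch's new `cache_long_sims` and `cache_name` options; the cache name here is an arbitrary example:

```python
from libensemble.specs import LibeSpecs

libE_specs = LibeSpecs(
    comms="local",
    nworkers=4,
    cache_long_sims=True,   # shelve results of sims that ran for more than a second
    cache_name="my_study",  # stored as $HOME/.libE/my_study.npy
)
# Attach sim_specs/gen_specs and run the ensemble as usual; a re-run with the
# same cache_name retrieves cached results instead of re-evaluating those points.
```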