Skip to content

[RFC] Provide an explicit caching API #947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 235 additions & 108 deletions openfisca_core/data_storage.py
Original file line number Diff line number Diff line change
@@ -1,153 +1,280 @@
# -*- coding: utf-8 -*-

import shutil
import abc
import os
import shutil
import warnings
from typing import Any, Dict, KeysView, Optional, Union

import numpy as np
import numpy

from openfisca_core import periods
from openfisca_core.periods import ETERNITY
from openfisca_core.indexed_enums import EnumArray
from openfisca_core.periods import Period

# Cache state: maps each period to whatever the active storage keeps for it
# (the value itself in memory, a ``(filepath, enum-or-None)`` pair on disk).
StateType = Dict[Period, Any]

class InMemoryStorage(object):
"""
Low-level class responsible for storing and retrieving calculated vectors in memory
"""

def __init__(self, is_eternal = False):
self._arrays = {}
self.is_eternal = is_eternal
class StorageLike(abc.ABC):
"""Blueprint for an explicit Storage API."""
@abc.abstractmethod
def get(self, state: StateType, key: Period) -> Any:
...

def get(self, period):
if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
@abc.abstractmethod
def put(self, state: StateType, key: Period, value: Any) -> StateType:
...

values = self._arrays.get(period)
if values is None:
return None
return values
@abc.abstractmethod
def delete(self, state: StateType, key: Period) -> StateType:
...

def put(self, value, period):
if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
@abc.abstractmethod
def delete_all(self, state: StateType) -> dict:
...

self._arrays[period] = value
@abc.abstractmethod
def memory_usage(self, state: StateType) -> Dict[str, int]:
...

def delete(self, period = None):
if period is None:
self._arrays = {}
return

if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
class CachingLike(abc.ABC):
"""Blueprint for an explicit Cache API."""

self._arrays = {
period_item: value
for period_item, value in self._arrays.items()
if not period.contains(period_item)
}
@abc.abstractmethod
def get(self, period: Period) -> Any:
...

@abc.abstractmethod
def put(self, value: Any, period: Period) -> None:
...

def get_known_periods(self):
return self._arrays.keys()
@abc.abstractmethod
def delete(self, period: Optional[Period] = None) -> None:
...

def get_memory_usage(self):
if not self._arrays:
return dict(
nb_arrays = 0,
total_nb_bytes = 0,
cell_size = np.nan,
)
@abc.abstractmethod
def known_periods(self) -> KeysView[Period]:
...

nb_arrays = len(self._arrays)
array = next(iter(self._arrays.values()))
return dict(
nb_arrays = nb_arrays,
total_nb_bytes = array.nbytes * nb_arrays,
cell_size = array.itemsize,
)
@abc.abstractmethod
def memory_usage(self) -> Dict[str, int]:
...


class OnDiskStorage(object):
class SupportsPeriodCasting(abc.ABC):
"""
Low-level class responsible for storing and retrieving calculated vectors on disk
Extracting eternal period resolution.

TODO: get rid of.
"""

def __init__(self, storage_dir, is_eternal = False, preserve_storage_dir = False):
self._files = {}
self._enums = {}
self.is_eternal = is_eternal
self.preserve_storage_dir = preserve_storage_dir
self.storage_dir = storage_dir
def cast_period(self, period: Optional[Period], eternal: bool) -> Period:
if eternal:
return periods.period(periods.ETERNITY)

def _decode_file(self, file):
enum = self._enums.get(file)
if enum is not None:
return EnumArray(np.load(file), enum)
else:
return np.load(file)
return periods.period(period)


class MemoryStorage(StorageLike):
"""Low-level class responsible for storing and retrieving values in memory."""

def get(self, state: Dict[Period, Any], key: Period) -> Any:
    """Return the value held in ``state`` for ``key``, or ``None`` if absent."""
    if key in state:
        return state[key]

    return None

def put(self, state: StateType, key: Period, value: Any) -> StateType:
    """Store ``value`` under ``key`` in ``state`` and return that same mapping."""
    state.update({key: value})
    return state

def get(self, period):
if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
def delete(self, state: StateType, key: Period) -> StateType:
    """Return a new mapping without the periods that ``key`` contains.

    ``state`` itself is not modified; the filtered copy is what the caller
    is expected to keep.
    """
    kept = {}

    for period, value in state.items():
        if key.contains(period):
            continue

        kept[period] = value

    return kept

values = self._files.get(period)
if values is None:
def delete_all(self, state: StateType) -> dict:
    """Empty ``state`` in place and hand the same, now empty, mapping back."""
    state.clear()

    return state

def memory_usage(self, state: StateType) -> Dict[str, int]:
    """Summarise the memory footprint of the values held in ``state``.

    The first stored value is taken as representative: its ``nbytes`` is
    multiplied by the number of entries, and its ``itemsize`` is reported
    as the cell size. An empty ``state`` yields zero counts and a NaN cell
    size.
    """
    count = len(state)

    if count == 0:
        return {
            "nb_arrays": 0,
            "total_nb_bytes": 0,
            "cell_size": numpy.nan,
            }

    # Only the first value is inspected; the others are assumed to match it.
    sample = next(iter(state.values()))
    total = sample.nbytes * count

    return {
        "nb_arrays": count,
        "total_nb_bytes": total,
        "cell_size": sample.itemsize,
        }

def restore(self, state: StateType) -> StateType:
    """Return ``state`` unchanged.

    In-memory storage keeps values only inside ``state`` itself, so there is
    nothing to rebuild. The mapping is handed back as is, so the declared
    return type holds and callers never receive ``None``.
    """
    return state


class DiskStorage(StorageLike):
"""Low-level class responsible for storing and retrieving values on disk."""

directory: str
preserve: bool

def __init__(self, directory: str, preserve: bool) -> None:
    """Record the storage settings and make sure ``directory`` exists.

    Args:
        directory: Folder the ``.npy`` files are written to; created
            (including missing parents) when it does not exist yet.
        preserve: Stored on the instance as ``self.preserve``.

    Raises:
        FileExistsError: If ``directory`` exists but is not a directory.
    """
    self.directory = directory
    self.preserve = preserve

    # ``exist_ok`` already makes this a no-op for an existing directory, so
    # no separate ``isdir`` check is needed (and no window opens between a
    # check and the creation).
    os.makedirs(self.directory, exist_ok = True)

def get(self, state: StateType, key: Period) -> Any:
file = state.get(key)

if file is None:
return None
return self._decode_file(values)

def put(self, value, period):
if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
filepath, value = file

if value is None:
return numpy.load(filepath)

return EnumArray(numpy.load(filepath), value)

def put(self, state: StateType, key: Period, value: Any) -> StateType:
filename = str(key)
filepath = os.path.join(self.directory, filename) + ".npy"

filename = str(period)
path = os.path.join(self.storage_dir, filename) + '.npy'
if isinstance(value, EnumArray):
self._enums[path] = value.possible_values
value = value.view(np.ndarray)
np.save(path, value)
self._files[period] = path
state[key] = filepath, value.possible_values
value = value.view(numpy.ndarray)

def delete(self, period = None):
if period is None:
self._files = {}
return
else:
state[key] = filepath, None

numpy.save(filepath, value)

return state

def delete(self, state: StateType, key: Period) -> StateType:
    """Return a new mapping without the periods that ``key`` contains.

    Only the mapping is filtered: ``state`` itself is not modified and no
    file is removed from disk here.
    """
    kept = {}

    for period, entry in state.items():
        if key.contains(period):
            continue

        kept[period] = entry

    return kept

if self.is_eternal:
period = periods.period(ETERNITY)
period = periods.period(period)
def delete_all(self, state: StateType) -> dict:
    """Empty ``state`` in place and hand the same, now empty, mapping back.

    Only the mapping is cleared; no file is removed from disk here.
    """
    state.clear()

    return state

if period is not None:
self._files = {
period_item: value
for period_item, value in self._files.items()
if not period.contains(period_item)
def memory_usage(self, state: StateType) -> Dict[str, int]:
if not state:
return {
"nb_files": 0,
"total_nb_bytes": 0,
"cell_size": numpy.nan,
}

def get_known_periods(self):
return self._files.keys()
filename, _ = next(iter(state.values()))
nb_files = len(state)
period = next(iter(state.keys()))
array = self.get(state, period)
size = os.path.getsize(filename)

def restore(self):
self._files = files = {}
# Restore self._files from content of storage_dir.
for filename in os.listdir(self.storage_dir):
return {
"nb_files": nb_files,
"total_nb_bytes": size * nb_files,
"cell_size": array.itemsize,
}

def restore(self, state: StateType) -> StateType:
state = self.delete_all(state)

# Restore files from content of directory.
for filename in os.listdir(self.directory):
if not filename.endswith('.npy'):
continue
path = os.path.join(self.storage_dir, filename)

filepath = os.path.join(self.directory, filename)
filename_core = filename.rsplit('.', 1)[0]
period = periods.period(filename_core)
files[period] = path
state[period] = filepath, None

def __del__(self):
if self.preserve_storage_dir:
return state

def __del__(self) -> None:
if self.preserve:
return
shutil.rmtree(self.storage_dir) # Remove the holder temporary files

# Remove the holder temporary files
shutil.rmtree(self.directory)

# If the simulation temporary directory is empty, remove it
parent_dir = os.path.abspath(os.path.join(self.storage_dir, os.pardir))
parent_dir = os.path.abspath(os.path.join(self.directory, os.pardir))

if not os.listdir(parent_dir):
shutil.rmtree(parent_dir)


# The concrete storage back-ends a ``Cache`` can delegate to.
StorageType = Union[MemoryStorage, DiskStorage]


class Cache(CachingLike, SupportsPeriodCasting):
    """
    Explicit Cache API responsible for:

    * keeping cache state
    * reading from storages
    * writing to storages
    """

    state: StateType
    storage: StorageType
    is_eternal: bool

    def __init__(self, storage: StorageType, is_eternal: bool = False) -> None:
        """Start with an empty state backed by ``storage``.

        ``is_eternal`` is forwarded to ``cast_period`` on every ``get``,
        ``put`` and period-specific ``delete``.
        """
        self.state = {}
        self.storage = storage
        self.is_eternal = is_eternal

    def get(self, period: Period) -> Any:
        """Return what the storage holds for ``period`` once it is cast."""
        casted: Period = self.cast_period(period, self.is_eternal)
        return self.storage.get(self.state, casted)

    def put(self, value: Any, period: Period) -> None:
        """Store ``value`` for ``period`` and keep the state the storage returns."""
        casted: Period = self.cast_period(period, self.is_eternal)
        self.state = self.storage.put(self.state, casted, value)

    def delete(self, period: Optional[Period] = None) -> None:
        """Drop cached entries.

        Without a ``period`` everything is dropped through
        ``storage.delete_all``; otherwise the cast period is passed to
        ``storage.delete``. In both cases the returned state replaces
        ``self.state``.
        """
        if period is None:
            self.state = self.storage.delete_all(self.state)
            return

        casted: Period = self.cast_period(period, self.is_eternal)
        self.state = self.storage.delete(self.state, casted)

    def known_periods(self) -> KeysView[Period]:
        """Return a view over the periods currently present in the state."""
        return self.state.keys()

    def memory_usage(self) -> Dict[str, int]:
        """Return the storage's memory usage report for the current state."""
        return self.storage.memory_usage(self.state)

    def get_known_periods(self) -> KeysView[Period]:
        """Deprecated alias of ``known_periods``."""
        message = [
            "The 'Cache.get_known_periods' method has been deprecated since version",
            "34.8.0, and will be removed in the future. Henceforth, please prefer",
            "Cache.known_periods.",
            ]
        # stacklevel = 2 attributes the warning to the caller, not to this
        # module, so the default filters do not hide it from user code.
        warnings.warn(" ".join(message), DeprecationWarning, stacklevel = 2)
        return self.known_periods()

    def get_memory_usage(self) -> Dict[str, int]:
        """Deprecated alias of ``memory_usage``."""
        message = [
            "The 'Cache.get_memory_usage' method has been deprecated since version",
            "34.8.0, and will be removed in the future. Henceforth, please prefer",
            "Cache.memory_usage.",
            ]
        # See get_known_periods for why stacklevel = 2 is used.
        warnings.warn(" ".join(message), DeprecationWarning, stacklevel = 2)
        return self.memory_usage()

    def restore(self) -> StateType:
        """Deprecated: delegate to ``storage.restore`` with the current state.

        Returns whatever the storage returns; ``self.state`` is not
        reassigned here.
        """
        message = [
            "The 'Cache.restore' method has been deprecated since version",
            "34.8.0, and will be removed in the future. Henceforth, please prefer",
            "DiskStorage.restore.",
            ]
        # See get_known_periods for why stacklevel = 2 is used.
        warnings.warn(" ".join(message), DeprecationWarning, stacklevel = 2)
        return self.storage.restore(self.state)
Loading