16 changes: 16 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,16 @@
{
    "python.defaultInterpreterPath": "/Users/bnl28/mambaforge/envs/pyfive-25aug/bin/python",
    "python.terminal.activateEnvironment": true,
    "terminal.integrated.profiles.osx": {
        "zsh": {
            "path": "/bin/zsh",
            "args": ["-l"]
        }
    },
    "terminal.integrated.defaultProfile.osx": "zsh",
    "esbonio.server.env": {
        "PATH": "/Users/bnl28/mambaforge/envs/pyfive-25aug/bin:${env:PATH}",
        "PYTHONPATH": "/Users/bnl28/mambaforge/envs/pyfive-25aug/lib/python3.12/site-packages"
    },
    "esbonio.server.pythonPath": "/Users/bnl28/mambaforge/envs/pyfive-25aug/bin/python"
}
1 change: 1 addition & 0 deletions pyfive/__init__.py
@@ -8,6 +8,7 @@
from pyfive.h5t import check_enum_dtype, check_string_dtype, check_dtype, opaque_dtype, check_opaque_dtype
from pyfive.h5py import Datatype, Empty
from importlib.metadata import version
from pyfive.inspect import p5ncdump

__version__ = '0.5.0.dev'

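The functional change here is simply re-exporting p5ncdump at package level. A hedged sketch of the intended use follows; the diff shows only the import, so the call signature (a single file path) is an assumption:

# Hypothetical usage of the newly exported inspection helper.
import pyfive

pyfive.p5ncdump('example.h5')  # assumed: prints an ncdump-style summary of the file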
52 changes: 3 additions & 49 deletions pyfive/btree.py
@@ -22,6 +22,7 @@ def __init__(self, fh, offset):
        self.offset = offset
        self.depth = None
        self.all_nodes = {}
        self.last_offset = offset

        self._read_root_node()
        self._read_children()
@@ -53,6 +54,7 @@ def _read_node(self, offset, node_level):
        node = self._read_node_header(offset, node_level)
        node['keys'] = []
        node['addresses'] = []
        self.last_offset = max(offset, self.last_offset)
        return node

    def _read_node_header(self, offset):
@@ -149,57 +151,9 @@ def _read_node(self, offset, node_level):
            addresses.append(chunk_address)
        node['keys'] = keys
        node['addresses'] = addresses
        self.last_offset = max(offset, self.last_offset)
        return node

    def construct_data_from_chunks(
            self, chunk_shape, data_shape, dtype, filter_pipeline):
        """ Build a complete data array from chunks. """
        if isinstance(dtype, tuple):
            true_dtype = tuple(dtype)
            dtype_class = dtype[0]
            if dtype_class == 'REFERENCE':
                size = dtype[1]
                if size != 8:
                    raise NotImplementedError('Unsupported Reference type')
                dtype = '<u8'
            else:
                raise NotImplementedError('datatype not implemented')
        else:
            true_dtype = None

        # create array to store data
        shape = [_padded_size(i, j) for i, j in zip(data_shape, chunk_shape)]
        data = np.zeros(shape, dtype=dtype)

        # loop over chunks reading each into the full data array
        count = np.prod(chunk_shape)
        itemsize = np.dtype(dtype).itemsize
        chunk_buffer_size = count * itemsize
        for node in self.all_nodes[0]:
            for node_key, addr in zip(node['keys'], node['addresses']):
                self.fh.seek(addr)
                if filter_pipeline is None:
                    chunk_buffer = self.fh.read(chunk_buffer_size)
                else:
                    chunk_buffer = self.fh.read(node_key['chunk_size'])
                    filter_mask = node_key['filter_mask']
                    chunk_buffer = self._filter_chunk(
                        chunk_buffer, filter_mask, filter_pipeline, itemsize)

                chunk_data = np.frombuffer(chunk_buffer, dtype=dtype)
                start = node_key['chunk_offset'][:-1]
                region = [slice(i, i+j) for i, j in zip(start, chunk_shape)]
                data[tuple(region)] = chunk_data.reshape(chunk_shape)

        if isinstance(true_dtype, tuple):
            if dtype_class == 'REFERENCE':
                to_reference = np.vectorize(Reference)
                data = to_reference(data)
            else:
                raise NotImplementedError('datatype not implemented')

        non_padded_region = tuple([slice(i) for i in data_shape])
        return data[non_padded_region]

    @classmethod
    def _filter_chunk(cls, chunk_buffer, filter_mask, filter_pipeline, itemsize):
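The net effect of the btree.py changes is that, besides the root offset, the tree walk now records the furthest node offset seen (last_offset), while the bulk chunk-assembly logic (construct_data_from_chunks) moves out of the b-tree class. A hypothetical sketch of why that offset pair is useful for object stores follows; the fetch_range callable and the node-size padding are assumptions, not part of pyfive:

# Hypothetical: with the first and furthest b-tree node offsets known,
# a reader could fetch the whole b-tree region in one ranged request
# instead of one small request per node.
BTREE_NODE_PAD = 4096  # assumption: generous cover for the furthest node's size

def prefetch_btree(fetch_range, first_offset, last_offset):
    # fetch_range(start, end) is an assumed S3-style ranged-GET helper
    return fetch_range(first_offset, last_offset + BTREE_NODE_PAD)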
109 changes: 86 additions & 23 deletions pyfive/h5d.py
@@ -13,6 +13,7 @@
from importlib.metadata import version

StoreInfo = namedtuple('StoreInfo', "chunk_offset filter_mask byte_offset size")
ChunkIndex = namedtuple('ChunkIndex', "chunk_address chunk_dims")

class DatasetID:
    """
@@ -28,10 +29,15 @@ class DatasetID:
    from the parent file access as both share underlying C-structures.*

    """
    def __init__(self, dataobject, pseudo_chunking_size_MB=4):
    def __init__(self, dataobject, noindex=False, pseudo_chunking_size_MB=4):
        """
        Instantiated with the ``pyfive`` dataset ``dataobject``; we copy and cache
        everything we want so that the only file operations from now on are data
        accesses.

        ``noindex`` provides a method for controlling how lazy the data load
        actually is. This version supports values of False (normal behaviour:
        the index is read when the ``DatasetID`` is first instantiated) or
        True (the index is only read when the data is accessed).

        If ``pseudo_chunking_size_MB`` is set to a value greater than zero, and
        if the storage is not local posix (and hence ``np.mmap`` is not available) then
@@ -89,15 +95,21 @@ def __init__(self, dataobject, pseudo_chunking_size_MB=4):

self._meta = DatasetMeta(dataobject)

self._index = None
self._index = None
self.__index_built = False
self._index_params = None
# throws a flake8 wobbly for Python<3.10; match is Py3.10+ syntax
match self.layout_class: # noqa
case 0: #compact storage
self._data = self._get_compact_data(dataobject)
case 1: # contiguous storage
self.data_offset, = struct.unpack_from('<Q', dataobject.msg_data, self.property_offset)
case 2: # chunked storage
self._build_index(dataobject)
self._index_params = ChunkIndex(
dataobject._chunk_address,
dataobject._chunk_dims)
if not noindex:
self._build_index()

    def __hash__(self):
        """ The hash is based on assuming the file path, the location
@@ -113,37 +125,52 @@ def __eq__(self, other):
        """
        return self._unique == other._unique

    def __chunk_init_check(self):
        """
        Used by all the chunk methods to check that this dataset is
        chunked and, if the index is not yet present, to build it (via
        the ``index`` property). Raises TypeError for unchunked data;
        returns False if there is no chunk storage to index.
        """
        if self.layout_class != 2:
            raise TypeError('Dataset is not chunked')
        return self.index != {}

    def get_chunk_info(self, index):
        """
        Retrieve storage information about a chunk specified by its index.
        """
        if not self._index:
            return None
        else:
        if self.__chunk_init_check():
            return self._index[self._nthindex[index]]
        else:
            return None

    def get_chunk_info_by_coord(self, coordinate_index):
        """
        Retrieve information about a chunk specified by the array address of the chunk’s
        first element in each dimension.
        """
        if not self._index:
            return None
        else:
        if self.__chunk_init_check():
            return self._index[coordinate_index]
        else:
            return None

    def get_num_chunks(self):
        """
        Return the total number of chunks in the dataset.
        """
        return len(self._index)
        if self.__chunk_init_check():
            return len(self._index)
        else:
            return 0

    def read_direct_chunk(self, chunk_position, **kwargs):
        """
        Returns a tuple containing the filter_mask and the raw data storing this chunk as bytes.
        Additional arguments supported by ``h5py`` are not supported here.
        """
        if not self.index:
        if not self.__chunk_init_check():
            return None
        if chunk_position not in self._index:
            raise OSError("Chunk coordinates must lie on chunk boundaries")
@@ -166,6 +193,8 @@ def get_data(self, args, fillvalue):
                else:
                    return self._get_contiguous_data(args, fillvalue)
            case 2:  # chunked storage
                if not self.__index_built:
                    self._build_index()
                if not self._index:
                    no_storage = True
                else:
@@ -189,9 +218,9 @@ def iter_chunks(self, args):
        intersection of the given chunk with the selection area.
        This can be used to read data in that chunk.
        """
        if self.chunks is None:
            raise TypeError('Dataset is not chunked')
        if not self.__chunk_init_check():
            return None

        def convert_selection(tuple_of_slices):
            # while a slice of the form slice(a,b,None) is equivalent
            # in function to a slice of the form slice(a,b,1), it is not the same.
@@ -218,18 +247,33 @@ def convert_slice(aslice):
            else:
                yield convert_selection(out_selection)



    ##### The following property is made available to support ActiveStorage
    ##### and to help those who may want to generate kerchunk indices and
    ##### bypass the iterator methods.
    @property
    def index(self):
        """ Direct access to the chunk index, if there is one. This is a ``pyfive`` API extension. """
        if self._index is None:
            raise ValueError('No chunk index available for HDF layout class {self.layout}')
        else:
            return self._index
        # can't use __chunk_init_check because that would cause infinite recursion
        if self.layout_class != 2:
            raise TypeError("Data is not chunked")
        if not self._index:
            self._build_index()
        return self._index


    ##### This property is made available to help understand object store performance
    @property
    def btree_range(self):
        """ A tuple with the address of the first b-tree node
        for this variable and the address of the furthest-away node
        (which may not be the last one in the chunk index). This property
        may be of use in understanding the read performance of chunked
        data in object stores. ``btree_range`` is a ``pyfive`` API extension.
        """
        self.__chunk_init_check()
        return (self._btree_start, self._btree_end)

    #### The following method can be used to set the pseudo chunking size after the
    #### file has been closed and before data transactions. This is pyfive specific.
    def set_pseudo_chunk_size(self, newsize_MB):
@@ -267,7 +311,7 @@ def get_chunk_info_from_chunk_coord(self, chunk_coords):
    # third parties to use them. They are not H5Py methods.
    ######

    def _build_index(self, dataobject):
    def _build_index(self):
        """
        Build the chunk index if it doesn't exist. This is only
        called for chunked data, and only when the variable is accessed.
@@ -279,27 +323,46 @@ def _build_index(self):
        if self._index is not None:
            return

        if self._index_params is None:
            raise RuntimeError('Attempt to build index with no chunk index parameters')

        # look out for an empty dataset, which will have no btree
        if np.prod(self.shape) == 0 or dataobject._chunk_address == UNDEFINED_ADDRESS:
        if np.prod(self.shape) == 0 or self._index_params.chunk_address == UNDEFINED_ADDRESS:
            self._index = {}
            #FIXME: There are other edge cases for self._index = {} to handle
            self._btree_end, self._btree_start = None, None
            return

        logging.info(f'Building chunk index in pyfive {version("pyfive")}')

        #FIXME: How do we know it's a V1 B-tree?
        # There are potentially five different chunk indexing options according to
        # https://docs.hdfgroup.org/archive/support/HDF5/doc/H5.format.html#AppendixC

        fh = self._fh
        chunk_btree = BTreeV1RawDataChunks(
            dataobject.fh, dataobject._chunk_address, dataobject._chunk_dims)
            fh, self._index_params.chunk_address, self._index_params.chunk_dims)
        if self.posix:
            fh.close()

        self._index = {}
        self._nthindex = []

        for node in chunk_btree.all_nodes[0]:
            for node_key, addr in zip(node['keys'], node['addresses']):
                start = node_key['chunk_offset'][:-1]
                key = start
                size = node_key['chunk_size']
                filter_mask = node_key['filter_mask']
                self._nthindex.append(key)
                self._index[key] = StoreInfo(key, filter_mask, addr, size)

        self._btree_start = chunk_btree.offset
        self._btree_end = chunk_btree.last_offset

        self.__index_built = True
    def _get_contiguous_data(self, args, fillvalue):
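Taken together, the h5d.py changes defer reading the chunk b-tree until it is actually needed, and expose where that b-tree lives in the file. A minimal usage sketch follows, assuming a local file 'example.h5' with a chunked variable 'tas' (both names are illustrative; a DatasetID is normally reached through a dataset's .id attribute):

import pyfive

with pyfive.File('example.h5') as f:
    dsid = f['tas'].id               # a DatasetID; with noindex=True the chunk
                                     # index would not have been read at this point
    n = dsid.get_num_chunks()        # triggers the lazy index build if needed
    first, last = dsid.btree_range   # pyfive extension: byte span of the b-tree
    print(n, first, last)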