Skip to content

Commit 24fe98e

Browse files
committed
wip
1 parent e70c014 commit 24fe98e

File tree

1 file changed

+79
-46
lines changed

1 file changed

+79
-46
lines changed

neo/rawio/xarray_utils.py

Lines changed: 79 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@
22

33
import numpy as np
44

5+
import base64
56

67

7-
def to_xarray_reference_api(rawio_reader, block_index=0, seg_index=0):
8+
9+
def to_xarray_reference_api(rawio_reader, block_index=0, seg_index=0, stream_index=0):
810
"""
911
Transform the buffer_description_api into a dict ready for the xarray API 'reference://'
12+
13+
14+
See https://fsspec.github.io/kerchunk/spec.html
15+
See https://docs.xarray.dev/en/latest/user-guide/io.html#kerchunk
16+
17+
1018
"""
1119
rfs = dict()
1220
rfs["version"] = 1
@@ -15,56 +23,80 @@ def to_xarray_reference_api(rawio_reader, block_index=0, seg_index=0):
1523

1624
# rawio_reader.
1725
signal_streams = rawio_reader.header["signal_streams"]
18-
for stream_index in range(len(signal_streams)):
19-
stream_name = signal_streams["name"][stream_index]
20-
stream_id = signal_streams["id"][stream_index]
21-
print(stream_index, stream_name, stream_id)
22-
23-
24-
descr = rawio_reader.get_analogsignal_buffer_description(block_index=block_index, seg_index=seg_index,
25-
stream_index=stream_index)
26-
27-
if descr["type"] == "binary":
28-
dtype = np.dtype(descr["dtype"])
29-
zarray = dict(
30-
chunks=descr["shape"],
31-
compressor=None,
32-
dtype=dtype.str,
33-
fill_value=None,
34-
filters=None,
35-
order=descr["order"],
36-
shape=descr["shape"],
37-
zarr_format=2,
38-
)
39-
zattrs = dict(
40-
_ARRAY_DIMENSIONS=[f'time_{stream_id}', f'channel_{stream_id}'],
41-
)
42-
# unique big chunk
43-
# TODO : optional split in several small chunks
44-
array_size = np.prod(descr["shape"], dtype='int64')
45-
chunk_size = array_size * dtype.itemsize
46-
rfs["refs"][f"{stream_id}/0.0"] = [str(descr["file_path"]), descr["file_offset"], chunk_size]
47-
rfs["refs"][f"{stream_id}/.zarray"] =json.dumps(zarray)
48-
rfs["refs"][f"{stream_id}/.zattrs"] =json.dumps(zattrs)
49-
elif descr["type"] == "hdf5":
50-
raise NotImplementedError
51-
else:
52-
raise ValueError(f"buffer_description type not handled {descr['type']}")
53-
54-
55-
56-
# TODO magnitude gain and offset
57-
# TODO channel names
58-
# TODO sampling rate
59-
# TODO annotations
60-
# TODO array_annotations
26+
27+
stream_name = signal_streams["name"][stream_index]
28+
stream_id = signal_streams["id"][stream_index]
29+
# print(stream_index, stream_name, stream_id)
30+
31+
32+
descr = rawio_reader.get_analogsignal_buffer_description(block_index=block_index, seg_index=seg_index,
33+
stream_index=stream_index)
34+
35+
if descr["type"] == "binary":
36+
dtype = np.dtype(descr["dtype"])
37+
zarray = dict(
38+
chunks=descr["shape"],
39+
compressor=None,
40+
dtype=dtype.str,
41+
fill_value=None,
42+
filters=None,
43+
order=descr["order"],
44+
shape=descr["shape"],
45+
zarr_format=2,
46+
)
47+
zattrs = dict(
48+
_ARRAY_DIMENSIONS=['time', 'channel'],
49+
name=stream_name,
50+
stream_id=stream_id,
51+
)
52+
# unique big chunk
53+
# TODO : optional split in several small chunks
54+
array_size = np.prod(descr["shape"], dtype='int64')
55+
chunk_size = array_size * dtype.itemsize
56+
rfs["refs"]["traces/0.0"] = [str(descr["file_path"]), descr["file_offset"], chunk_size]
57+
rfs["refs"]["traces/.zarray"] =json.dumps(zarray)
58+
rfs["refs"]["traces/.zattrs"] =json.dumps(zattrs)
59+
60+
# small enough can be internal
61+
mask = rawio_reader.header["signal_channels"]["stream_id"] == stream_id
62+
channel_ids = rawio_reader.header["signal_channels"][mask]["id"]
63+
base64_encoded = base64.b64encode(channel_ids.tobytes())
64+
rfs["refs"]["yep/0"] = "base64:" + base64_encoded.decode()
65+
zarray = dict(
66+
chunks=channel_ids.shape,
67+
compressor=None,
68+
dtype=channel_ids.dtype.str,
69+
fill_value=None,
70+
filters=None,
71+
order="C",
72+
shape=channel_ids.shape,
73+
zarr_format=2,
74+
)
75+
zattrs = dict(
76+
_ARRAY_DIMENSIONS=['channel'],
77+
)
78+
rfs["refs"]["yep/.zattrs"] =json.dumps(zattrs)
79+
rfs["refs"]["yep/.zarray"] =json.dumps(zarray)
80+
81+
elif descr["type"] == "hdf5":
82+
raise NotImplementedError
83+
else:
84+
raise ValueError(f"buffer_description type not handled {descr['type']}")
85+
86+
87+
88+
# TODO magnitude gain and offset
89+
# TODO channel names
90+
# TODO sampling rate
91+
# TODO annotations
92+
# TODO array_annotations
6193

6294

6395
return rfs
6496

6597

6698

67-
def to_xarray_dataset(rawio_reader):
99+
def to_xarray_dataset(rawio_reader, block_index=0, seg_index=0, stream_index=0):
68100
"""
69101
Utility function that transforms an instance of a rawio into a xarray.Dataset
70102
with lazy access.
@@ -76,10 +108,11 @@ def to_xarray_dataset(rawio_reader):
76108
"""
77109
import xarray as xr
78110

79-
rfs = to_xarray_reference_api(rawio_reader)
111+
rfs = to_xarray_reference_api(rawio_reader, block_index=block_index, seg_index=seg_index, stream_index=stream_index)
80112

81113
ds = xr.open_dataset(
82114
"reference://",
115+
# mask_and_scale=True,
83116
mask_and_scale=False,
84117
engine="zarr",
85118
backend_kwargs={

0 commit comments

Comments
 (0)