Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 84 additions & 42 deletions src/freesas/app/extract_ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

__author__ = "Jérôme Kieffer"
__license__ = "MIT"
__copyright__ = "2020, ESRF"
__date__ = "15/01/2021"
__copyright__ = "2020-2024, ESRF"
__date__ = "16/04/2025"

import io
import os
Expand All @@ -38,6 +38,7 @@
import posixpath
from collections import namedtuple, OrderedDict
import json
import zipfile
import copy
import pyFAI
from pyFAI.io import Nexus
Expand Down Expand Up @@ -78,6 +79,13 @@ def parse():
help="extract every individual frame",
default=False,
)
parser.add_argument(
"-z",
"--zip",
action="store_true",
help="extract every individual frame into a zip-file",
default=False,
)
return parser.parse_args()


Expand All @@ -87,11 +95,24 @@ def extract_averaged(filename):
results["filename"] = filename
# Missing: comment normalization
with Nexus(filename, "r") as nxsr:
entry_grp = nxsr.get_entries()[0]
default = nxsr.h5.attrs.get("default")
if default and default in nxsr.h5:
entry_grp = nxsr.h5[default]
else:
entry_grp = nxsr.get_entries()[0]
results["h5path"] = entry_grp.name
nxdata_grp = nxsr.h5[entry_grp.attrs["default"]]
# program = entry_grp.get("program_name")
# if program == ""
default = entry_grp.attrs["default"]
if posixpath.split(default)[-1] == "hplc":
default = posixpath.join(posixpath.split(default)[0],"results")
# print(default)
nxdata_grp = nxsr.h5[default]
signal = nxdata_grp.attrs["signal"]
axis = nxdata_grp.attrs["axes"]
if not isinstance(axis, (str,bytes)):
logger.error(f"There are several curves if the dataset {default} from file {filename}, please use the option --all to extract them all")
sys.exit(1)
results["I"] = nxdata_grp[signal][()]
results["q"] = nxdata_grp[axis][()]
results["std"] = nxdata_grp["errors"][()]
Expand All @@ -102,15 +123,12 @@ def extract_averaged(filename):
results["geometry"] = json.loads(
integration_grp["configuration/data"][()]
)
results["polarization"] = integration_grp[
"configuration/polarization_factor"
][()]
results["polarization"] = integration_grp["configuration/polarization_factor"][()]

instrument_grps = nxsr.get_class(entry_grp, class_type="NXinstrument")
if instrument_grps:
detector_grp = nxsr.get_class(
instrument_grps[0], class_type="NXdetector"
)[0]
detector_grp = nxsr.get_class(instrument_grps[0],
class_type="NXdetector")[0]
results["mask"] = detector_grp["pixel_mask"].attrs["filename"]
sample_grp = nxsr.get_class(entry_grp, class_type="NXsample")[0]
results["sample"] = posixpath.split(sample_grp.name)[-1]
Expand All @@ -119,52 +137,68 @@ def extract_averaged(filename):
results["exposure temperature"] = sample_grp["temperature"][()]
results["concentration"] = sample_grp["concentration"][()]
if "2_correlation_mapping" in entry_grp:
results["to_merge"] = entry_grp[
"2_correlation_mapping/results/to_merge"
][()]
results["to_merge"] = entry_grp["2_correlation_mapping/results/to_merge"][()]
return results


def extract_all(filename):
"return some infomations extracted from a HDF5 file for all individual frames"
"""return some infomations extracted from a HDF5 file for all individual frames.
Supports HPLC and SC and freshly integrated blocks of frames"""
res = []
results = OrderedDict()
results["filename"] = filename
with Nexus(filename, "r") as nxsr:
entry_grp = nxsr.get_entries()[0]
default = nxsr.h5.attrs.get("default")
if default and default in nxsr.h5:
entry_grp = nxsr.h5[default]
else:
entry_grp = nxsr.get_entries()[0]
program = entry_grp.get("program_name")[()].decode() if "program_name" in entry_grp else None

if program == "bm29.hplc":
target = "1_chromatogram"
elif program == "bm29.integratemultiframe":
target = "1_integration"
elif program == "bm29.subtract":
target = "3_azimuthal_integration"
else:
raise RuntimeError(f"Unable to read file written by {program}")
target = posixpath.join(entry_grp.name, target, "results")
results["h5path"] = entry_grp.name
nxdata_grp = nxsr.h5[entry_grp.name + "/1_integration/results"]
nxdata_grp = nxsr.h5[target]
signal = nxdata_grp.attrs["signal"]
axis = nxdata_grp.attrs["axes"][1]
I = nxdata_grp[signal][()]
results["q"] = nxdata_grp[axis][()]
std = nxdata_grp["errors"][()]
results["unit"] = pyFAI.units.to_unit(
axis + "_" + nxdata_grp[axis].attrs["units"]
)
try:
results["unit"] = pyFAI.units.to_unit(axis + "_" + nxdata_grp[axis].attrs["units"])
except KeyError:
logger.warning("Unable to parse radial units")
integration_grp = nxdata_grp.parent
results["geometry"] = json.loads(
integration_grp["configuration/data"][()]
)
results["polarization"] = integration_grp[
"configuration/polarization_factor"
][()]
instrument_grp = nxsr.get_class(entry_grp, class_type="NXinstrument")[
0
]
detector_grp = nxsr.get_class(instrument_grp, class_type="NXdetector")[
0
]
results["mask"] = detector_grp["pixel_mask"].attrs["filename"]
sample_grp = nxsr.get_class(entry_grp, class_type="NXsample")[0]
results["sample"] = posixpath.split(sample_grp.name)[-1]
results["buffer"] = sample_grp["buffer"][()]
if "temperature_env" in sample_grp:
results["storage temperature"] = sample_grp["temperature_env"][()]
if "temperature" in sample_grp:
results["exposure temperature"] = sample_grp["temperature"][()]
if "concentration" in sample_grp:
results["concentration"] = sample_grp["concentration"][()]
if "configuration/data" in integration_grp:
results["geometry"] = json.loads(integration_grp["configuration/data"][()])
else:
logger.warning("Unable to parse AzimuthalIntegrator configuration")
if "configuration/polarization_factor" in integration_grp:
results["polarization"] = integration_grp["configuration/polarization_factor"][()]
instrument_grp = nxsr.get_class(entry_grp, class_type="NXinstrument")
if instrument_grp:
detector_grp = nxsr.get_class(instrument_grp[0], class_type="NXdetector")
if detector_grp:
results["mask"] = detector_grp[0]["pixel_mask"].attrs["filename"]
sample_grp = nxsr.get_class(entry_grp, class_type="NXsample")
if sample_grp:
sample_grp = sample_grp[0]
results["sample"] = posixpath.split(sample_grp.name)[-1]
if "buffer" in sample_grp:
results["buffer"] = sample_grp["buffer"][()]
if "temperature_env" in sample_grp:
results["storage temperature"] = sample_grp["temperature_env"][()]
if "temperature" in sample_grp:
results["exposure temperature"] = sample_grp["temperature"][()]
if "concentration" in sample_grp:
results["concentration"] = sample_grp["concentration"][()]
# if "2_correlation_mapping" in entry_grp:
# results["to_merge"] = entry_grp["2_correlation_mapping/results/to_merge"][()]
for i, s in zip(I, std):
Expand Down Expand Up @@ -326,15 +360,23 @@ def main():
input_len = len(files)
logger.debug("%s input files", input_len)
for src in files:
print(f"{src} \t --> ", end="")
if args.all:
dest = os.path.splitext(src)[0] + "%04i.dat"
for idx, frame in enumerate(extract_all(src)):
print(src, " --> ", dest % idx)
write_ascii(frame, dest % idx)
print(dest)
elif args.zip:
dest = os.path.splitext(src)[0] + ".zip"
with zipfile.ZipFile(dest, "w") as z:
for idx, frame in enumerate(extract_all(src)):
z.writestr(f'frame_{idx:04d}.dat', write_ascii(frame))
print(dest)
else:
dest = os.path.splitext(src)[0] + ".dat"
write_ascii(extract_averaged(src), dest)
print(src, " --> ", dest)
print(dest)


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions src/freesas/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ py.install_sources([
'sasio.py',
'transformations.py',
'dnn.py',
'nexus_parser.py',
],
pure: false, # Will be installed next to binaries
subdir: 'freesas' # Folder relative to site-packages to install to
Expand Down
182 changes: 182 additions & 0 deletions src/freesas/nexus_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
__author__ = "Jérôme Kieffer"
__license__ = "MIT"
__copyright__ = "2017, ESRF"
__date__ = "20/01/2025"

import sys, os
import zipfile
import posixpath
import logging
from typing import Union
from silx.io.nxdata import NXdata
from dataclasses import dataclass
import numpy
logger = logging.getLogger(__name__)

try:
import h5py
except ImportError:
logger.error("H5py is mandatory to parse HDF5 files")
h5py = None


@dataclass
class IntegratedPattern:
"""Store one pyFAI integrated pattern"""

point: Union[float, int, None]
radial: numpy.ndarray
intensity: numpy.ndarray
intensity_errors: Union[numpy.ndarray, None]=None
radial_name: str = ""
radial_units: str = ""
intensity_name: str = ""
intensity_units: str = ""

def __repr__(self):
line = f"# {self.radial_name}({self.radial_units}) \t {self.intensity_name}({self.intensity_units})"
if self.intensity_errors is not None:
line += " \t uncertainties"
res = [line]
if self.intensity_errors is None:
for q,i,s in zip(self.radial, self.intensity):
res.append(f"{q} \t {i}")
else:
for q,i,s in zip(self.radial, self.intensity, self.intensity_errors):
res.append(f"{q} \t {i} \t {s}")
return os.linesep.join(res)


def read_nexus_integrated_patterns(group):
"""Read integrated patterns from a HDF5 NXdata group.

It reads from both single (1D signal) or multi (2D signal) NXdata.
:param group : h5py.Group
:return: list of IntegratedPattern instances.
"""
nxdata = NXdata(group)
if not nxdata.is_valid:
raise RuntimeError(
f"Cannot parse NXdata group: {group.file.filename}::{group.name}"
)
if not (nxdata.signal_is_1d or nxdata.signal_is_2d):
raise RuntimeError(
f"Signal is not a 1D or 2D dataset: {group.file.filename}::{group.name}"
)

if nxdata.signal_is_1d:
points = [None]
else: # 2d
if nxdata.axes[0] is None:
points = [None] * nxdata.signal.shape[0]
else:
points = nxdata.axes[0][()]

if nxdata.axes[-1] is None:
radial = numpy.arange(nxdata.signal.shape[1])
radial_units = ""
radial_name = ""
else:
axis_dataset = nxdata.axes[-1]
radial = axis_dataset[()]
radial_name = axis_dataset.name.split("/")[-1]
radial_units = axis_dataset.attrs.get("units", "")

intensities = numpy.atleast_2d(nxdata.signal)
intensity_name = nxdata.signal.name.split("/")[-1]
intensity_units = nxdata.signal.attrs.get("units", "")


if nxdata.errors is None:
errors = [None] * intensities.shape[0]
else:
errors = numpy.atleast_2d(nxdata.errors)

if (len(points), len(radial)) != intensities.shape:
raise RuntimeError("Shape mismatch between axes and signal")

return [IntegratedPattern(
point, radial, intensity, intensity_errors, radial_name, radial_units, intensity_name, intensity_units) for point, intensity, intensity_errors in zip(points, intensities, errors)]


class Tree:
def __init__(self, root=None):
self.root = root or {}
self.skip = set()
def visit_item(self, name, obj):
if name in self.skip:
return
node = self.root
path = [i.replace(" ","_") for i in name.split("/")]
for i in path[:-1]:
if i not in node:
node[i] = {}
node = node[i]
if isinstance(obj, h5py.Group):
if obj.attrs.get("NX_class") == "NXdata" and "errors" in obj:
try:
node[path[-1]] = read_nexus_integrated_patterns(obj)
except (KeyError, OSError) as err:
print(f"{type(err).__name__}: {err} while readding {path}")
for key in obj:
self.skip.add(posixpath.join(name,key))
if isinstance(obj[key], h5py.Group):
for sub in obj[key]:
self.skip.add(posixpath.join(name,key, sub))
else:
node[path[-1]] = {}
if isinstance(obj, h5py.Dataset):
if len(obj.shape) <= 1:
node[path[-1]] = obj[()]

def save(self, filename):
with zipfile.ZipFile(filename, "w") as z:
def write(path, name, obj):
new_path = posixpath.join(path, name)
if isinstance(obj, dict):
if sys.version_info>=(3,11): z.mkdir(new_path)
for key, value in obj.items():
write(new_path, key, value)
elif isinstance(obj, numpy.ndarray):
if obj.ndim == 1:
z.writestr(new_path, os.linesep.join(str(i) for i in obj))
else:
z.writestr(new_path, str(obj))
elif isinstance(obj, list):
if sys.version_info>=(3,11): z.mkdir(new_path)
if len(obj)==1:
fname = new_path+"/biosaxs.dat"
z.writestr(fname, str(obj[0]))
else:
for i,j in enumerate(obj):
fname = new_path+f"/bioxaxs_{i:04d}.dat"
z.writestr(fname, str(j))
elif isinstance(obj, (int, float, numpy.number, bool, numpy.bool)):
z.writestr(new_path, str(obj))
elif isinstance(obj, (str, bytes)):
z.writestr(new_path, obj)
else:
print(f"skip {new_path} for {obj} of type {obj.__class__.__mro__}")

root = self.root
for key, value in root.items():
write("", key, value)
def get(self, path):
node = self.root
for i in path.split("/"):
node = node[i]
return node

def convert_nexus2zip(nexusfile, outfile=None):
""" Convert a nexus-file, as produced by BM29 beamline into a zip file

:param nexusfile: string with the path of the input file
:param outfile: name of the output file, unless, just replace the extension with ".zip"
:return: nothing, maybe an error code ?
"""
tree = Tree()
with h5py.File(nexusfile, "r") as h:
h.visititems(tree.visit_item)
outfile = outfile or (os.path.splitext(nexusfile)[0]+".h5")
tree.save(outfile)

Loading
Loading