-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #101 from yucongalicechen/mud
estimate mu*D from z-scan file
- Loading branch information
Showing
6 changed files
with
197 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import numpy as np | ||
from scipy.optimize import dual_annealing | ||
from scipy.signal import convolve | ||
|
||
from diffpy.utils.parsers.loaddata import loadData | ||
|
||
|
||
def _top_hat(x, slit_width): | ||
""" | ||
create a top-hat function, return 1.0 for values within the specified slit width and 0 otherwise | ||
""" | ||
return np.where((x >= -slit_width) & (x <= slit_width), 1.0, 0) | ||
|
||
|
||
def _model_function(x, diameter, x0, I0, mud, slope): | ||
""" | ||
compute the model function with the following steps: | ||
1. Recenter x to h by subtracting x0 (so that the circle is centered at 0 and it is easier to compute length l) | ||
2. Compute length l that is the effective length for computing intensity I = I0 * e^{-mu * l}: | ||
- For h within the diameter range, l is the chord length of the circle at position h | ||
- For h outside this range, l = 0 | ||
3. Apply a linear adjustment to I0 by taking I0 as I0 - slope * x | ||
""" | ||
min_radius = -diameter / 2 | ||
max_radius = diameter / 2 | ||
h = x - x0 | ||
length = np.piecewise( | ||
h, | ||
[h < min_radius, (min_radius <= h) & (h <= max_radius), h > max_radius], | ||
[0, lambda h: 2 * np.sqrt((diameter / 2) ** 2 - h**2), 0], | ||
) | ||
return (I0 - slope * x) * np.exp(-mud / diameter * length) | ||
|
||
|
||
def _extend_x_and_convolve(x, diameter, slit_width, x0, I0, mud, slope): | ||
""" | ||
extend x values and I values for padding (so that we don't have tails in convolution), then perform convolution | ||
(note that the convolved I values are the same as modeled I values if slit width is close to 0) | ||
""" | ||
n_points = len(x) | ||
x_left_pad = np.linspace(x.min() - n_points * (x[1] - x[0]), x.min(), n_points) | ||
x_right_pad = np.linspace(x.max(), x.max() + n_points * (x[1] - x[0]), n_points) | ||
x_extended = np.concatenate([x_left_pad, x, x_right_pad]) | ||
I_extended = _model_function(x_extended, diameter, x0, I0, mud, slope) | ||
kernel = _top_hat(x_extended - x_extended.mean(), slit_width) | ||
I_convolved = I_extended # this takes care of the case where slit width is close to 0 | ||
if kernel.sum() != 0: | ||
kernel /= kernel.sum() | ||
I_convolved = convolve(I_extended, kernel, mode="same") | ||
padding_length = len(x_left_pad) | ||
return I_convolved[padding_length:-padding_length] | ||
|
||
|
||
def _objective_function(params, x, observed_data): | ||
""" | ||
compute the objective function for fitting a model to the observed/experimental data | ||
by minimizing the sum of squared residuals between the observed data and the convolved model data | ||
""" | ||
diameter, slit_width, x0, I0, mud, slope = params | ||
convolved_model_data = _extend_x_and_convolve(x, diameter, slit_width, x0, I0, mud, slope) | ||
residuals = observed_data - convolved_model_data | ||
return np.sum(residuals**2) | ||
|
||
|
||
def _compute_single_mud(x_data, I_data): | ||
""" | ||
perform dual annealing optimization and extract the parameters | ||
""" | ||
bounds = [ | ||
(1e-5, x_data.max() - x_data.min()), # diameter: [small positive value, upper bound] | ||
(0, (x_data.max() - x_data.min()) / 2), # slit width: [0, upper bound] | ||
(x_data.min(), x_data.max()), # x0: [min x, max x] | ||
(1e-5, I_data.max()), # I0: [small positive value, max observed intensity] | ||
(1e-5, 20), # muD: [small positive value, upper bound] | ||
(-10000, 10000), # slope: [lower bound, upper bound] | ||
] | ||
result = dual_annealing(_objective_function, bounds, args=(x_data, I_data)) | ||
diameter, slit_width, x0, I0, mud, slope = result.x | ||
convolved_fitted_signal = _extend_x_and_convolve(x_data, diameter, slit_width, x0, I0, mud, slope) | ||
residuals = I_data - convolved_fitted_signal | ||
rmse = np.sqrt(np.mean(residuals**2)) | ||
return mud, rmse | ||
|
||
|
||
def compute_mud(filepath): | ||
""" | ||
compute the best-fit mu*D value from a z-scan file | ||
Parameters | ||
---------- | ||
filepath str | ||
the path to the z-scan file | ||
Returns | ||
------- | ||
a float contains the best-fit mu*D value | ||
""" | ||
x_data, I_data = loadData(filepath, unpack=True) | ||
best_mud, _ = min((_compute_single_mud(x_data, I_data) for _ in range(10)), key=lambda pair: pair[1]) | ||
return best_mud |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from pathlib import Path | ||
|
||
import numpy as np | ||
import pytest | ||
|
||
from diffpy.labpdfproc.mud_calculator import _extend_x_and_convolve, compute_mud | ||
|
||
|
||
def test_compute_mud(tmp_path): | ||
diameter, slit_width, x0, I0, mud, slope = 1, 0.1, 0, 1e5, 3, 0 | ||
x_data = np.linspace(-1, 1, 50) | ||
convolved_I_data = _extend_x_and_convolve(x_data, diameter, slit_width, x0, I0, mud, slope) | ||
|
||
directory = Path(tmp_path) | ||
file = directory / "testfile" | ||
with open(file, "w") as f: | ||
for x, I in zip(x_data, convolved_I_data): | ||
f.write(f"{x}\t{I}\n") | ||
|
||
expected_mud = 3 | ||
actual_mud = compute_mud(file) | ||
assert actual_mud == pytest.approx(expected_mud, rel=1e-4, abs=0.1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
load_user_metadata, | ||
preprocessing_args, | ||
set_input_lists, | ||
set_mud, | ||
set_output_directory, | ||
set_wavelength, | ||
) | ||
|
@@ -188,6 +189,32 @@ def test_set_wavelength_bad(inputs, msg): | |
actual_args = set_wavelength(actual_args) | ||
|
||
|
||
def test_set_mud(user_filesystem): | ||
cli_inputs = ["2.5", "data.xy"] | ||
actual_args = get_args(cli_inputs) | ||
actual_args = set_mud(actual_args) | ||
assert actual_args.mud == pytest.approx(2.5, rel=1e-4, abs=0.1) | ||
assert actual_args.z_scan_file is None | ||
|
||
cwd = Path(user_filesystem) | ||
test_dir = cwd / "test_dir" | ||
os.chdir(cwd) | ||
inputs = ["--z-scan-file", "test_dir/testfile.xy"] | ||
expected = [3, str(test_dir / "testfile.xy")] | ||
cli_inputs = ["2.5", "data.xy"] + inputs | ||
actual_args = get_args(cli_inputs) | ||
actual_args = set_mud(actual_args) | ||
assert actual_args.mud == pytest.approx(expected[0], rel=1e-4, abs=0.1) | ||
assert actual_args.z_scan_file == expected[1] | ||
|
||
|
||
def test_set_mud_bad(): | ||
cli_inputs = ["2.5", "data.xy", "--z-scan-file", "invalid file"] | ||
actual_args = get_args(cli_inputs) | ||
with pytest.raises(FileNotFoundError, match="Cannot find invalid file. Please specify a valid file path."): | ||
actual_args = set_mud(actual_args) | ||
|
||
|
||
params5 = [ | ||
([], []), | ||
( | ||
|
@@ -317,5 +344,6 @@ def test_load_metadata(mocker, user_filesystem): | |
"username": "cli_username", | ||
"email": "[email protected]", | ||
"package_info": {"diffpy.labpdfproc": "1.2.3", "diffpy.utils": "3.3.0"}, | ||
"z_scan_file": None, | ||
} | ||
assert actual_metadata == expected_metadata |