diff --git a/docs/codebook.rst b/docs/codebook.rst index 1a33073fc3..8606f413fa 100644 --- a/docs/codebook.rst +++ b/docs/codebook.rst @@ -23,8 +23,6 @@ Codebook Table #csvDataTable { width: 100%; border-collapse: collapse; - .. background-color: #f8f8f8; - color: white; } #csvDataTable th, #csvDataTable td { padding: 8px 12px; diff --git a/neurokit2/__init__.py b/neurokit2/__init__.py index 9fe63da09a..854dad13d5 100644 --- a/neurokit2/__init__.py +++ b/neurokit2/__init__.py @@ -33,7 +33,7 @@ from .video import * # Info -__version__ = "0.2.10" +__version__ = "0.2.11" # Maintainer info diff --git a/neurokit2/ecg/ecg_delineate.py b/neurokit2/ecg/ecg_delineate.py index dc64e15ba3..8a2c70b241 100644 --- a/neurokit2/ecg/ecg_delineate.py +++ b/neurokit2/ecg/ecg_delineate.py @@ -49,8 +49,13 @@ def ecg_delineate( sampling_rate : int The sampling frequency of ``ecg_signal`` (in Hz, i.e., samples/second). Defaults to 1000. method : str - Can be one of ``"peak"`` for a peak-based method, ``"cwt"`` for continuous wavelet transform - or ``"dwt"`` (default) for discrete wavelet transform. + Can be one of ``"peak"`` for a peak-based method, ``"prominence"`` for a peak-prominence-based + method (Emrich et al., 2024), ``"cwt"`` for continuous wavelet transform or ``"dwt"`` (default) + for discrete wavelet transform. + The ``"prominence"`` method might be useful to detect the waves, allowing to set individual physiological + limits (see kwargs), while the ``"dwt"`` method might be more precise for detecting the onsets and offsets + of the waves (but might exhibit lower accuracy when there is significant variation in wave morphology). + The ``"peak"`` method, which uses the zero-crossings of the signal derivatives, works best with very clean signals. show : bool If ``True``, will return a plot to visualizing the delineated waves information. show_type: str @@ -60,7 +65,16 @@ def ecg_delineate( Defaults to ``False``. If ``True``, replaces the delineated features with ``np.nan`` if its standardized distance from R-peaks is more than 3. **kwargs - Other optional arguments. + Other optional arguments: + If using the ``"prominence"`` method, additional parameters (in milliseconds) can be passed to set + individual physiological limits for the search boundaries: + - ``max_qrs_interval``: The maximum allowable QRS complex interval. Defaults to 180 ms. + - ``max_pr_interval``: The maximum PR interval duration. Defaults to 300 ms. + - ``max_r_rise_time``: Maximum duration for the R-wave rise. Defaults to 120 ms. + - ``typical_st_segment``: Typical duration of the ST segment. Defaults to 150 ms. + - ``max_p_basepoint_interval``: The maximum interval between P-wave on- and offset. Defaults to 100 ms. + - ``max_r_basepoint_interval``: The maximum interval between R-wave on- and offset. Defaults to 100 ms. + - ``max_t_basepoint_interval``: The maximum interval between T-wave on- and offset. Defaults to 200 ms. Returns ------- @@ -71,7 +85,7 @@ def ecg_delineate( ``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``, ``"ECG_T_Offsets"``, respectively. - For wavelet methods, in addition to the above information, the dictionary contains the + For the wavelet and prominence methods, in addition to the above information, the dictionary contains the samples at which QRS-onsets and QRS-offsets occur, accessible with the key ``"ECG_P_Peaks"``, ``"ECG_T_Peaks"``, ``"ECG_P_Onsets"``, ``"ECG_P_Offsets"``, ``"ECG_Q_Peaks"``, ``"ECG_S_Peaks"``, ``"ECG_T_Onsets"``, ``"ECG_T_Offsets"``, @@ -114,6 +128,8 @@ def ecg_delineate( - Martínez, J. P., Almeida, R., Olmos, S., Rocha, A. P., & Laguna, P. (2004). A wavelet-based ECG delineator: evaluation on standard databases. IEEE Transactions on biomedical engineering, 51(4), 570-581. + - Emrich, J., Gargano, A., Koka, T., & Muma, M. (2024). Physiology-Informed ECG Delineation Based + on Peak Prominence. 32nd European Signal Processing Conference (EUSIPCO), 1402-1406. """ # Sanitize input for ecg_cleaned @@ -162,10 +178,12 @@ def ecg_delineate( ) elif method in ["dwt", "discrete wavelet transform"]: waves = _dwt_ecg_delineator(ecg_cleaned, rpeaks, sampling_rate=sampling_rate) + elif method in ["prominence", "peak-prominence", "emrich", "emrich2024"]: + waves = _prominence_ecg_delineator(ecg_cleaned, rpeaks=rpeaks, sampling_rate=sampling_rate, **kwargs) else: raise ValueError( - "NeuroKit error: ecg_delineate(): 'method' should be one of 'peak'," + "NeuroKit error: ecg_delineate(): 'method' should be one of 'peak', 'prominence'," "'cwt' or 'dwt'." ) @@ -739,6 +757,195 @@ def _ecg_delineator_cwt(ecg, rpeaks=None, sampling_rate=1000): } +# ============================================================================= +# PROMINENCE METHOD (Emrich et al., 2024) +# ============================================================================= +def _prominence_ecg_delineator(ecg, rpeaks=None, sampling_rate=1000, **kwargs): + # pysiology-informed boundaries in milliseconds, adapt if needed + max_qrs_interval = int(kwargs.get("max_qrs_interval", 180) * sampling_rate / 1000) + max_pr_interval = int(kwargs.get("max_pr_interval", 300) * sampling_rate / 1000) + max_r_rise_time = int(kwargs.get("max_r_rise_time", 120) * sampling_rate / 1000) + typical_st_segment = int(kwargs.get("typical_st_segment", 150) * sampling_rate / 1000) + # max basepoint intervals + max_p_basepoint_interval = int(kwargs.get("max_p_basepoint_interval", 100) * sampling_rate / 1000) + max_r_basepoint_interval = int(kwargs.get("max_r_basepoint_interval", 100) * sampling_rate / 1000) + max_t_basepoint_interval = int(kwargs.get("max_t_basepoint_interval", 200) * sampling_rate / 1000) + + waves = { + "ECG_P_Onsets": [], + "ECG_P_Peaks": [], + "ECG_P_Offsets": [], + "ECG_Q_Peaks": [], + "ECG_R_Onsets": [], + "ECG_R_Offsets": [], + "ECG_S_Peaks": [], + "ECG_T_Onsets": [], + "ECG_T_Peaks": [], + "ECG_T_Offsets": [], + } + + # calculate RR intervals + rr = np.diff(rpeaks) + rr = np.insert(rr, 0, min(rr[0], 2 * rpeaks[0])) + rr = np.insert(rr, -1, min(rr[-1], 2 * rpeaks[-1])) + + # iterate over all beats + left = 0 + for i in range(len(rpeaks)): + # 1. split signal into segments + rpeak_pos = min(rpeaks[i], rr[i] // 2) + left = rpeaks[i] - rpeak_pos + right = rpeaks[i] + rr[i + 1] // 2 + ecg_seg = ecg[left:right] + + current_wave = { + "ECG_R_Peaks": rpeak_pos, + } + + # 2. find local extrema in signal + local_maxima = scipy.signal.find_peaks(ecg_seg)[0] + local_minima = scipy.signal.find_peaks(-ecg_seg)[0] + local_extrema = np.concatenate((local_maxima, local_minima)) + + # 3. compute prominence weight + weight_maxima = _calc_prominence(local_maxima, ecg_seg, current_wave["ECG_R_Peaks"]) + weight_minima = _calc_prominence(local_minima, ecg_seg, current_wave["ECG_R_Peaks"], minima=True) + + if local_extrema.any(): + # find waves + _prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time) + _prominence_find_s_wave(ecg_seg, weight_minima, current_wave, max_qrs_interval) + _prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval) + _prominence_find_t_wave(local_extrema, (weight_minima + weight_maxima), current_wave, typical_st_segment) + _prominence_find_on_offsets( + ecg_seg, + sampling_rate, + local_minima, + current_wave, + max_p_basepoint_interval, + max_r_basepoint_interval, + max_t_basepoint_interval, + ) + + # append waves for current beat / complex + for key in waves: + if key == "ECG_R_Peaks": + waves[key].append(int(rpeaks[i])) + elif key in current_wave: + waves[key].append(int(current_wave[key] + left)) + else: + waves[key].append(np.nan) + + return waves + + +def _calc_prominence(peaks, sig, Rpeak=None, minima=False): + """Returns an array of the same length as sig with prominences computed for the provided peaks. + + Prominence of the R-peak is excluded if the R-peak position is given. + + """ + w = np.zeros_like(sig) + + if len(peaks) < 1: + return w + # get prominence + _sig = -sig if minima else sig + w[peaks] = scipy.signal.peak_prominences(_sig, peaks)[0] + # optional: set rpeak prominence to zero to emphasize other peaks + if Rpeak is not None: + w[Rpeak] = 0 + return w + + +def _prominence_find_q_wave(weight_minima, current_wave, max_r_rise_time): + if "ECG_R_Peaks" not in current_wave: + return + q_bound = max(current_wave["ECG_R_Peaks"] - max_r_rise_time, 0) + + current_wave["ECG_Q_Peaks"] = np.argmax(weight_minima[q_bound : current_wave["ECG_R_Peaks"]]) + q_bound + + +def _prominence_find_s_wave(sig, weight_minima, current_wave, max_qrs_interval): + if "ECG_Q_Peaks" not in current_wave: + return + s_bound = current_wave["ECG_Q_Peaks"] + max_qrs_interval + s_wave = np.argmax(weight_minima[current_wave["ECG_R_Peaks"] : s_bound] > 0) + current_wave["ECG_R_Peaks"] + current_wave["ECG_S_Peaks"] = ( + np.argmin(sig[current_wave["ECG_R_Peaks"] : s_bound]) + current_wave["ECG_R_Peaks"] + if s_wave == current_wave["ECG_R_Peaks"] + else s_wave + ) + + +def _prominence_find_p_wave(local_maxima, weight_maxima, current_wave, max_pr_interval): + if "ECG_Q_Peaks" not in current_wave: + return + p_candidates = local_maxima[ + (current_wave["ECG_Q_Peaks"] - max_pr_interval <= local_maxima) & (local_maxima <= current_wave["ECG_Q_Peaks"]) + ] + if p_candidates.any(): + current_wave["ECG_P_Peaks"] = p_candidates[np.argmax(weight_maxima[p_candidates])] + + +def _prominence_find_t_wave(local_extrema, weight_extrema, current_wave, typical_st_segment): + if "ECG_S_Peaks" not in current_wave: + return + t_candidates = local_extrema[(current_wave["ECG_S_Peaks"] + typical_st_segment <= local_extrema)] + if t_candidates.any(): + current_wave["ECG_T_Peaks"] = t_candidates[np.argmax(weight_extrema[t_candidates])] + + +def _prominence_find_on_offsets( + sig, + sampling_rate, + local_minima, + current_wave, + max_p_basepoint_interval, + max_r_basepoint_interval, + max_t_basepoint_interval, +): + if "ECG_P_Peaks" in current_wave: + _, p_on, p_off = scipy.signal.peak_prominences( + sig, [current_wave["ECG_P_Peaks"]], wlen=max_p_basepoint_interval + ) + if not np.isnan(p_on): + current_wave["ECG_P_Onsets"] = p_on[0] + if not np.isnan(p_off): + current_wave["ECG_P_Offsets"] = p_off[0] + + if "ECG_T_Peaks" in current_wave: + p = -1 if np.isin(current_wave["ECG_T_Peaks"], local_minima) else 1 + + _, t_on, t_off = scipy.signal.peak_prominences( + p * sig, [current_wave["ECG_T_Peaks"]], wlen=max_t_basepoint_interval + ) + if not np.isnan(t_on): + current_wave["ECG_T_Onsets"] = t_on[0] + if not np.isnan(t_off): + current_wave["ECG_T_Offsets"] = t_off[0] + + # correct R-peak position towards local maxima (otherwise prominence will be falsely computed) + r_pos = _correct_peak(sig, sampling_rate, current_wave["ECG_R_Peaks"]) + _, r_on, r_off = scipy.signal.peak_prominences(sig, [r_pos], wlen=max_r_basepoint_interval) + if not np.isnan(r_on): + current_wave["ECG_R_Onsets"] = r_on[0] + + if not np.isnan(r_off): + current_wave["ECG_R_Offsets"] = r_off[0] + + +def _correct_peak(sig, fs, peak, window=0.02): + """Correct peak towards local maxima within provided window.""" + + left = peak - int(window * fs) + right = peak + int(window * fs) + if len(sig[left:right]) > 0: + return np.argmax(sig[left:right]) + left + else: + return peak + + # Internals # --------------------- @@ -798,11 +1005,7 @@ def _onset_offset_delineator(ecg, peaks, peak_type="rpeaks", sampling_rate=1000) epsilon_onset = 0.25 * wt_peaks_data["peak_heights"][-1] leftbase = wt_peaks_data["left_bases"][-1] + index_peak - half_wave_width if peak_type == "rpeaks": - candidate_onsets = ( - np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0] - + nfirst - - 100 - ) + candidate_onsets = np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0] + nfirst - 100 elif peak_type in ["tpeaks", "ppeaks"]: candidate_onsets = ( np.where(-cwtmatr[4, nfirst - 100 : nfirst] < epsilon_onset)[0] @@ -1123,6 +1326,7 @@ def _ecg_delineate_plot( sampling_rate=1000, window_start=-0.35, window_end=0.55, + **kwargs ): """ import neurokit2 as nk diff --git a/neurokit2/ecg/ecg_quality.py b/neurokit2/ecg/ecg_quality.py index 1f6cadb27f..a4f804d1dc 100644 --- a/neurokit2/ecg/ecg_quality.py +++ b/neurokit2/ecg/ecg_quality.py @@ -57,7 +57,7 @@ def ecg_quality( ------- array or str Vector containing the quality index ranging from 0 to 1 for ``"averageQRS"`` method, - returns string classification (``Unacceptable``, ``Barely Acceptable`` or ``Excellent``) + returns string classification (``Unacceptable``, ``Barely acceptable`` or ``Excellent``) of the signal for ``"zhao2018"`` method. See Also @@ -157,7 +157,7 @@ def _ecg_quality_averageQRS(ecg_cleaned, rpeaks=None, sampling_rate=1000): # Interpolate quality = signal_interpolate( - rpeaks, quality, x_new=np.arange(len(ecg_cleaned)), method="quadratic" + rpeaks, quality, x_new=np.arange(len(ecg_cleaned)), method="previous" ) return quality diff --git a/neurokit2/eda/eda_process.py b/neurokit2/eda/eda_process.py index c93361eaa3..279008e8d4 100644 --- a/neurokit2/eda/eda_process.py +++ b/neurokit2/eda/eda_process.py @@ -50,7 +50,7 @@ def eda_process( SCR_Height|The SCR amplitude of the signal including the Tonic component. Note that cumulative \ effects of close-occurring SCRs might lead to an underestimation of the amplitude. SCR_Amplitude|The SCR amplitude of the signal excluding the Tonic component. - SCR_RiseTime|The SCR amplitude of the signal excluding the Tonic component. + SCR_RiseTime|The time taken for SCR onset to reach peak amplitude within the SCR. SCR_Recovery|The samples at which SCR peaks recover (decline) to half amplitude, marked as "1" \ in a list of zeros. diff --git a/neurokit2/hrv/hrv_time.py b/neurokit2/hrv/hrv_time.py index 2ec3158b94..46b97ef70a 100644 --- a/neurokit2/hrv/hrv_time.py +++ b/neurokit2/hrv/hrv_time.py @@ -91,6 +91,11 @@ def hrv_time(peaks, sampling_rate=1000, show=False, **kwargs): -------- ecg_peaks, ppg_peaks, hrv_frequency, hrv_summary, hrv_nonlinear + Notes + ----- + + Where applicable, the unit used for these metrics is the millisecond. + Examples -------- .. ipython:: python diff --git a/neurokit2/ppg/ppg_quality.py b/neurokit2/ppg/ppg_quality.py index c53a609f94..4ef625337c 100644 --- a/neurokit2/ppg/ppg_quality.py +++ b/neurokit2/ppg/ppg_quality.py @@ -138,7 +138,7 @@ def _ppg_quality_templatematch(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=100 # Interpolate beat-by-beat CCs quality = signal_interpolate( - ppg_pw_peaks[0:-1], cc, x_new=np.arange(len(ppg_cleaned)), method="quadratic" + ppg_pw_peaks[0:-1], cc, x_new=np.arange(len(ppg_cleaned)), method="previous" ) return quality @@ -193,4 +193,4 @@ def _ppg_quality_disimilarity(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000 ppg_pw_peaks[0:-1], dis, x_new=np.arange(len(ppg_cleaned)), method="previous" ) - return quality \ No newline at end of file + return quality diff --git a/neurokit2/signal/signal_interpolate.py b/neurokit2/signal/signal_interpolate.py index c44e8053c1..9574ebbc78 100644 --- a/neurokit2/signal/signal_interpolate.py +++ b/neurokit2/signal/signal_interpolate.py @@ -101,6 +101,8 @@ def signal_interpolate( x_values = np.squeeze(x_values.values) if isinstance(x_new, pd.Series): x_new = np.squeeze(x_new.values) + if isinstance(y_values, pd.Series): + y_values = np.squeeze(y_values.values) if len(x_values) != len(y_values): raise ValueError("x_values and y_values must be of the same length.") @@ -158,7 +160,7 @@ def signal_interpolate( # scipy.interpolate.PchipInterpolator for constant extrapolation akin to the behavior of # scipy.interpolate.interp1d with fill_value=([y_values[0]], [y_values[-1]]. fill_value = ([interpolated[first_index]], [interpolated[last_index]]) - elif isinstance(fill_value, float) or isinstance(fill_value, int): + elif isinstance(fill_value, (float, int)): # if only a single integer or float is provided as a fill value, format as a tuple fill_value = ([fill_value], [fill_value]) diff --git a/tests/tests_signal.py b/tests/tests_signal.py index 399ee7babd..121ab2496a 100644 --- a/tests/tests_signal.py +++ b/tests/tests_signal.py @@ -208,14 +208,25 @@ def test_signal_filter_with_missing(): def test_signal_interpolate(): + # Test with arrays x_axis = np.linspace(start=10, stop=30, num=10) signal = np.cos(x_axis) + x_new = np.arange(1000) - interpolated = nk.signal_interpolate(x_axis, signal, x_new=np.arange(1000)) + interpolated = nk.signal_interpolate(x_axis, signal, x_new) assert len(interpolated) == 1000 assert interpolated[0] == signal[0] assert interpolated[-1] == signal[-1] + # Test with Series + x_axis = pd.Series(x_axis) + signal = pd.Series(signal) + x_new = pd.Series(x_new) + + interpolated = nk.signal_interpolate(x_axis, signal, x_new) + assert len(interpolated) == 1000 + assert interpolated[0] == signal.iloc[0] + assert interpolated[-1] == signal.iloc[-1] def test_signal_findpeaks():