From 6b217520348bfbb76ca4a4ac9c1807f57ad2209e Mon Sep 17 00:00:00 2001 From: Erikpostt Date: Wed, 25 Sep 2024 15:22:43 +0200 Subject: [PATCH] solve another bug --- src/paradigma/feature_extraction.py | 92 +++++++++++++++-------------- src/paradigma/gait_analysis.py | 2 +- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/src/paradigma/feature_extraction.py b/src/paradigma/feature_extraction.py index 5a22479..94cae26 100644 --- a/src/paradigma/feature_extraction.py +++ b/src/paradigma/feature_extraction.py @@ -454,54 +454,58 @@ def extract_angle_extremes( The extracted angle extremes (peaks) """ # determine peaks - df[f'{angle_colname}_maxima'] = df.apply( - lambda row: find_peaks(row[angle_colname], - distance=sampling_frequency * 0.6 / row[dominant_frequency_colname], - prominence=2)[0], axis=1 - ) - df[f'{angle_colname}_minima'] = df.apply( - lambda row: find_peaks([-x for x in row[angle_colname]], - distance=sampling_frequency * 0.6 / row[dominant_frequency_colname], - prominence=2)[0], axis=1 - ) + df[f'{angle_colname}_maxima'] = df.apply(lambda x: signal.find_peaks(x[angle_colname], distance=sampling_frequency * 0.6 / x[dominant_frequency_colname], prominence=0.5)[0], axis=1) + df[f'{angle_colname}_minima'] = df.apply(lambda x: signal.find_peaks([-x for x in x[angle_colname]], distance=sampling_frequency * 0.6 / x[dominant_frequency_colname], prominence=0.5)[0], axis=1) df[f'{angle_colname}_new_minima'] = df[f'{angle_colname}_minima'].copy() df[f'{angle_colname}_new_maxima'] = df[f'{angle_colname}_maxima'].copy() - def refine_extrema(minima, maxima, angle_values): - """Refine the minima and maxima by removing consecutive points that don't adhere to the rules.""" - i_pks = 0 - while i_pks < len(minima) - 1 and i_pks < len(maxima): - # Min-min or max-max sequences handling - if minima[i_pks+1] < maxima[i_pks]: - if angle_values[minima[i_pks+1]] < angle_values[minima[i_pks]]: - minima = np.delete(minima, i_pks) - else: - minima = np.delete(minima, i_pks+1) - i_pks -= 1 - elif maxima[i_pks] < minima[i_pks]: - if angle_values[maxima[i_pks]] < angle_values[maxima[i_pks-1]]: - maxima = np.delete(maxima, i_pks) - else: - maxima = np.delete(maxima, i_pks-1) - i_pks -= 1 - i_pks += 1 - return minima, maxima - - # Apply refinement to each row - for index, row in df.iterrows(): - minima, maxima = refine_extrema(row[f'{angle_colname}_new_minima'], row[f'{angle_colname}_new_maxima'], row[angle_colname]) - df.loc[index, f'{angle_colname}_new_minima'] = minima - df.loc[index, f'{angle_colname}_new_maxima'] = maxima - - # Ensure scalar values are converted to lists - df[f'{angle_colname}_new_minima'] = df[f'{angle_colname}_new_minima'].apply(lambda x: [x] if isinstance(x, int) else x) - df[f'{angle_colname}_new_maxima'] = df[f'{angle_colname}_new_maxima'].apply(lambda x: [x] if isinstance(x, int) else x) - - # Combine and sort minima and maxima - df[f'{angle_colname}_extrema_values'] = df.apply( - lambda row: [row[angle_colname][i] for i in sorted(np.concatenate([row[f'{angle_colname}_new_minima'], row[f'{angle_colname}new_maxima']]))], axis=1 - ) + for index, _ in df.iterrows(): + i_pks = 0 # iterable to keep track of consecutive min-min and max-max versus min-max + n_min = df.loc[index, f'{angle_colname}_new_minima'].size # number of minima in window + n_max = df.loc[index, f'{angle_colname}_new_maxima'].size # number of maxima in window + + if n_min > 0 and n_max > 0: + if df.loc[index, f'{angle_colname}_new_maxima'][0] > df.loc[index, f'{angle_colname}_new_minima'][0]: # if first minimum occurs before first maximum, start with minimum + while i_pks < df.loc[index, f'{angle_colname}_new_minima'].size - 1 and i_pks < df.loc[index, f'{angle_colname}_new_maxima'].size: # only continue if there's enough minima and maxima to perform operations + if df.loc[index, f'{angle_colname}_new_minima'][i_pks+1] < df.loc[index, f'{angle_colname}_new_maxima'][i_pks]: # if angle of next minimum comes before the current maxima, we have two minima in a row + if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks+1]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks]]: # if second minimum if smaller than first, keep second + df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks) + else: # otherwise keep the first + df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks+1) + i_pks -= 1 + if i_pks >= 0 and df.loc[index, f'{angle_colname}_new_minima'][i_pks] > df.loc[index, f'{angle_colname}_new_maxima'][i_pks]: + if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks-1]]: + df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks) + else: + df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks-1) + i_pks -= 1 + i_pks += 1 + + elif df.loc[index, f'{angle_colname}_new_maxima'][0] < df.loc[index, f'{angle_colname}_new_minima'][0]: # if the first maximum occurs before the first minimum, start with the maximum + while i_pks < df.loc[index, f'{angle_colname}_new_minima'].size and i_pks < df.loc[index, f'{angle_colname}_new_maxima'].size-1: + if df.loc[index, f'{angle_colname}_new_minima'][i_pks] > df.loc[index, f'{angle_colname}_new_maxima'][i_pks+1]: + if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks+1]] > df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks]]: + df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks) + else: + df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks+1) + i_pks -= 1 + if i_pks > 0 and df.loc[index, f'{angle_colname}_new_minima'][i_pks] < df.loc[index, f'{angle_colname}_new_maxima'][i_pks]: + if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks-1]]: + df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks-1) + + else: + df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks) + i_pks -= 1 + i_pks += 1 + + # for some peculiar reason, if a single item remains in the row for {angle_colname}_new_minima or + # angle_new_maxima, it could be either a scalar or a vector. + for col in [f'{angle_colname}_new_minima', f'{angle_colname}_new_maxima']: + df.loc[df.apply(lambda x: type(x[col].tolist())==int, axis=1), col] = df.loc[df.apply(lambda x: type(x[col].tolist())==int, axis=1), col].apply(lambda x: [x]) + + # extract amplitude + df[f'{angle_colname}_extrema_values'] = df.apply(lambda x: [x[angle_colname][i] for i in sorted(np.concatenate([x[f'{angle_colname}_new_minima'], x[f'{angle_colname}_new_maxima']]))], axis=1) return df[f'{angle_colname}_extrema_values'] diff --git a/src/paradigma/gait_analysis.py b/src/paradigma/gait_analysis.py index ae68e90..211b1f9 100644 --- a/src/paradigma/gait_analysis.py +++ b/src/paradigma/gait_analysis.py @@ -207,7 +207,7 @@ def extract_arm_swing_features(input_path: Union[str, Path], output_path: Union[ ) # determine the extrema (minima and maxima) of the angle signal - extract_angle_extremes( + df_windowed['angle_extrema_values'] = extract_angle_extremes( df=df_windowed, angle_colname=config.angle_smooth_colname, dominant_frequency_colname='angle_dominant_frequency',