Skip to content

Commit

Permalink
solve another bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Erikpostt committed Sep 25, 2024
1 parent 1a335d8 commit 6b21752
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 45 deletions.
92 changes: 48 additions & 44 deletions src/paradigma/feature_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,54 +454,58 @@ def extract_angle_extremes(
The extracted angle extremes (peaks)
"""
# determine peaks
df[f'{angle_colname}_maxima'] = df.apply(
lambda row: find_peaks(row[angle_colname],
distance=sampling_frequency * 0.6 / row[dominant_frequency_colname],
prominence=2)[0], axis=1
)
df[f'{angle_colname}_minima'] = df.apply(
lambda row: find_peaks([-x for x in row[angle_colname]],
distance=sampling_frequency * 0.6 / row[dominant_frequency_colname],
prominence=2)[0], axis=1
)
df[f'{angle_colname}_maxima'] = df.apply(lambda x: signal.find_peaks(x[angle_colname], distance=sampling_frequency * 0.6 / x[dominant_frequency_colname], prominence=0.5)[0], axis=1)
df[f'{angle_colname}_minima'] = df.apply(lambda x: signal.find_peaks([-x for x in x[angle_colname]], distance=sampling_frequency * 0.6 / x[dominant_frequency_colname], prominence=0.5)[0], axis=1)

df[f'{angle_colname}_new_minima'] = df[f'{angle_colname}_minima'].copy()
df[f'{angle_colname}_new_maxima'] = df[f'{angle_colname}_maxima'].copy()

def refine_extrema(minima, maxima, angle_values):
"""Refine the minima and maxima by removing consecutive points that don't adhere to the rules."""
i_pks = 0
while i_pks < len(minima) - 1 and i_pks < len(maxima):
# Min-min or max-max sequences handling
if minima[i_pks+1] < maxima[i_pks]:
if angle_values[minima[i_pks+1]] < angle_values[minima[i_pks]]:
minima = np.delete(minima, i_pks)
else:
minima = np.delete(minima, i_pks+1)
i_pks -= 1
elif maxima[i_pks] < minima[i_pks]:
if angle_values[maxima[i_pks]] < angle_values[maxima[i_pks-1]]:
maxima = np.delete(maxima, i_pks)
else:
maxima = np.delete(maxima, i_pks-1)
i_pks -= 1
i_pks += 1
return minima, maxima

# Apply refinement to each row
for index, row in df.iterrows():
minima, maxima = refine_extrema(row[f'{angle_colname}_new_minima'], row[f'{angle_colname}_new_maxima'], row[angle_colname])
df.loc[index, f'{angle_colname}_new_minima'] = minima
df.loc[index, f'{angle_colname}_new_maxima'] = maxima

# Ensure scalar values are converted to lists
df[f'{angle_colname}_new_minima'] = df[f'{angle_colname}_new_minima'].apply(lambda x: [x] if isinstance(x, int) else x)
df[f'{angle_colname}_new_maxima'] = df[f'{angle_colname}_new_maxima'].apply(lambda x: [x] if isinstance(x, int) else x)

# Combine and sort minima and maxima
df[f'{angle_colname}_extrema_values'] = df.apply(
lambda row: [row[angle_colname][i] for i in sorted(np.concatenate([row[f'{angle_colname}_new_minima'], row[f'{angle_colname}new_maxima']]))], axis=1
)
for index, _ in df.iterrows():
i_pks = 0 # iterable to keep track of consecutive min-min and max-max versus min-max
n_min = df.loc[index, f'{angle_colname}_new_minima'].size # number of minima in window
n_max = df.loc[index, f'{angle_colname}_new_maxima'].size # number of maxima in window

if n_min > 0 and n_max > 0:
if df.loc[index, f'{angle_colname}_new_maxima'][0] > df.loc[index, f'{angle_colname}_new_minima'][0]: # if first minimum occurs before first maximum, start with minimum
while i_pks < df.loc[index, f'{angle_colname}_new_minima'].size - 1 and i_pks < df.loc[index, f'{angle_colname}_new_maxima'].size: # only continue if there's enough minima and maxima to perform operations
if df.loc[index, f'{angle_colname}_new_minima'][i_pks+1] < df.loc[index, f'{angle_colname}_new_maxima'][i_pks]: # if angle of next minimum comes before the current maxima, we have two minima in a row
if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks+1]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks]]: # if second minimum if smaller than first, keep second
df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks)
else: # otherwise keep the first
df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks+1)
i_pks -= 1
if i_pks >= 0 and df.loc[index, f'{angle_colname}_new_minima'][i_pks] > df.loc[index, f'{angle_colname}_new_maxima'][i_pks]:
if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks-1]]:
df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks)
else:
df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks-1)
i_pks -= 1
i_pks += 1

elif df.loc[index, f'{angle_colname}_new_maxima'][0] < df.loc[index, f'{angle_colname}_new_minima'][0]: # if the first maximum occurs before the first minimum, start with the maximum
while i_pks < df.loc[index, f'{angle_colname}_new_minima'].size and i_pks < df.loc[index, f'{angle_colname}_new_maxima'].size-1:
if df.loc[index, f'{angle_colname}_new_minima'][i_pks] > df.loc[index, f'{angle_colname}_new_maxima'][i_pks+1]:
if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks+1]] > df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_maxima'][i_pks]]:
df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks)
else:
df.at[index, f'{angle_colname}_new_maxima'] = np.delete(df.loc[index, f'{angle_colname}_new_maxima'], i_pks+1)
i_pks -= 1
if i_pks > 0 and df.loc[index, f'{angle_colname}_new_minima'][i_pks] < df.loc[index, f'{angle_colname}_new_maxima'][i_pks]:
if df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks]] < df.loc[index, angle_colname][df.loc[index, f'{angle_colname}_new_minima'][i_pks-1]]:
df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks-1)

else:
df.at[index, f'{angle_colname}_new_minima'] = np.delete(df.loc[index, f'{angle_colname}_new_minima'], i_pks)
i_pks -= 1
i_pks += 1

# for some peculiar reason, if a single item remains in the row for {angle_colname}_new_minima or
# angle_new_maxima, it could be either a scalar or a vector.
for col in [f'{angle_colname}_new_minima', f'{angle_colname}_new_maxima']:
df.loc[df.apply(lambda x: type(x[col].tolist())==int, axis=1), col] = df.loc[df.apply(lambda x: type(x[col].tolist())==int, axis=1), col].apply(lambda x: [x])

# extract amplitude
df[f'{angle_colname}_extrema_values'] = df.apply(lambda x: [x[angle_colname][i] for i in sorted(np.concatenate([x[f'{angle_colname}_new_minima'], x[f'{angle_colname}_new_maxima']]))], axis=1)

return df[f'{angle_colname}_extrema_values']

Expand Down
2 changes: 1 addition & 1 deletion src/paradigma/gait_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def extract_arm_swing_features(input_path: Union[str, Path], output_path: Union[
)

# determine the extrema (minima and maxima) of the angle signal
extract_angle_extremes(
df_windowed['angle_extrema_values'] = extract_angle_extremes(
df=df_windowed,
angle_colname=config.angle_smooth_colname,
dominant_frequency_colname='angle_dominant_frequency',
Expand Down

0 comments on commit 6b21752

Please sign in to comment.