# Preprocessing with prosodic feature extraction
# Source:

# First stage: data organization

# Keep all the .wav audio files inside a single folder

# Second stage:
# Extraction of the prosodic features

# Related methods:
# Luengo, I., Navas, E., Hernáez, I., & Sánchez, J. (2005). Automatic emotion recognition using prosodic parameters. In Ninth European Conference on Speech Communication and Technology.
# Rao, K. S., Koolagudi, S. G., & Vempada, R. R. (2013). Emotion recognition from speech using global and local prosodic features. International Journal of Speech Technology, 16(2), 143-160.

import os
from os.path import isfile, join
import pandas as pd
import parselmouth
from adapted_feature_extraction_utils import *
import librosa
import numpy
import tgt
import chardet
import statistics


# Function to extract prosodic features from one syllable.
# Instead of receiving the audio file path (sound_filepath), it receives the array of samples of the current frame.
def extract_prosody(frame, sr, frame_id, utt_avg_pitch, next_interval_text, next_interval_dur, interval_start_time, interval_end_time, nucleus_end_time, nucleus_start_time, nucleus_vowel, vowel_stats_dict):  # , phones_per_syl):

    # sound = parselmouth.Sound(sound_filepath)
    sound = parselmouth.Sound(values=frame, sampling_frequency=sr)
    df = pd.DataFrame()

    attributes = {}

    intensity_attributes = get_intensity_attributes(sound)[0]
    pitch_attributes, _, avg_pitch = get_pitch_attributes(sound)

    # distance between the syllable's average pitch and the utterance's average pitch
    attributes['f0_avgutt_diff'] = abs(avg_pitch - utt_avg_pitch)

    # duration of the silent pause following the syllable, if any
    if next_interval_text == "sil":
        p_dur = next_interval_dur
    else:
        p_dur = 0
    attributes['p_dur'] = p_dur

    # normalized duration of the nucleus of the syllable (always a vowel)
    # attributes['syllable_dur'] = interval_end_time - interval_start_time

    if nucleus_vowel is not None:
        vowel_type_mean = vowel_stats_dict[nucleus_vowel]["mean"]
        vowel_type_std_dev = vowel_stats_dict[nucleus_vowel]["std_dev"]

        if vowel_type_std_dev > 0:
            attributes['n_dur'] = ((nucleus_end_time - nucleus_start_time) - vowel_type_mean) / vowel_type_std_dev
        else:
            attributes['n_dur'] = 0  # vowel type seen only once in the inquiry, no usable std dev
    else:
        attributes['n_dur'] = 0  # the syllable had no nucleus vowel

    # attributes['n_phones'] = phones_per_syl

    # print("n_phones", phones_per_syl)
    # print(attributes['n_phones'])

    attributes.update(intensity_attributes)
    attributes.update(pitch_attributes)

    # print(attributes)

    for attribute in attributes:
        df.at[0, attribute] = attributes[attribute]

    # df['n_phones'] = df['n_phones'].astype(int)  # casts n_phones to int in the dataframe, but the values still show up with .0 in the final version...
    df.at[0, 'frame'] = frame_id
    # move the 'frame' column (added last) to the front of the table
    rearranged_columns = df.columns.tolist()[-1:] + df.columns.tolist()[:-1]
    df = df[rearranged_columns]

    return df
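
# A minimal sketch of a call to extract_prosody, assuming the get_*_attributes helpers come from
# adapted_feature_extraction_utils (the exact feature keys depend on that module); the argument
# values below are illustrative only:
#
#   row = extract_prosody(frame=y[0:4000], sr=16000, frame_id="frame_pa_0.0_0.25",
#                         utt_avg_pitch=180.0, next_interval_text="sil", next_interval_dur=0.12,
#                         interval_start_time=0.0, interval_end_time=0.25,
#                         nucleus_end_time=0.20, nucleus_start_time=0.05,
#                         nucleus_vowel="a", vowel_stats_dict=vowel_stats_dict)
#
# row is a one-row DataFrame whose first column is 'frame', followed by 'f0_avgutt_diff',
# 'p_dur', 'n_dur' and the intensity and pitch attributes.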


def predict_encoding(tg_path):
    '''Predict a file's encoding using chardet'''
    # Read the whole file as binary data
    with open(tg_path, 'rb') as f:
        rawdata = f.read()

    return chardet.detect(rawdata)['encoding']
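
# chardet.detect returns a dict with an 'encoding' guess (e.g. 'utf-8' or 'ISO-8859-1'), which is
# passed straight to tgt.io.read_textgrid below; note the guess is probabilistic, not guaranteed.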

# MuPe-Diversidades corpus

estados = ["AL", "BA", "CE", "ES", "GO", "MG", "MS", "PA", "PB", "PE", "PI", "PR", "RJ", "RO", "RS", "SE", "SP"]
numeros = ["1", "2"]

# Build the list with the audio file names
common_path = os.getcwd() + "/MuPe-Diversidades/versao-1/"  # os.getcwd() returns the current working directory
audio_files = []
for estado in estados:
    for numero in numeros:
        audio_id = estado + numero + '.wav'
        if isfile(join(common_path + estado + "/", audio_id)):  # check that the file actually exists
            audio_files.append(estado + "/" + audio_id)

# Build lists with the file names, ids, states, and paths of the audios inside their state folder,
# plus the paths of the corresponding phone-alignment and reference TextGrids
audios_list = []
tg_phones_list = []
tg_reference_list = []
for path in audio_files:
    audio_id = path.replace('.wav', '').split("/")[1]
    estado = path[0:2]
    file = path[3:]
    audios_list.append([file, audio_id, estado, common_path + path])
    tg_phones = common_path + estado + "/" + audio_id + "_fones.TextGrid"
    tg_reference = common_path + estado + "/" + audio_id + "_OUTPUT_revised.TextGrid"
    tg_phones_list.append(tg_phones)
    tg_reference_list.append(tg_reference)

for i, inquiry in enumerate(audios_list):  # [4:5], start=4
    print("STARTING FEATURE EXTRACTION FOR INQUIRY:")
    print(i, inquiry)
    vowel_nucleus_not_found = False
    # Read the phonetic alignment TextGrid of the current inquiry (generated by ufpalign)
    tg_phone = tg_phones_list[i]
    tg_phone = tgt.io.read_textgrid(tg_phone, predict_encoding(tg_phone), include_empty_intervals=False)
    try:
        phonemes_tier = tg_phone.get_tier_by_name("fonemeas")
    except ValueError:
        phonemes_tier = tg_phone.get_tier_by_name("fonemas")
    try:
        wordGraphemesTier = tg_phone.get_tier_by_name("palavras-grafemas")
    except ValueError:
        wordGraphemesTier = tg_phone.get_tier_by_name("grafemas")  # THE TEXTGRIDS FROM THE NEW BRANCH CAME WITH DIFFERENT TIER NAMES
    # the old check for a missing syllable tier is no longer needed
    syllables_tier = tg_phone.get_tier_by_name("silabas-fonemas")
    # Read the reference TextGrid of the current inquiry to get the labels and boundary positions
    tg_reference = tgt.io.read_textgrid(tg_reference_list[i], predict_encoding(tg_reference_list[i]), include_empty_intervals=False)
    print(tg_phones_list[i], tg_reference_list[i])  # to check that the correct files were loaded

    # Load the audio file of the current inquiry
    audio = audios_list[i]
    audio_path = audio[3]
    print(audio_path)
    y, sr = librosa.load(audio_path, sr=None)  # sr=None keeps the native sampling rate

    # SPLITTING THE AUDIO INTO SYLLABLES
    # Convert syllable start and end times to sample indices
    syllable_frames = [
        y[int(interval.start_time * sr):int(interval.end_time * sr)] for interval in syllables_tier
    ]

    print("Number of syllables:", len(syllable_frames))

    for frame, interval in zip(syllable_frames, syllables_tier):
        interval.text = interval.text.replace(" ", "")
        start_time = round(interval.start_time, 2)
        end_time = round(interval.end_time, 2)
        frame_id = f"frame_{interval.text}_{start_time}_{end_time}"

        if interval.text != "sil":
            sound = parselmouth.Sound(values=frame, sampling_frequency=sr)  # ORIGINAL SOUND

            padding_duration = 0.05  # 50 ms of silence on each side so pitch extraction will consider the edges - pitch values extracted from the silence padding come out as NaN and are supposed to be ignored
            # Generate the silent padding
            num_padding_samples = int(sr * padding_duration)  # convert time to samples
            silent_padding = numpy.zeros(num_padding_samples)  # silent array
            extended_frame = numpy.concatenate((silent_padding, frame, silent_padding))
            extended_sound = parselmouth.Sound(values=extended_frame, sampling_frequency=sr)

            intensity_attributes = get_intensity_attributes(extended_sound)[0]
            pitch_attributes, _, avg_pitch = get_pitch_attributes(extended_sound)  # CHANGE extended_sound TO sound TO GO BACK TO THE ORIGINAL
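
            # For example, at sr = 16000 the padding adds int(16000 * 0.05) = 800 zero samples on
            # each side, so a 0.25 s syllable frame (4000 samples) grows to 5600 samples (0.35 s)
            # before pitch and intensity are computed.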
172
+
173
+ # AQUI, COMO AS SÍLABAS TEM TAMANHOS DIFERENTES, ELAS TERÃO NÚMERO DE SAMPLES DIFERENTES, OS QUAIS CORRESPONDEM ÀS COLUNAS E VIRAM NaN, FIZ UM PADDING
174
+ # Find the maximum frame length and pad them to the maximum length
175
+ max_length = max (len (frame ) for frame in syllable_frames )
176
+ syllable_frames = [
177
+ numpy .pad (frame , (0 , max_length - len (frame )), mode = 'constant' ) for frame in syllable_frames
178
+ ]
179
+
180
+ #print("len syllable frames", len(syllable_frames))
181
+ #df_audio_frames = pd.DataFrame(syllable_frames) # version that doesn't work for SE1 - compare some other inquiry to see if it works well too
182
+
183
+ # Create a DataFrame in chunks instead of all at once to solve memory problem
184
+ chunk_size = 400 # Adjust chunk size based on available memory
185
+ df_audio_frames = pd .DataFrame ()
186
+
187
+ for i in range (0 , len (syllable_frames ), chunk_size ):
188
+ print ("i" , i )
189
+ chunk = pd .DataFrame (syllable_frames [i :i + chunk_size ])
190
+ df_audio_frames = pd .concat ([df_audio_frames , chunk ], ignore_index = True )
191
+
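
    # Design note: building the (num_syllables x max_length) matrix one chunk_size-row piece at a
    # time avoids the single huge pd.DataFrame allocation that appeared to exhaust memory on
    # inquiries such as SE1; the final concatenated table is the same either way.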

    # Merge all the TB tiers of the reference TextGrid
    TB_tiers = [tier for tier in tg_reference.tiers if tier.name.startswith("TB-") and "ponto" not in tier.name]

    all_utterances = []

    for tier in TB_tiers:
        all_utterances.extend(tier.intervals)
    # list of all utterances sorted by ascending start time
    all_utterances.sort(key=lambda interval: interval.start_time)

    # Compute duration statistics for all the vowel types
    vowel_types = ["a", "e", "i", "o", "u", "a~", "e~", "i~", "o~", "u~", "E", "O"]
    vowel_stats_dict = {vowel: {"mean": None, "std_dev": None, "durations": []} for vowel in vowel_types}

    for phone in phonemes_tier:
        if phone.text in vowel_stats_dict:
            vowel_stats_dict[phone.text]["durations"].append(phone.end_time - phone.start_time)

    for vowel_type in vowel_stats_dict:
        durations = vowel_stats_dict[vowel_type]["durations"]
        vowel_stats_dict[vowel_type]["mean"] = statistics.mean(durations) if durations else 0  # guard against vowel types that never occur in this inquiry
        vowel_stats_dict[vowel_type]["std_dev"] = statistics.stdev(durations) if len(durations) > 1 else 0

        print("vowel type stats calculated:", vowel_type, vowel_stats_dict[vowel_type]["mean"], vowel_stats_dict[vowel_type]["std_dev"])
    utterance_averages = []
    labels = []
    i_utt = 0

    # Walk the syllable tier checking whether each syllable ends before the end of the current
    # utterance. If it does, label it "NB" (no boundary) and move to the next syllable; otherwise
    # label it "TB" and advance to the next utterance and the next syllable.
    for i_syl, syllable in enumerate(syllables_tier):
        if syllable.text != "sil":
            if i_syl + 1 >= len(syllables_tier) or i_utt >= len(all_utterances) or syllables_tier[i_syl + 1].start_time >= round(all_utterances[i_utt].end_time, 2):
                labels.append("TB")
                i_utt += 1
            elif syllables_tier[i_syl + 1].text == "sil" and i_syl + 2 < len(syllables_tier) and syllables_tier[i_syl + 2].start_time >= round(all_utterances[i_utt].end_time, 2):
                labels.append("TB")
                i_utt += 1
            else:
                labels.append("NB")

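    # Example: if the current utterance ends at 1.50 s and the syllables span 0.60-0.90, 0.90-1.20
    # and 1.20-1.50 s, the first two get "NB" (their successors start before 1.50 s) and the third
    # gets "TB", since the following syllable starts at 1.50 s, the utterance boundary.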

    # List with the average pitch of each utterance, according to the TB tiers of the reference TextGrid
    for utterance in all_utterances:
        utterance_frame = y[int(utterance.start_time * sr):int(utterance.end_time * sr)]
        sound = parselmouth.Sound(values=utterance_frame, sampling_frequency=sr)
        utt_avg = get_utterance_avg_pitch(sound)
        utterance_averages.append(utt_avg)

    # print("List with the utterances' average pitch:", utterance_averages)
    print("Number of averages computed (utterances):", len(utterance_averages))
    print("Number of utterances:", len(all_utterances))

    all_syllables_prosodic_features = []
    utterance_counter = 0
    labels_counter = 0
    phones_index = 0

    for i_syl, (frame, interval) in enumerate(zip(syllable_frames, syllables_tier)):
        # phones_per_syl = 0
        # if utterance_counter >= len(all_utterances):
        # if i_syl >= len(labels):  # NOT SURE WHETHER THIS IF STILL NEEDS TO STAY IN THE CODE
        #     print("last syllable", labels[labels_counter - 1], i_syl - 1, syllables_tier[i_syl - 1])
        #     break

        if interval.text != "sil":
            # Compute the frame id from the syllable text and its rounded start and end times
            interval.text = interval.text.replace(" ", "")
            start_time = round(interval.start_time, 2)
            end_time = round(interval.end_time, 2)
            frame_id = f"frame_{interval.text}_{start_time}_{end_time}"
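
            # e.g., a syllable "pa" spanning 0.00-0.25 s produces frame_id "frame_pa_0.0_0.25"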

            # align the phone tier with the syllable tier - advance to the first phone of the current syllable
            while phonemes_tier[phones_index].start_time < start_time:
                phones_index += 1
                # phones_per_syl += 1

            # advance to the nucleus vowel of the syllable
            phones_index_aux = 0
            while (not any(vowel in phonemes_tier[phones_index + phones_index_aux].text.lower() for vowel in "aeiou")) and (phonemes_tier[phones_index + phones_index_aux].end_time <= interval.end_time):  # searching for the syllable nucleus
                phones_index_aux += 1
                # phones_per_syl += 1

            # check whether the nucleus vowel was found or the index already passed the end of the syllable
            # (if it passed, go back to the first phone of the syllable and look for j or w as the nucleus vowel)
            if phonemes_tier[phones_index + phones_index_aux].start_time >= interval.end_time or phonemes_tier[phones_index + phones_index_aux].text == "sil":
                phones_index_aux = 0
                while not any(vowel in phonemes_tier[phones_index + phones_index_aux].text.lower() for vowel in "jw") and (phonemes_tier[phones_index + phones_index_aux].end_time <= interval.end_time):
                    phones_index_aux += 1

                if phonemes_tier[phones_index + phones_index_aux].start_time >= interval.end_time or phonemes_tier[phones_index + phones_index_aux].text == "sil":
                    vogal_nucleo = None  # no nucleus vowel found in this syllable
                elif phonemes_tier[phones_index + phones_index_aux].text == "j":
                    vogal_nucleo = "i"
                elif phonemes_tier[phones_index + phones_index_aux].text == "w":
                    vogal_nucleo = "u"

            else:
                vogal_nucleo = phonemes_tier[phones_index + phones_index_aux].text

            phones_index += phones_index_aux
            nucleus_start_time = phonemes_tier[phones_index].start_time
            nucleus_end_time = phonemes_tier[phones_index].end_time
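
            # e.g., a syllable aligned as the phones [m, a, j, s] gets "a" as its nucleus on the
            # first pass; a glide-only syllable falls back to the j/w search, mapping j -> "i" and
            # w -> "u" so its nucleus duration can still be normalized against the i/u statistics.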

            if i_syl + 1 == len(syllables_tier):
                next_interval_text = "fim"
                next_interval_dur = 0
            else:
                next_interval_text = syllables_tier[i_syl + 1].text.replace(" ", "")
                next_interval_dur = syllables_tier[i_syl + 1].end_time - syllables_tier[i_syl + 1].start_time

            # Call the function to extract the prosodic features of the current frame, passing the mean pitch of the current utterance
            frame_prosodic_features = extract_prosody(frame, sr, frame_id, utterance_averages[utterance_counter], next_interval_text, next_interval_dur, interval.start_time, interval.end_time, nucleus_end_time, nucleus_start_time, vogal_nucleo, vowel_stats_dict)  # , phones_per_syl)
            all_syllables_prosodic_features.append(frame_prosodic_features)

            # end of the current utterance reached
            if labels[labels_counter] == "TB":
                utterance_counter += 1
            labels_counter += 1

    print("Extracted features from all frames!!")

    # Organize the prosodic analysis results and labels into one table
    df_prosodic = pd.concat(all_syllables_prosodic_features).reset_index(drop=True)

    # add the labels to the dataframe
    df_prosodic['label'] = labels

    print(df_prosodic)  # show the table

    # Save the table with the prosodic features to a csv
    df_prosodic.to_csv('ExtractedProsodicFeatures/versao final/' + inquiry[1] + '_prosodic_features.csv', index=False)

    df_prosodic.label.hist()  # plots a histogram per category; this may have to be commented out