'''
loader.py

* Image and audio preprocessing
* Dataset class
* Data loader
'''
import os
import re
import torch
import random
import pathlib
import torchaudio
import numpy as np
import glob
from PIL import Image
from torch.utils.data import DataLoader
from pystct import sdct_torch, isdct_torch
from torch_stft import STFT

MY_FOLDER = os.environ.get('USER_PATH')
DATA_FOLDER = os.environ.get('DATA_PATH')
AUDIO_FOLDER = f"{DATA_FOLDER}/FSDnoisy/FSDnoisy18k.audio_"
IMAGE_FOLDER = f'{DATA_FOLDER}/imagenet'

class ImageProcessor:
    """
    Class to preprocess the images from the custom
    dataset. It applies a series of transformations:
    - At __init__ the image is converted to the desired [colorspace].
    - crop() crops the image to the desired [proportion].
    - scale() resizes the image to [n]x[n].
    - normalize() scales the channel values to [0, 1].
    """
    def __init__(self, image_path, colorspace='RGB'):
        self.image = Image.open(image_path).convert(colorspace)

    def crop(self, proportion=2 ** 6):
        # Crop a square of side (proportion - 2)/proportion * min(nx, ny),
        # offset by n/proportion from the top-left corner
        nx, ny = self.image.size
        n = min(nx, ny)
        left = top = n / proportion
        right = bottom = (proportion - 1) * n / proportion
        self.image = self.image.crop((left, top, right, bottom))

    def scale(self, n=256):
        # LANCZOS replaces the deprecated Image.ANTIALIAS (removed in Pillow 10)
        self.image = self.image.resize((n, n), Image.LANCZOS)

    def normalize(self):
        self.image = np.array(self.image).astype('float') / 255.0

    def forward(self):
        self.crop()
        self.scale()
        self.normalize()
        return self.image
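
# A minimal usage sketch (the path 'sample.JPEG' is hypothetical):
#
#   processor = ImageProcessor(image_path='sample.JPEG', colorspace='RGB')
#   image = processor.forward()   # (256, 256, 3) array with values in [0, 1]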

class AudioProcessor:
    """
    Class to preprocess the audio clips from the custom
    dataset. We set the [_limit] in terms of samples, and the
    [_frame_length] and [_frame_step] of the chosen [transform].
    If [stft_small] is set, a smaller STFT window is used.

    If transform is [cosine] it returns just the STDCT matrix.
    Else, if transform is [fourier], it returns the STFT magnitude
    and phase.
    """
    def __init__(self, transform, stft_small=True, random_init=True):
        # Corresponds to approximately 1.5 seconds of audio
        self._limit = 67522  # 2 ** 16 + 2 ** 11 - 2 ** 6 + 2
        if transform == 'cosine':
            self._frame_length = 2 ** 10
            self._frame_step = 2 ** 7 + 2
        else:
            if stft_small:
                self._frame_length = 2 ** 11 - 1
                self._frame_step = 2 ** 7 + 4
            else:
                self._frame_length = 2 ** 12 - 1
                self._frame_step = 2 ** 6 + 2

        self.random_init = random_init
        self._transform = transform

        if self._transform == 'fourier':
            self.stft = STFT(
                filter_length=self._frame_length,
                hop_length=self._frame_step,
                win_length=self._frame_length,
                window='hann'
            )

    def forward(self, audio_path):
        self.sound, self.sr = torchaudio.load(audio_path)

        # Keep only the first channel
        sound = self.sound[0]

        # Fixed-length buffer of _limit samples
        tmp = torch.zeros([self._limit, ])

        # Check if the audio is shorter than the limit
        if sound.numel() < self._limit:
            # Zero-pad at the end, or place the clip at a random offset
            if self.random_init:
                i = random.randint(0, self._limit - sound.numel())
                tmp[i:i + sound.numel()] = sound[:]
            else:
                tmp[:sound.numel()] = sound[:]
        else:
            # Use only part of the audio: start at the beginning or at a
            # random offset
            if self.random_init:
                i = random.randint(0, sound.numel() - self._limit)
            else:
                i = 0
            tmp[:] = sound[i:i + self._limit]

        if self._transform == 'cosine':
            return sdct_torch(
                tmp.type(torch.float32),
                frame_length=self._frame_length,
                frame_step=self._frame_step
            )
        elif self._transform == 'fourier':
            magnitude, phase = self.stft.transform(tmp.unsqueeze(0).type(torch.float32))
            return magnitude, phase
        else:
            raise ValueError(f"Transform '{self._transform}' not implemented")
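
# A minimal usage sketch (the clip 'sample.wav' is hypothetical). With
# [cosine] forward() returns a single STDCT matrix; with [fourier] it
# returns a (magnitude, phase) pair:
#
#   stct = AudioProcessor(transform='cosine').forward('sample.wav')
#   magnitude, phase = AudioProcessor(transform='fourier').forward('sample.wav')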

class StegoDataset(torch.utils.data.Dataset):
    """
    Custom dataset pairing images with spectrograms.
    - [image_root] defines the path to read the images from.
    - [audio_root] defines the path to read the audio clips from.
    - [folder] can be either [train] or [test].
    - [mappings] is the dictionary containing a descriptive name for
      the images from ImageNet. It is used to index the different
      subdirectories.
    - [rgb] is a boolean that indicates whether we are using color (RGB)
      images or grayscale (B&W) ones.
    - [transform] defines the transform used to process the audio. Can be
      either [cosine] or [fourier].
    - [stft_small] selects between the small and large STFT configurations.
    - [image_extension] defines the extension of the image files.
      By default it is set to JPEG.
    - [audio_extension] defines the extension of the audio files.
      By default it is set to WAV.
    """
    def __init__(
        self,
        image_root: str,
        audio_root: str,
        folder: str,
        mappings: dict,
        rgb: bool = True,
        transform: str = 'cosine',
        stft_small: bool = True,
        image_extension: str = "JPEG",
        audio_extension: str = "wav"
    ):
        # Both splits read from the ImageNet 'train' directory; the test
        # split skips the first _TOTAL images below so the two are disjoint.
        # self._image_data_path = pathlib.Path(image_root) / folder
        self._image_data_path = pathlib.Path(image_root) / 'train'
        self._audio_data_path = pathlib.Path(f'{audio_root}{folder}')
        self._MAX_LIMIT = 10000 if folder == 'train' else 900
        self._TOTAL = 10000
        self._MAX_AUDIO_LIMIT = 17584 if folder == 'train' else 946

        self._colorspace = 'RGB' if rgb else 'L'
        self._transform = transform
        self._stft_small = stft_small

        print(f'IMAGE DATA LOCATED AT: {self._image_data_path}')
        print(f'AUDIO DATA LOCATED AT: {self._audio_data_path}')

        self.image_extension = image_extension
        self.audio_extension = audio_extension
        self._index = 0
        self._indices = []
        self._audios = []

        # IMAGE PATH RETRIEVING
        test_i, test_j = 0, 0

        # Keys are ImageNet synset ids (of the form 'n90923u23')
        if folder == 'train':
            for key in mappings.keys():
                # Keep at most 10 images per class
                for j, img in enumerate(glob.glob(f'{self._image_data_path}/{key}/*.{self.image_extension}')):
                    if j >= 10:
                        break
                    self._indices.append((key, re.search(r'(?<=_)\d+', img).group()))
                    self._index += 1
                    if self._index == self._MAX_LIMIT:
                        break
                if self._index == self._MAX_LIMIT:
                    break
        elif folder == "test":
            for key in mappings.keys():
                for img in glob.glob(f'{self._image_data_path}/{key}/*.{self.image_extension}'):
                    # Skip the first _TOTAL images (used by the train split),
                    # then keep at most 10 images per class
                    if test_i > self._TOTAL:
                        if test_j >= 10:
                            test_j = 0
                            break
                        self._indices.append((key, re.search(r'(?<=_)\d+', img).group()))
                        self._index += 1
                        test_j += 1
                    test_i += 1
                    if self._index == self._MAX_LIMIT:
                        break
                if self._index == self._MAX_LIMIT:
                    break

        # AUDIO PATH RETRIEVING (here the paths for test and train are different)
        self._index_aud = 0
        for audio_path in glob.glob(f'{self._audio_data_path}/*.{self.audio_extension}'):
            self._audios.append(audio_path)
            self._index_aud += 1
            if self._index_aud == self._MAX_AUDIO_LIMIT:
                break

        self._AUDIO_PROCESSOR = AudioProcessor(transform=self._transform, stft_small=self._stft_small)

        print('Set up done')

    def __len__(self):
        return self._index
    def __getitem__(self, index):
        key = self._indices[index][0]
        indexer = self._indices[index][1]
        # Pair the image with a randomly chosen audio clip; bound the index
        # by the number of clips actually loaded
        rand_indexer = random.randint(0, len(self._audios) - 1)

        img_path = glob.glob(f'{self._image_data_path}/{key}/{key}_{indexer}.{self.image_extension}')[0]
        audio_path = self._audios[rand_indexer]

        img = np.asarray(ImageProcessor(image_path=img_path, colorspace=self._colorspace).forward()).astype('float64')

        if self._transform == 'cosine':
            sound_stct = self._AUDIO_PROCESSOR.forward(audio_path)
            return (img, sound_stct)
        elif self._transform == 'fourier':
            magnitude_stft, phase_stft = self._AUDIO_PROCESSOR.forward(audio_path)
            return (img, magnitude_stft, phase_stft)
        else:
            raise ValueError(f"Transform '{self._transform}' not implemented")
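
# A minimal usage sketch, assuming the ImageNet/FSDnoisy layout expected
# above and a [mappings] dict such as {'n01440764': 'tench'}:
#
#   dataset = StegoDataset(
#       image_root=f'{IMAGE_FOLDER}/ILSVRC/Data/CLS-LOC',
#       audio_root=AUDIO_FOLDER,
#       folder='train',
#       mappings=mappings,
#       transform='cosine'
#   )
#   img, sound_stct = dataset[0]   # (image, STDCT) pair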

def loader(split='train', rgb=True, transform='cosine', stft_small=True, batch_size=1, shuffle=False):
    """
    Prepares the custom data loader.
    - [split] defines the set type. Can be either [train] or [test].
    - [rgb] is a boolean that indicates whether we are using color (RGB)
      images or grayscale (B&W) ones.
    - [transform] defines the transform used to process the audio. Can be
      either [cosine] or [fourier].
    """
    print('Preparing dataset...')
    mappings = {}
    with open(f'{IMAGE_FOLDER}/mappings.txt') as f:
        for line in f:
            words = line.split()
            mappings[words[0]] = words[1]

    dataset = StegoDataset(
        image_root=f'{IMAGE_FOLDER}/ILSVRC/Data/CLS-LOC',
        audio_root=AUDIO_FOLDER,
        folder=split,
        mappings=mappings,
        rgb=rgb,
        transform=transform,
        stft_small=stft_small
    )
    print('Dataset prepared.')

    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=4,
        pin_memory=True,
        shuffle=shuffle
    )
    print('Data loaded ++')
    return dataloader
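

# Minimal smoke test: a sketch assuming DATA_PATH points at the expected
# folder layout; with transform='fourier' each batch unpacks as
# (image, magnitude, phase). Runs only when the file is executed directly.
if __name__ == '__main__':
    train_loader = loader('train', transform='fourier', batch_size=2)
    img, magnitude, phase = next(iter(train_loader))
    print(img.shape, magnitude.shape, phase.shape)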