-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
224 lines (186 loc) · 7.36 KB
/
utils.py
File metadata and controls
224 lines (186 loc) · 7.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
MIT License
Copyright (c) 2021 OpenAI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from torch import nn
import torch.nn.functional as F
from scipy.ndimage import gaussian_filter1d
from scipy.signal.windows import gaussian
import logger
import torch
import logger
def get_index_from_list(vals, t, x_shape):
    """
    Look up one entry of ``vals`` per batch element and shape it for broadcasting.

    Args:
        vals: tensor gathered along its last dimension.
        t: index tensor; its first dimension is taken as the batch size.
        x_shape: shape of the tensor the result will be combined with; only
            its length is used.

    Returns:
        The gathered values reshaped to ``(batch_size, 1, ..., 1)`` with
        ``len(x_shape) - 1`` singleton dimensions, moved to ``t``'s device.
    """
    n_batch = t.shape[0]
    # The lookup is done with a CPU copy of the indices
    # (presumably ``vals`` is kept on the CPU -- confirm against callers).
    picked = vals.gather(-1, t.cpu())
    singleton_dims = (1,) * (len(x_shape) - 1)
    return picked.reshape(n_batch, *singleton_dims).to(t.device)
def mean_flat(tensor):
    """
    Average a tensor over every dimension except the first (batch) one.

    Returns:
        A tensor with one mean value per batch element.
    """
    non_batch_axes = list(range(1, len(tensor.shape)))
    return tensor.mean(dim=non_batch_axes)
def mse_loss(x_hat, x):
    """
    Mean squared error between a prediction and its target.

    Args:
        x_hat (torch.Tensor): predicted tensor of shape (batch_size, coords, length),
            e.g. (64, 3, 2048).
        x (torch.Tensor): target tensor, subtracted element-wise from ``x_hat``.

    Returns:
        The squared error averaged first within each batch element and then
        across the batch.
    """
    # Element-wise squared error, no reduction yet.
    squared_error = (x - x_hat).pow(2)
    # One value per batch element (mean over all non-batch dimensions).
    per_sample = mean_flat(squared_error)
    # Collapse the batch dimension.
    return per_sample.mean()
def avg_pool_nd(dims, *args, **kwargs):
    """
    Build an average pooling module for 1, 2 or 3 dimensions.

    Args:
        dims: number of dimensions (1, 2 or 3).
        *args, **kwargs: forwarded unchanged to the pooling module constructor.

    Raises:
        ValueError: if ``dims`` is not 1, 2 or 3.
    """
    pool_classes = (nn.AvgPool1d, nn.AvgPool2d, nn.AvgPool3d)
    for n_dims, pool_cls in enumerate(pool_classes, start=1):
        if dims == n_dims:
            return pool_cls(*args, **kwargs)
    raise ValueError(f"unsupported dimensions: {dims}")
def max_pool_nd(dims, *args, **kwargs):
    """
    Build a max pooling module for 1, 2 or 3 dimensions.

    Args:
        dims: number of dimensions (1, 2 or 3).
        *args, **kwargs: forwarded unchanged to the pooling module constructor.

    Raises:
        ValueError: if ``dims`` is not 1, 2 or 3.
    """
    pool_classes = (nn.MaxPool1d, nn.MaxPool2d, nn.MaxPool3d)
    for n_dims, pool_cls in enumerate(pool_classes, start=1):
        if dims == n_dims:
            return pool_cls(*args, **kwargs)
    raise ValueError(f"unsupported dimensions: {dims}")
def interpolate_nscales(sample, scales=2, method="nearest", smoother=None, to_numpy=False):
    """
    Build a pyramid of progressively downsampled copies of a batch of samples.

    Level 0 is the input itself; level ``i`` (for ``1 <= i < scales``) is the
    input resampled with ``F.interpolate`` using a scale factor of ``1 / 2**i``
    (1/2, 1/4, 1/8 and so on).

    Args:
        sample (torch.Tensor): a tensor of shape (batch_size, coords, length),
            e.g. (1, 3, 2000) for a single trajectory.
        scales (int): number of pyramid levels, including level 0.
        method (str): interpolation mode passed to ``F.interpolate``.
            Default is 'nearest'.
        smoother (callable, optional): if given, it is called as
            ``smoother(sample, level)`` and its result is what gets
            interpolated at that level. Default is None (no smoothing).
        to_numpy (bool): if True, levels >= 1 are returned as numpy arrays
            with the batch dimension squeezed out and the two remaining
            dimensions swapped (convenient for plotting); this path only works
            when the interpolated result has a batch dimension of size 1.
            Level 0 is always the original ``sample`` object. Default is False.

    Returns:
        pyramidal_sample (dict): keys are the levels, values the interpolated
        samples.
    """
    logger.log(sample.shape)
    pyramidal_sample = {0: sample}
    for level in range(1, scales):
        # The optional smoother runs on the full-resolution input before resampling.
        source = sample if smoother is None else smoother(sample, level)
        resized = F.interpolate(source, scale_factor=1 / 2**level, mode=method)
        if to_numpy:
            # Drop the batch dimension and swap the two remaining dimensions.
            # detach()/cpu() keep the conversion working for tensors that
            # track gradients or live on another device.
            resized = resized.squeeze(0).permute(1, 0).detach().cpu().numpy()
        pyramidal_sample[level] = resized
    return pyramidal_sample
#%%
class GaussianSmoother(nn.Module):
    """
    Bank of normalized 1D Gaussian kernels, one per smoothing level.

    Level 0 means "no smoothing"; level ``i >= 1`` uses the kernel built from
    ``self.stds[i - 1]``, which has ``4 * std + 1`` taps and sums to 1.

    Args:
        levels (int): number of levels including level 0, so ``levels - 1``
            kernels are built. ``self.stds`` has 7 entries; asking for more
            kernels than that raises ``IndexError``.
    """

    def __init__(self, levels):
        super().__init__()
        self.stds = [2, 8, 32, 128, 1, 4, 64]
        self.levels = levels
        windows = []
        for i in range(1, levels):
            std = self.stds[i - 1]
            window = torch.Tensor(gaussian(4 * std + 1, std))
            # Normalize so that filtering preserves the signal's mean level.
            window /= torch.sum(window)
            # Shape (1, 1, kernel_size): the weight layout F.conv1d expects.
            windows.append(window.unsqueeze(0).unsqueeze(0))
        self.windows = windows

    def forward(self, input, level):
        """
        Apply the Gaussian filter of the given level along the last dimension.

        Every channel is filtered independently with the same kernel, so the
        input may have any number of channels.

        Arguments:
            input (torch.Tensor): Input to apply gaussian filter on.
            level (int): smoothing level; 0 returns ``input`` unchanged.
        Returns:
            filtered (torch.Tensor): Filtered output, same shape as ``input``.
        """
        if level == 0:
            return input
        window = self.windows[level - 1]
        # The kernels are plain tensors created on the CPU as float32, so they
        # are matched to the input's device (and floating dtype) here.
        target_dtype = input.dtype if input.is_floating_point() else window.dtype
        window = window.to(device=input.device, dtype=target_dtype)
        # Fold all leading dimensions into the batch so the single-channel
        # kernel is applied to each channel separately.
        flat = input.reshape(-1, 1, input.shape[-1])
        filtered = F.conv1d(flat, window, padding="same")
        return filtered.reshape(input.shape)
#%%
def fourier_nscales(sample, scales=10, smoother=None, to_numpy=False):
    """
    Build a pyramid of progressively Gaussian-smoothed copies of a sample.

    Args:
        sample (torch.Tensor or array-like): data to smooth; the filter runs
            along the last axis.
        scales (int): number of pyramid levels, including level 0. Without a
            ``smoother`` at most 10 levels are available (9 built-in standard
            deviations); larger values raise ``IndexError``.
        smoother (callable, optional): if given, every level ``i`` in
            ``range(scales)`` is simply ``smoother(sample, i)`` and
            ``to_numpy`` is ignored.
        to_numpy (bool): if True, levels >= 1 are returned as numpy arrays
            instead of tensors. Level 0 is always a tensor. Default is False.

    Returns:
        pyramidal_sample (dict): keys are the levels, values the smoothed
        samples.
    """
    if smoother is not None:
        return {level: smoother(sample, level) for level in range(scales)}

    # Convert to a numpy view once, instead of on every loop iteration.
    if torch.is_tensor(sample):
        pyramidal_sample = {0: sample}
        # detach()/cpu() keep the conversion working for tensors that track
        # gradients or live on another device.
        array = sample.detach().cpu().numpy()
    else:
        pyramidal_sample = {0: torch.Tensor(sample)}
        array = sample

    stds = [1, 2, 4, 16, 32, 64, 128, 256, 512]
    for level in range(1, scales):
        # Values outside the signal are treated as zeros (constant padding).
        blurred = gaussian_filter1d(array, stds[level - 1], mode="constant", cval=0.0)
        pyramidal_sample[level] = blurred if to_numpy else torch.Tensor(blurred)
    return pyramidal_sample
def _nested_map(struct, map_fn):
    '''
    Apply map_fn to every leaf of a nested structure of tuples, lists and
    dicts, rebuilding the same container layout around the results.

    Anything that is not a tuple, list or dict counts as a leaf and is passed
    straight to map_fn. Dict keys are kept as they are; only values are mapped.

    Typical use, with
        map_fn = lambda x: x.to(device) if isinstance(x, torch.Tensor) else x
    a batch such as
        x = {"audio": Tensor(64,22000),
             "spectrogram": Tensor(64,1024,128)}
    becomes
        x = {"audio": Tensor(64,22000).to(device),
             "spectrogram": Tensor(64,1024,128).to(device)}
    '''
    def recurse(item):
        return _nested_map(item, map_fn)

    if isinstance(struct, dict):
        return {key: recurse(value) for key, value in struct.items()}
    if isinstance(struct, list):
        return list(map(recurse, struct))
    if isinstance(struct, tuple):
        return tuple(map(recurse, struct))
    return map_fn(struct)