forked from dfdazac/wassdistance
-
Notifications
You must be signed in to change notification settings - Fork 0
/
layers.py
93 lines (79 loc) · 3.4 KB
/
layers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import torch
import torch.nn as nn
# Adapted from https://github.com/gpeyre/SinkhornAutoDiff
class SinkhornDistance(nn.Module):
    r"""
    Given two empirical measures each with :math:`P_1` locations
    :math:`x\in\mathbb{R}^{D}` and :math:`P_2` locations :math:`y\in\mathbb{R}^{D}`,
    outputs an approximation of the regularized OT cost for point clouds.

    Both measures are given uniform weights (``1 / P_1`` and ``1 / P_2``) and
    the Sinkhorn updates are carried out in the log domain.

    Args:
        eps (float): regularization coefficient
        max_iter (int): maximum number of Sinkhorn iterations
        reduction (string, optional): Specifies the reduction to apply to the output:
            'none' | 'mean' | 'sum'. 'none': no reduction will be applied,
            'mean': the sum of the output will be divided by the number of
            elements in the output, 'sum': the output will be summed.
            Any other value is treated like 'none'. Default: 'none'
        thresh (float, optional): stopping criterion; iterations stop as soon
            as the batch-averaged L1 change of the dual variable ``u`` falls
            below this value. Default: 1e-1

    Shape:
        - Input: :math:`(N, P_1, D)`, :math:`(N, P_2, D)`
          (or :math:`(P_1, D)`, :math:`(P_2, D)` without a batch dimension)
        - Output: :math:`(N)` or :math:`()`, depending on `reduction`
    """

    def __init__(self, eps, max_iter, reduction='none', thresh=1e-1):
        super().__init__()
        self.eps = eps
        self.max_iter = max_iter
        self.reduction = reduction
        self.thresh = thresh

    def forward(self, x, y):
        """Run Sinkhorn iterations between the point clouds ``x`` and ``y``.

        Returns:
            tuple: ``(cost, pi, C)`` where ``cost`` is the (optionally
            reduced) transport cost ``sum(pi * C)`` over the last two
            dimensions, ``pi`` is the transport plan and ``C`` is the
            pairwise cost matrix, both of shape ``(..., P_1, P_2)``.
        """
        # Pairwise cost, shape (..., P_1, P_2).
        C = self._cost_matrix(x, y)
        x_points = x.shape[-2]
        y_points = y.shape[-2]

        # Leading (batch) dimensions are taken from C so the marginals always
        # line up with the cost matrix, even when a cloud has a single point.
        batch_shape = tuple(C.shape[:-2])
        # Keep the marginals on the inputs' device; follow their dtype when it
        # is a floating type, otherwise fall back to float32.
        dtype = C.dtype if C.is_floating_point() else torch.float

        # Both marginals are fixed with equal weights.
        mu = torch.full((*batch_shape, x_points), 1.0 / x_points,
                        dtype=dtype, device=C.device)
        nu = torch.full((*batch_shape, y_points), 1.0 / y_points,
                        dtype=dtype, device=C.device)

        # Dual potentials, initialised at zero.
        u = torch.zeros_like(mu)
        v = torch.zeros_like(nu)

        log_mu = torch.log(mu + 1e-8)
        log_nu = torch.log(nu + 1e-8)

        # Sinkhorn iterations (log-domain updates).
        for _ in range(self.max_iter):
            u_prev = u  # kept to measure how much u moved this iteration
            u = self.eps * (log_mu - torch.logsumexp(self.M(C, u, v), dim=-1)) + u
            v = self.eps * (
                log_nu - torch.logsumexp(self.M(C, u, v).transpose(-2, -1), dim=-1)
            ) + v
            err = (u - u_prev).abs().sum(-1).mean()
            if err.item() < self.thresh:
                break

        # Transport plan pi = diag(a) * K * diag(b)
        pi = torch.exp(self.M(C, u, v))
        # Sinkhorn distance
        cost = torch.sum(pi * C, dim=(-2, -1))

        if self.reduction == 'mean':
            cost = cost.mean()
        elif self.reduction == 'sum':
            cost = cost.sum()

        return cost, pi, C

    def M(self, C, u, v):
        r"""Modified cost for logarithmic updates.

        :math:`M_{ij} = (-c_{ij} + u_i + v_j) / \epsilon`
        """
        return (-C + u.unsqueeze(-1) + v.unsqueeze(-2)) / self.eps

    @staticmethod
    def _cost_matrix(x, y, p=2):
        "Returns the matrix of $|x_i-y_j|^p$."
        x_col = x.unsqueeze(-2)
        y_lin = y.unsqueeze(-3)
        C = torch.sum((torch.abs(x_col - y_lin)) ** p, -1)
        return C

    @staticmethod
    def ave(u, u1, tau):
        "Barycenter subroutine, used by kinetic acceleration through extrapolation."
        return tau * u + (1 - tau) * u1