forked from JamesRitchie/scikit-rvm
-
Notifications
You must be signed in to change notification settings - Fork 10
/
rvm.py
316 lines (237 loc) · 10.7 KB
/
rvm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""Relevance Vector Machine classes for regression and classification.
Tom Bolton
27/02/2019
Adapted from JamesRitchie/scikit-rvm repository. The main differences between
original code and this implementation are the following:
- Only regression is considered (classification is ignored).
- Kernel transformations are removed, such that each column/feature in X is
assumed to be a basis function.
- A list of basis function labels is supplied to the model, and after fitting
the sparse representation of the basis functions is printed.
- When verbose is switched on, the model communicates which basis functions
are being pruned from the data.
Without the application of a kernel, this model solves the following
(linear) Bayesian regression problem
y = w.phi
where an individual prior is assigned to each regression weight in w. The
hyperparameters of said hyperpriors are found through type-II maximum
likelihood (i.e. the evidence approximation). The type-II maximum likelihood
calculation is conducted iteratively.
"""
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_X_y
from sklearn.metrics import r2_score
class BaseRVM(BaseEstimator):
"""Base Relevance Vector Machine class.
Implementation of Mike Tipping's Relevance Vector Machine using the
scikit-learn API. Add a posterior over weights method and a predict
in subclass to use for classification or regression.
"""
def __init__(
self,
n_iter=3000,
tol=1e-3,
alpha=1e-6,
threshold_alpha=1e9,
beta=1.e-6,
beta_fixed=False,
bias_used=True,
verbose=True,
verb_freq=10,
standardise=False
):
"""Copy params to object properties, no validation."""
self.n_iter = n_iter
self.tol = tol
self.alpha = alpha
self.threshold_alpha = threshold_alpha
self.beta = beta
self.beta_fixed = beta_fixed
self.bias_used = bias_used
self.verbose = verbose
self.verb_freq = verb_freq
self.standardise = standardise
def get_params(self, deep=True):
"""Return parameters as a dictionary."""
params = {
'n_iter': self.n_iter,
'tol': self.tol,
'alpha': self.alpha,
'threshold_alpha': self.threshold_alpha,
'beta': self.beta,
'beta_fixed': self.beta_fixed,
'bias_used': self.bias_used,
'verbose': self.verbose,
'verbose_frequency': self.verb_freq,
'standardise': self.standardise
}
return params
def set_params(self, **parameters):
"""Set parameters using kwargs."""
for parameter, value in parameters.items():
setattr(self, parameter, value)
return self
def _prune(self):
"""Remove basis functions based on alpha values.
As alpha becomes very large, the corresponding
weight of the basis function tends to zero.
Only keep basis functions which have an alpha
value less than the threshold.
"""
keep_alpha = self.alpha_ < self.threshold_alpha
# check if we are chucking away all basis functions
if not np.any(keep_alpha):
keep_alpha[0] = True
self.labels = self.labels[ keep_alpha ]
# prune all parameters to new set of basis functions
self.alpha_ = self.alpha_[ keep_alpha ]
self.alpha_old = self.alpha_old[ keep_alpha ]
self.gamma = self.gamma[ keep_alpha ]
self.phi = self.phi[ :, keep_alpha ]
self.sigma_ = self.sigma_[ np.ix_( keep_alpha, keep_alpha ) ]
self.m_ = self.m_[ keep_alpha ]
if self.standardise :
self.mu_x = self.mu_x[ keep_alpha ]
self.si_x = self.si_x[ keep_alpha ]
def fit(self, X, y, X_labels, standardise=False ):
"""Fit the RVR to the training data.
X: 2D array where each row is an observation and
each column represents a basis function.
y: 1D array of targets
X_labels: A list of strings providing a description
of each basis function e.g. d2u/dx2.
"""
self.standardise = standardise
X, y = check_X_y(X, y)
n_samples, n_basis_functions = X.shape
self.phi = np.copy(X) # phi = feature values
self.t = np.copy(y) # t = target values
self.labels = np.array( X_labels )
self.alpha_ = self.alpha * np.ones(n_basis_functions)
self.beta_ = self.beta
self.m_ = np.zeros(n_basis_functions)
self.alpha_old = self.alpha_
# rescale features and targets to zero mean
# and unit variance
if standardise :
self.mu_x = np.mean( self.phi, axis=0 )
self.si_x = np.std( self.phi, axis=0 )
self.phi -= np.tile( self.mu_x, X.shape[0] ).reshape( X.shape )
self.phi /= np.tile( self.si_x, X.shape[0] ).reshape( X.shape )
self.mu_y = np.mean( self.t )
self.si_y = np.std( self.t )
self.t -= np.tile( self.mu_y, len(y) )
self.t /= np.tile( self.si_y, len(y) )
if self.verbose :
print "Fit: beginning with {} candidate terms".format( n_basis_functions )
print "Fit: {}\n".format( X_labels )
# begin iterative type-2 maximum likelihood
# estimation for hyperpriors
for i in range(self.n_iter):
self._posterior()
self.gamma = 1 - self.alpha_ * np.diag( self.sigma_ )
self.alpha_ = self.gamma / ( self.m_ ** 2 )
if not self.beta_fixed:
self.beta_ = ( n_samples - np.sum( self.gamma ) ) / (
np.sum( ( self.t - np.dot( self.phi, self.m_ ) ) ** 2 ) )
self._prune()
# print results of this iteration
if self.verbose and ( (i+1) % self.verb_freq == 0 ):
print "Fit @ iteration {}:".format(i)
print "--Alpha {}".format( self.alpha_ )
print "--Beta {}".format( self.beta_ )
print "--Gamma {}".format( self.gamma )
print "--m {}".format( self.m_ )
print "--# of relevance vectors {} / {}".format( self.phi.shape[1], n_basis_functions )
print "--Remaining candidate terms {}\n".format( list( self.labels ) )
# check if threshold has been reached
delta = np.amax( np.absolute( self.alpha_ - self.alpha_old ) )
if delta < self.tol and i > 1 :
print "Fit: delta= {}".format( delta )
print "Fit: delta < tol @ iteration {}, finished.".format(i)
print "Fit: {} out of {} candidate terms.".format( len(self.m_), n_basis_functions )
if standardise :
print "Fit: rescaled weights (+ un-scaled weights) of each term:"
for n, label in enumerate( self.labels ) :
print "{} : {} ({})".format( label, self.m_[n] * self.si_y / self.si_x[n], self.m_[n] )
else :
print "Fit: regression weights of each term:"
for n, label in enumerate( self.labels ) :
print "{} : {}".format( label, self.m_[n] )
# make vector indicating which basis functions remain
self.retained_ = np.isin( np.array( X_labels ), self.labels )
self.n_retained = np.sum( self.retained_ )
break
self.alpha_old = self.alpha_
return self
class RVR(BaseRVM, RegressorMixin):
"""Relevance Vector Machine Regression.
Implementation of Mike Tipping's Relevance Vector Machine for regression
using the scikit-learn API.
"""
def _posterior(self):
"""Compute the posterior distriubtion over weights."""
i_s = np.diag( self.alpha_ ) + self.beta_ * np.dot( self.phi.T, self.phi )
self.sigma_ = np.linalg.inv(i_s)
self.m_ = self.beta_ * np.dot( self.sigma_, np.dot( self.phi.T, self.t ) )
def post_pred_var(self, X ):
"""Evaluate the posterior predictive variance
of the RVR model for feature data X.
For large X arrays, the matrix multiplication
below can produce memory errors. Therefore
compute the variance individually.
The return var is a 1D vector of length equal
to the number of data points in X.
"""
# remove basis functions/features that were
# pruned during training from X
phi = X[:,self.retained_]
N_points, N_feat = X.shape
var = np.zeros( (N_points,) )
for i in range( N_points ) :
var[i] = ( 1/self.beta_ ) + np.dot( phi[i,:], np.dot( self.sigma_, phi[i,:].T ) )
return var
def predict(self, X) :
"""
Make predictions for y using X.
"""
# remove basis functions/features that were
# pruned during training from X
phi = X[:,self.retained_]
if self.standardise :
y_pred = np.dot( phi, self.m_ * self.si_y / self.si_x )
else :
y_pred = np.dot( phi, self.m_ )
return y_pred
def score_MSE(self, X_test, y_test ):
""" Calculate mean-squared error (MSE)
between the true values of y and the
predicted values using X_test
"""
# remove basis functions/features that were
# pruned during training from X
phi = X_test[:,self.retained_]
if self.standardise :
y_pred = np.dot( phi, self.m_ * self.si_y / self.si_x )
else :
y_pred = np.dot( phi, self.m_ )
MSE = np.mean( np.square( y_test - y_pred ) )
return MSE
def score_R2(self, X_test, y_test ):
""" Calculate coefficient of determination
between the true values of y and the
predicted values using X_test
"""
# remove basis functions/features that were
# pruned during training from X
phi = X_test[:,self.retained_]
if self.standardise :
y_pred = np.dot( phi, self.m_ * self.si_y / self.si_x )
else :
y_pred = np.dot( phi, self.m_ )
R2 = r2_score( y_test, y_pred )
return R2