# Copyright (c) 2017 the GPy Authors (see AUTHORS.txt)
# Licensed under the BSD 3-clause license (see LICENSE.txt)
from ..core import Model
from ..core.parameterization import Param
from ..core import Mapping
from ..kern import Kern, RBF
from ..inference.latent_function_inference import ExactStudentTInference
from ..util.normalizer import Standardize
import numpy as np
from scipy import stats
from paramz import ObsAr
from paramz.transformations import Logexp
import warnings
class TPRegression(Model):
    r"""
    Student-t Process model for regression, as presented in

       Shah, A., Wilson, A. and Ghahramani, Z., 2014, April. Student-t processes as alternatives to Gaussian processes.
       In Artificial Intelligence and Statistics (pp. 877-885).

    :param X: input observations (2-D array, one row per observation)
    :param Y: observed values (2-D array, one column per output)
    :param kernel: a GPy kernel, defaults to rbf
    :param deg_free: initial value for the degrees of freedom hyperparameter
    :param Norm normalizer: [None]

        How to normalize Y:

        * ``True``: a :class:`Standardize` normalizer is created and used
        * ``False`` or ``None``: no normalization is done
        * anything else: the given object is used as the normalizer

    :param mean_function: optional :class:`Mapping` whose input/output dimensions match X and Y
    :param name: name of the model

    .. Note:: Multiple independent outputs are allowed using columns of Y

    """

    def __init__(self, X, Y, kernel=None, deg_free=5., normalizer=None, mean_function=None, name='TP regression'):
        super(TPRegression, self).__init__(name=name)

        # X (set_X also records self.num_data and self.input_dim)
        assert X.ndim == 2
        self.set_X(X)

        # Y
        assert Y.ndim == 2
        if normalizer is True:
            self.normalizer = Standardize()
        elif normalizer is False:
            self.normalizer = None
        else:
            # Either None (no normalization) or a user supplied normalizer object
            self.normalizer = normalizer

        self.set_Y(Y)

        if Y.shape[0] != self.num_data:
            # There can be cases where we want more inputs than outputs, for example if we have
            # multiple latent function values
            warnings.warn("There are more rows in your input data X, "
                          "than in your output data Y, be VERY sure this is what you want")
        self.output_dim = self.Y.shape[1]

        # Kernel (test against None explicitly instead of relying on the truthiness of a kernel object)
        if kernel is None:
            kernel = RBF(self.X.shape[1])
        assert isinstance(kernel, Kern)
        self.kern = kernel
        self.link_parameter(self.kern)

        if self.kern._effective_input_dim != self.X.shape[1]:
            warnings.warn(
                "Your kernel has a different input dimension {} then the given X dimension {}. Be very sure this is "
                "what you want and you have not forgotten to set the right input dimenion in your kernel".format(
                    self.kern._effective_input_dim, self.X.shape[1]))

        # Mean function
        self.mean_function = mean_function
        if mean_function is not None:
            assert isinstance(self.mean_function, Mapping)
            assert mean_function.input_dim == self.input_dim
            assert mean_function.output_dim == self.output_dim
            self.link_parameter(mean_function)

        # Degrees of freedom. The parameter is Logexp-transformed; 2 (plus machine epsilon) is
        # added to it before it is handed to the inference method (see _run_inference).
        self.nu = Param('deg_free', float(deg_free), Logexp())
        self.link_parameter(self.nu)

        # Inference
        self.inference_method = ExactStudentTInference()
        self.posterior = None
        self._log_marginal_likelihood = None

        # Insert property for plotting (not used)
        self.Y_metadata = None

    def _update_posterior_dof(self, dof, which):
        """
        Store ``dof`` as the degrees of freedom of the current posterior, if there is one.

        ``which`` is not used here (presumably part of an observer callback signature - TODO confirm).
        """
        if self.posterior is not None:
            self.posterior.nu = dof

    @property
    def _predictive_variable(self):
        """The variable handed to the posterior as ``pred_var`` when predicting: the training inputs."""
        return self.X

    def _run_inference(self):
        """
        Run the inference method on the current data and parameters.

        :returns: the tuple returned by the inference method, unpacked by the callers as
            (posterior, log marginal likelihood, gradient dictionary)
        """
        # The degrees of freedom passed on are the model parameter shifted by 2 + eps
        dof = self.nu + 2 + np.finfo(float).eps
        return self.inference_method.inference(self.kern, self.X, self.Y_normalized, dof, self.mean_function)

    def set_XY(self, X, Y):
        """
        Set the input / output data of the model
        This is useful if we wish to change our existing data but maintain the same model

        :param X: input observations
        :type X: np.ndarray
        :param Y: output observations
        :type Y: np.ndarray or ObsAr
        """
        # Switch updates off so that inference only runs once, after both X and Y are in place
        self.update_model(False)
        self.set_Y(Y)
        self.set_X(X)
        self.update_model(True)

    def set_X(self, X):
        """
        Set the input data of the model

        Also refreshes ``self.num_data`` and ``self.input_dim`` from the shape of X, because the
        posterior degrees of freedom used for quantiles and samples depend on ``self.num_data``.

        :param X: input observations (2-D, one row per observation)
        :type X: np.ndarray
        """
        assert isinstance(X, np.ndarray)
        # Remember the current update state and restore it afterwards
        state = self.update_model()
        self.update_model(False)
        self.X = ObsAr(X)
        self.num_data, self.input_dim = self.X.shape
        self.update_model(state)

    def set_Y(self, Y):
        """
        Set the output data of the model

        If a normalizer is set it is fitted to Y and ``self.Y_normalized`` holds the normalized
        values; otherwise ``self.Y_normalized`` is the same object as ``self.Y``.

        :param Y: output observations
        :type Y: np.ndarray or ObsAr
        """
        assert isinstance(Y, (np.ndarray, ObsAr))
        # Remember the current update state and restore it afterwards
        state = self.update_model()
        self.update_model(False)
        if self.normalizer is not None:
            self.normalizer.scale_by(Y)
            self.Y_normalized = ObsAr(self.normalizer.normalize(Y))
            self.Y = Y
        else:
            self.Y = ObsAr(Y) if isinstance(Y, np.ndarray) else Y
            self.Y_normalized = self.Y
        self.update_model(state)

    def parameters_changed(self):
        """
        Method that is called upon any changes to :class:`~GPy.core.parameterization.param.Param` variables within the model.
        In particular in this class this method re-performs inference, recalculating the posterior, log marginal likelihood and gradients of the model

        .. warning::
            This method is not designed to be called manually, the framework is set up to automatically call this method upon changes to parameters, if you call
            this method yourself, there may be unexpected consequences.
        """
        self.posterior, self._log_marginal_likelihood, grad_dict = self._run_inference()
        self.kern.update_gradients_full(grad_dict['dL_dK'], self.X)
        if self.mean_function is not None:
            self.mean_function.update_gradients(grad_dict['dL_dm'], self.X)
        self.nu.gradient = grad_dict['dL_dnu']

    def log_likelihood(self):
        r"""
        The log marginal likelihood of the model, :math:`p(\mathbf{y})`, this is the objective function of the model being optimised

        If no value has been computed yet, inference is run once to obtain it (the stored
        posterior and the gradients are left untouched in that case).
        """
        # Compare against None explicitly: a log marginal likelihood of exactly 0.0 is a valid value
        if self._log_marginal_likelihood is None:
            return self._run_inference()[1]
        return self._log_marginal_likelihood

    def _raw_predict(self, Xnew, full_cov=False, kern=None):
        r"""
        For making predictions, does not account for normalization or likelihood

        full_cov is a boolean which defines whether the full covariance matrix
        of the prediction is computed. If full_cov is False (default), only the
        diagonal of the covariance is returned.

        .. math::
            p(f*|X*, X, Y) = \int^{\inf}_{-\inf} p(f*|f,X*)p(f|X,Y) df
                        = MVN\left(\nu + N,f*| K_{x*x}(K_{xx})^{-1}Y,
                        \frac{\nu + \beta - 2}{\nu + N - 2}K_{x*x*} - K_{xx*}(K_{xx})^{-1}K_{xx*}\right)
            \nu := \texttt{Degrees of freedom}

        :param Xnew: the points at which to predict
        :param full_cov: whether to compute the full covariance or only its diagonal
        :param kern: the kernel to use for prediction (defaults to the model kern)
        :returns: (mean, var) as returned by the posterior, with the mean function (if any) added to the mean
        """
        mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew,
                                              pred_var=self._predictive_variable, full_cov=full_cov)
        if self.mean_function is not None:
            mu += self.mean_function.f(Xnew)
        return mu, var

    def predict(self, Xnew, full_cov=False, kern=None, **kwargs):
        """
        Predict the function(s) at the new point(s) Xnew. For Student-t processes, this method is equivalent to
        predict_noiseless as no likelihood is included in the model.

        Additional keyword arguments are accepted but ignored.
        """
        return self.predict_noiseless(Xnew, full_cov=full_cov, kern=kern)

    def predict_noiseless(self, Xnew, full_cov=False, kern=None):
        """
        Predict the underlying function f at the new point(s) Xnew.

        :param Xnew: The points at which to make a prediction
        :type Xnew: np.ndarray (Nnew x self.input_dim)
        :param full_cov: whether to return the full covariance matrix, or just the diagonal
        :type full_cov: bool
        :param kern: The kernel to use for prediction (defaults to the model kern).

        :returns: (mean, var):
            mean: posterior mean, a Numpy array, Nnew x self.output_dim
            var: posterior variance, a Numpy array, Nnew x 1 if full_cov=False, Nnew x Nnew otherwise

            If full_cov and self.output_dim > 1, the return shape of var may be Nnew x Nnew x self.output_dim.
            This is to allow for different normalizations of the output dimensions.
        """
        # Predict the latent function values
        mu, var = self._raw_predict(Xnew, full_cov=full_cov, kern=kern)

        # Un-apply normalization
        if self.normalizer is not None:
            mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)

        return mu, var

    def predict_quantiles(self, X, quantiles=(2.5, 97.5), kern=None, **kwargs):
        """
        Get the predictive quantiles around the prediction at X

        :param X: The points at which to make a prediction
        :type X: np.ndarray (Xnew x self.input_dim)
        :param quantiles: tuple of quantiles (in percent), default is (2.5, 97.5) which is the 95% interval
        :type quantiles: tuple
        :param kern: optional kernel to use for prediction
        :returns: list of quantiles for each X and predictive quantiles for interval combination
        :rtype: [np.ndarray (Xnew x self.output_dim), np.ndarray (Xnew x self.output_dim)]

        Additional keyword arguments are accepted but ignored.
        """
        mu, var = self._raw_predict(X, full_cov=False, kern=kern)
        # Degrees of freedom of the predictive Student-t: model dof plus the number of observations
        dof = self.nu + 2 + self.num_data
        quantiles = [stats.t.ppf(q / 100., dof) * np.sqrt(var) + mu for q in quantiles]

        # A quantile is an affine function of the mean, so it is mapped back like a mean
        if self.normalizer is not None:
            quantiles = [self.normalizer.inverse_mean(q) for q in quantiles]

        return quantiles

    def posterior_samples(self, X, size=10, full_cov=False, Y_metadata=None, likelihood=None, **predict_kwargs):
        """
        Samples the posterior TP at the points X, equivalent to posterior_samples_f due to the absence of a likelihood.

        ``Y_metadata`` and ``likelihood`` are accepted but ignored.
        """
        return self.posterior_samples_f(X, size, full_cov=full_cov, **predict_kwargs)

    def posterior_samples_f(self, X, size=10, full_cov=True, **predict_kwargs):
        """
        Samples the posterior TP at the points X.

        :param X: The points at which to take the samples.
        :type X: np.ndarray (Nnew x self.input_dim)
        :param size: the number of a posteriori samples.
        :type size: int.
        :param full_cov: whether to sample from the full covariance matrix, or just its diagonal.
        :type full_cov: bool.
        :returns: fsim: set of simulations
        :rtype: np.ndarray (D x Nnew x samples), or (Nnew x samples) if D == 1
        """
        mu, var = self._raw_predict(X, full_cov=full_cov, **predict_kwargs)
        if self.normalizer is not None:
            mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)

        num_points = X.shape[0]
        dof = self.nu + 2 + self.num_data

        def sim_one_dim(m, v):
            # Student-t draws as a Gaussian draw divided by the square root of a Gamma draw;
            # the same Gamma draw is shared by all points of one sample.
            cov = v if full_cov else np.diag(np.asarray(v).flatten())
            Z = np.random.multivariate_normal(np.zeros(num_points), cov, size).T
            g = np.tile(np.random.gamma(dof / 2., 2. / dof, size), (num_points, 1))
            # The mean is made a column so that it broadcasts over the samples axis
            return np.reshape(m, (-1, 1)) + Z / np.sqrt(g)

        if self.output_dim == 1:
            return sim_one_dim(mu, var)

        # One (Nnew x samples) slab per output; sized by the prediction points, not the training data
        fsim = np.empty((self.output_dim, num_points, size))
        for d in range(self.output_dim):
            if full_cov:
                # Per-output covariance if available, otherwise one covariance shared by all outputs
                v_d = var[:, :, d] if var.ndim == 3 else var
            else:
                # Per-output variance column if available, otherwise one variance shared by all outputs
                v_d = var[:, d] if (var.ndim == 2 and var.shape[1] > 1) else var
            fsim[d] = sim_one_dim(mu[:, d], v_d)
        return fsim