# Copyright (c) 2012-2014, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from .sparse_gp import SparseGP
from numpy.linalg.linalg import LinAlgError
from ..inference.latent_function_inference.var_dtc_parallel import update_gradients, VarDTC_minibatch
import logging
logger = logging.getLogger("sparse gp mpi")
[docs]class SparseGP_MPI(SparseGP):
"""
A general purpose Sparse GP model with MPI parallelization support
This model allows (approximate) inference using variational DTC or FITC
(Gaussian likelihoods) as well as non-conjugate sparse methods based on
these.
:param X: inputs
:type X: np.ndarray (num_data x input_dim)
:param likelihood: a likelihood instance, containing the observed data
:type likelihood: GPy.likelihood.(Gaussian | EP | Laplace)
:param kernel: the kernel (covariance function). See link kernels
:type kernel: a GPy.kern.kern instance
:param X_variance: The uncertainty in the measurements of X (Gaussian variance)
:type X_variance: np.ndarray (num_data x input_dim) | None
:param Z: inducing inputs
:type Z: np.ndarray (num_inducing x input_dim)
:param num_inducing: Number of inducing points (optional, default 10. Ignored if Z is not None)
:type num_inducing: int
:param mpi_comm: The communication group of MPI, e.g. mpi4py.MPI.COMM_WORLD
:type mpi_comm: mpi4py.MPI.Intracomm
"""
def __init__(self, X, Y, Z, kernel, likelihood, variational_prior=None,
mean_function=None, inference_method=None, name='sparse gp',
Y_metadata=None, mpi_comm=None, normalizer=False):
self._IN_OPTIMIZATION_ = False
if mpi_comm != None:
if inference_method is None:
inference_method = VarDTC_minibatch(mpi_comm=mpi_comm)
else:
assert isinstance(inference_method, VarDTC_minibatch), 'inference_method has to support MPI!'
super(SparseGP_MPI, self).__init__(X, Y, Z, kernel, likelihood, inference_method=inference_method, mean_function=mean_function, name=name, Y_metadata=Y_metadata, normalizer=normalizer)
self.update_model(False)
if variational_prior is not None:
self.link_parameter(variational_prior)
self.mpi_comm = mpi_comm
# Manage the data (Y) division
if mpi_comm != None:
from ..util.parallel import divide_data
N_start, N_end, N_list = divide_data(Y.shape[0], mpi_comm.rank, mpi_comm.size)
self.N_range = (N_start, N_end)
self.N_list = np.array(N_list)
self.Y_local = self.Y[N_start:N_end]
print('MPI RANK '+str(self.mpi_comm.rank)+' with the data range '+str(self.N_range))
mpi_comm.Bcast(self.param_array, root=0)
self.update_model(True)
def __getstate__(self):
dc = super(SparseGP_MPI, self).__getstate__()
dc['mpi_comm'] = None
if self.mpi_comm != None:
del dc['N_range']
del dc['N_list']
del dc['Y_local']
if 'normalizer' not in dc:
dc['normalizer'] = None
dc['Y_normalized'] = dc['Y']
return dc
#=====================================================
# The MPI parallelization
# - can move to model at some point
#=====================================================
@SparseGP.optimizer_array.setter
def optimizer_array(self, p):
if self.mpi_comm != None:
if self._IN_OPTIMIZATION_ and self.mpi_comm.rank==0:
self.mpi_comm.Bcast(np.int32(1),root=0)
self.mpi_comm.Bcast(p, root=0)
SparseGP.optimizer_array.fset(self,p)
[docs] def optimize(self, optimizer=None, start=None, **kwargs):
self._IN_OPTIMIZATION_ = True
if self.mpi_comm==None:
ret = super(SparseGP_MPI, self).optimize(optimizer,start,**kwargs)
elif self.mpi_comm.rank==0:
ret = super(SparseGP_MPI, self).optimize(optimizer,start,**kwargs)
self.mpi_comm.Bcast(np.int32(-1),root=0)
elif self.mpi_comm.rank>0:
x = self.optimizer_array.copy()
flag = np.empty(1,dtype=np.int32)
while True:
self.mpi_comm.Bcast(flag,root=0)
if flag==1:
try:
self.optimizer_array = x
self._fail_count = 0
except (LinAlgError, ZeroDivisionError, ValueError):
if self._fail_count >= self._allowed_failures:
raise
self._fail_count += 1
elif flag==-1:
ret = None
break
else:
self._IN_OPTIMIZATION_ = False
raise Exception("Unrecognizable flag for synchronization!")
self._IN_OPTIMIZATION_ = False
return ret
[docs] def parameters_changed(self):
if isinstance(self.inference_method,VarDTC_minibatch):
update_gradients(self, mpi_comm=self.mpi_comm)
else:
super(SparseGP_MPI,self).parameters_changed()