Source code for aftercovid.models._base_sir_estimation

# coding: utf-8
"""
Common methods for training and predicting with :epkg:`SIR` models.
"""
import pprint
import numpy
from sympy import Symbol, diff as sympy_diff
from ..optim import SGDOptimizer


class BaseSIREstimation:
    """
    Common methods for training and predicting with :epkg:`SIR` models.
    """

    def _check_fit_predict(self, X, y=None):
        if not isinstance(X, numpy.ndarray):
            raise TypeError("X must be a numpy array.")
        if len(X.shape) != 2:
            raise ValueError("X must be a matrix.")
        clq = self.quantity_names
        if len(clq) != X.shape[1]:
            raise ValueError(
                f"Unexpected number of columns, got "
                f"{X.shape[1]}, expected {len(clq)}.")
        sums = numpy.sum(X, axis=1)
        mi, ma = sums.min(), sums.max()
        df = abs(ma - mi)
        if df > abs(ma) * 1e-3:
            raise ValueError(
                "All rows must sum up to the same amount. Current "
                "range: [{}, {}].".format(mi, ma))
        if y is not None:
            if not isinstance(y, numpy.ndarray):
                raise TypeError("y must be a numpy array.")
            if y.shape != X.shape:
                raise ValueError(
                    f"Unexpected shape of y, got {y.shape}, "
                    f"expected {X.shape}.")
        return clq, ma
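    # Illustration (hypothetical data, not part of the original
    # module): the check accepts any matrix whose rows all sum to the
    # same population, e.g. for a three-quantity model with N=1000:
    #
    #   X = numpy.array([[900., 80., 20.],
    #                    [890., 85., 25.]])
    #   X.sum(axis=1)  # -> [1000., 1000.], the check passes
    #
    # A row summing to 990 deviates by more than the 1e-3 relative
    # tolerance and raises a ValueError.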
    def fit(self, X, y, t=0, max_iter=100,
            learning_rate_init=0.1, lr_schedule='constant',
            momentum=0.9, power_t=0.5, early_th=None,
            min_threshold=None, max_threshold=None, verbose=False):
        """
        Fits a model :class:`BaseSIR <aftercovid.models._base_sir.BaseSIR>`.

        :param X: known values for every quantity at time *t*, every
            column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: known derivative for every quantity at time *t*,
            comes in the same order as *X*,
            both *X* and *y* have the same shape
        :param t: implicit feature
        :param max_iter: number of iterations
        :param learning_rate_init: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param lr_schedule: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param momentum: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param power_t: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param early_th: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param verbose: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param min_threshold: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`
        :param max_threshold: see :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`

        The training needs two steps. The first one builds a training
        dataset. The second one estimates the coefficients with a
        stochastic gradient descent (see
        :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`).
        Let's use a SIR model
        (see :class:`CovidSIR <aftercovid.models.CovidSIR>`)
        as an example. Let's denote the parameters as :math:`\\Omega`
        and :math:`Z_1=S`, ..., :math:`Z_4=R`. The model is defined by
        :math:`\\frac{dZ_i}{dt} = f_i(\\Omega, Z)` where
        :math:`Z=(Z_1, ..., Z_4)`. *y* is used to compute the expected
        derivatives :math:`\\frac{dZ_i}{dt}`. The loss function is
        defined as:

        .. math::

            L(\\Omega, Z) = \\sum_{i=1}^4 \\left( f_i(\\Omega, Z) -
            \\frac{dZ_i}{dt}\\right)^2

        Then the gradient is:

        .. math::

            \\frac{\\partial L(\\Omega, Z)}{\\partial \\Omega} =
            2 \\sum_{i=1}^4
            \\frac{\\partial f_i(\\Omega, Z)}{\\partial \\Omega}
            \\left( f_i(\\Omega, Z) - \\frac{dZ_i}{dt} \\right)

        A stochastic gradient descent takes care of the rest.
        """
        self._fit(X, y, t=t, max_iter=max_iter,
                  learning_rate_init=learning_rate_init,
                  lr_schedule=lr_schedule, momentum=momentum,
                  power_t=power_t, verbose=verbose, early_th=early_th,
                  min_threshold=min_threshold,
                  max_threshold=max_threshold)
        return self
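    # Usage sketch (illustrative; assumes CovidSIR implements this
    # class and that iterate2array can simulate a training set, as in
    # the project examples):
    #
    #   from aftercovid.models import CovidSIR
    #   model = CovidSIR()
    #   X, y = model.iterate2array(50, derivatives=True)
    #   model.fit(X, y, max_iter=20)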
    def _losses_sympy(self):
        """
        Builds the loss functions using :epkg:`sympy`,
        one for each quantity.
        """
        clq = self.quantity_names
        res = []
        for name in clq:
            sym = Symbol('d' + name)
            eq = self._eq[name]
            lo = (eq - sym) ** 2
            la = self._lambdify_(f'loss-{name}', lo, derivative=True)
            res.append((lo, la))
        return res

    def _grads_sympy(self):
        """
        Builds the gradient for every parameter, exactly the number
        of quantities multiplied by the number of parameters.
        """
        losses = self._losses_sympy()
        clq = self.quantity_names
        prn = self.param_names
        res = []
        for name, eq in zip(clq, losses):
            row = []
            for pn in prn:
                pns = Symbol(pn)
                df = sympy_diff(eq[0], pns)
                gr = self._lambdify_(f'grad-{name}/{pn}', df,
                                     derivative=True)
                row.append((df, gr))
            res.append(row)
        return res
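    # Illustration (hypothetical SIR equation, not part of the
    # original module): if the model defines dS/dt = -beta * S * I / N,
    # _losses_sympy builds the sympy expression
    #
    #   (-beta * S * I / N - dS) ** 2
    #
    # and _grads_sympy differentiates it with respect to each
    # parameter; for beta the derivative is
    #
    #   -2 * (S * I / N) * (-beta * S * I / N - dS)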
    def predict(self, X, t=0):
        """
        Predicts the derivative at time *t*.

        :param X: known values for every quantity at time *t*, every
            column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param t: implicit feature
        :return: predicted derivative
        """
        N = self._check_fit_predict(X)[1]
        err = abs(N - self['N']) / N
        if err > 1e-4:
            raise ValueError(  # pragma: no cover
                f"All rows must sum up to {self['N']} not {N}.")
        n = X.shape[0]
        C = X.shape[1]
        pred = numpy.empty(X.shape, dtype=X.dtype)
        x = self.vect(t=t)
        for i in range(n):
            # the last coefficient holds the time
            x[-1] = i + t
            x[:C] = X[i, :]
            for c, f in enumerate(self._leqa):
                pred[i, c] = f(*x)
        return pred
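    # Usage sketch (illustrative): predict returns the model
    # derivatives for each row of X, which can feed an explicit Euler
    # step to simulate the next observation:
    #
    #   dX = model.predict(X)  # same shape as X
    #   X_next = X + dX        # one-step Euler update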
    def score(self, X, y, t=0):
        """
        Scores the predictions. Returns the sum of squared errors
        between predicted and expected derivatives, divided by the
        number of rows.

        :param X: known values for every quantity at time *t*, every
            column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: expected values
        :param t: implicit feature
        :return: score
        """
        self._check_fit_predict(X, y)
        pred = self.predict(X, t=t)
        delta = (pred - y) ** 2
        return numpy.sum(delta) / X.shape[0]
    def score_l1(self, X, y, t=0):
        """
        Scores the predictions. Returns the L1 norm of the error
        divided by the number of rows and by the population.

        :param X: known values for every quantity at time *t*, every
            column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: expected values
        :param t: implicit feature
        :return: score
        """
        self._check_fit_predict(X, y)
        pred = self.predict(X, t=t)
        delta = numpy.abs(pred - y)
        return numpy.sum(delta) / (X.shape[0] * self['N'])
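    # Note (illustrative): score averages a squared error over rows
    # only, while score_l1 also divides by the population N, so its
    # value stays comparable across populations of different sizes:
    #
    #   score_l1 = numpy.abs(pred - y).sum() / (X.shape[0] * N)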
    def _fit(self, X, y, t, max_iter, learning_rate_init, lr_schedule,
             momentum, power_t, early_th, min_threshold, max_threshold,
             verbose):
        """
        See method :meth:`fit
        <aftercovid.models._base_sir_estimation.BaseSIREstimation.fit>`
        and :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`.
        """
        N = self._check_fit_predict(X, y)[1]
        err = abs(N - self['N']) / N
        if err > 1e-4:
            raise ValueError(  # pragma: no cover
                f"All rows must sum up to {self['N']} not {N}.")

        # loss and gradient functions
        losses = self._losses_sympy()
        grads = self._grads_sympy()
        xy0 = self.vect(t=0, derivative=True)

        def pformat(d):  # pragma: no cover
            # debugging helper
            nd = {str(k): (str(v), type(v), type(k))
                  for k, v in d.items()}
            return pprint.pformat(nd)

        def fct_loss(coef, X, y):
            'Computes the loss function for every X and y.'
            xy0[self._val_ind[1]:self._val_ind[2]] = coef
            res = 0.
            for i in range(X.shape[0]):
                xy0[-1] = i
                xy0[:X.shape[1]] = X[i, :]
                xy0[-self._val_ind[1]:] = y[i, :]
                for loss in losses:
                    res += loss[1](*xy0)
            return res

        def fct_grad(coef, x, y, i):
            'Computes the gradient function for every x and y.'
            xy0[:self._val_ind[1]] = x
            xy0[self._val_ind[1]:self._val_ind[2]] = coef
            xy0[-self._val_ind[1]:] = y
            res = numpy.zeros((coef.shape[0], ), dtype=x.dtype)
            for row in grads:
                # k indexes the parameters; it must not shadow the row
                # index i passed in by the optimizer
                for k, g in enumerate(row):
                    res[k] += g[1](*xy0)
            return res

        pnames = self.param_names
        coef = numpy.array([self[p] for p in pnames])
        lrn = 1. / (N ** 1.5)
        sgd = SGDOptimizer(
            coef, learning_rate_init=learning_rate_init * lrn,
            lr_schedule=lr_schedule, momentum=momentum,
            power_t=power_t, min_threshold=min_threshold,
            max_threshold=max_threshold)
        sgd.train(X, y, fct_loss, fct_grad, max_iter=max_iter,
                  early_th=early_th, verbose=verbose)

        # uses the trained coefficients
        coef = sgd.coef
        for n, c in zip(pnames, coef):
            self[n] = c
        self.iter_ = sgd.iter_
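
# Sanity-check sketch (illustrative, not part of the original module):
# the analytic gradients produced by _grads_sympy can be validated
# against a finite difference of the loss; here with plain sympy for a
# single quantity and a single parameter:
#
#   from sympy import symbols, diff, lambdify
#   S, I, dS, beta = symbols('S I dS beta')
#   loss = (-beta * S * I - dS) ** 2
#   f = lambdify((S, I, dS, beta), loss)
#   g = lambdify((S, I, dS, beta), diff(loss, beta))
#   eps = 1e-6
#   fd = (f(0.9, 0.1, -0.05, 0.5 + eps)
#         - f(0.9, 0.1, -0.05, 0.5 - eps)) / (2 * eps)
#   assert abs(fd - g(0.9, 0.1, -0.05, 0.5)) < 1e-8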