Coverage for aftercovid/models/_base_sir_estimation.py: 100%
111 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-05-09 03:09 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2024-05-09 03:09 +0200
1# coding: utf-8
2"""
3Common methods about training, predicting for :epkg:`SIR` models.
4"""
5import pprint
6import numpy
7from sympy import Symbol, diff as sympy_diff
8from ..optim import SGDOptimizer
11class BaseSIREstimation:
12 """
13 Common methods about training, predicting for :epkg:`SIR` models.
14 """
16 def _check_fit_predict(self, X, y=None):
17 if not isinstance(X, numpy.ndarray):
18 raise TypeError("X must be a numpy array.")
19 if len(X.shape) != 2:
20 raise ValueError("X must be a matrix.")
21 clq = self.quantity_names
22 if len(clq) != X.shape[1]:
23 raise ValueError(f"Unexpected number of columns, got "
24 f"{X.shape[1]}, expected {len(clq)}.")
25 sums = numpy.sum(X, axis=1)
26 mi, ma = sums.min(), sums.max()
27 df = abs(ma - mi)
28 if df > abs(ma) * 1e-3:
29 raise ValueError(
30 "All rows must sum up to the same amount. Current "
31 "range: [{}, {}].".format(mi, max))
32 if y is not None:
33 if not isinstance(y, numpy.ndarray):
34 raise TypeError("y must be a numpy array.")
35 if y.shape != X.shape:
36 raise ValueError(f"Unexpected shape of y, got {y.shape}, "
37 f"expected {X.shape}.")
38 return clq, ma
40 def fit(self, X, y, t=0, max_iter=100,
41 learning_rate_init=0.1, lr_schedule='constant',
42 momentum=0.9, power_t=0.5, early_th=None,
43 min_threshold=None, max_threshold=None,
44 verbose=False):
45 """
46 Fits a model :class:`BaseSIR <aftercovid.models._base_sir.BaseSIR>`.
48 :param X: known values for every quantity at time *t*,
49 every column is mapped to the list returned by
50 :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
51 :param y: known derivative for every quantity at time *t*,
52 comes in the same order as *X*,
53 both *X* and *y* have the same shape.
54 :param t: implicit feature
55 :param max_iter: number of iteration
56 :param learning_rate_init: see :class:`SGDOptimizer
57 <aftercovid.optim.SGDOptimizer>`
58 :param lr_schedule: see :class:`SGDOptimizer
59 <aftercovid.optim.SGDOptimizer>`
60 :param momentum: see :class:`SGDOptimizer
61 <aftercovid.optim.SGDOptimizer>`
62 :param power_t: see :class:`SGDOptimizer
63 <aftercovid.optim.SGDOptimizer>`
64 :param early_th: see :class:`SGDOptimizer
65 <aftercovid.optim.SGDOptimizer>`
66 :param verbose: see :class:`SGDOptimizer
67 <aftercovid.optim.SGDOptimizer>`
68 :param min_threshold: see :class:`SGDOptimizer
69 <aftercovid.optim.SGDOptimizer>`
70 :param max_threshold: see :class:`SGDOptimizer
71 <aftercovid.optim.SGDOptimizer>`
73 The training needs two steps. The first one creates a training
74 datasets. The second one estimates the coefficients by using
75 a stochastic gradient descent (see :class:`SGDOptimizer
76 <aftercovid.optim.SGDOptimizer>`).
77 Let's use a SIR model (see :class:`CovidSIR
78 <aftercovid.models.CovidSIR>`).as an example.
79 Let's denote the parameters as :math:`\\Omega`
80 and :math:`Z_1=S`, ..., :math:`Z_4=R`.
81 The model is defined by
82 :math:`\\frac{dZ_i}{dt} = f_i(\\Omega, Z)`
83 where :math:`Z=(Z_1, ..., Z_4)`.
84 *y* is used to compute the expected derivates
85 :math:`\\frac{dZ_i}{dt}`. The loss function is defined as:
87 .. math::
89 L(\\Omega,Z) = \\sum_{i=1}^4 \\left( f_i(\\Omega,Z) -
90 \\frac{dZ_i}{dt}\\right)^2
92 Then the gradient is:
94 .. math::
96 \\frac{\\partial L(\\Omega,Z)}{\\partial\\Omega} =
97 2 \\sum_{i=1}^4 \\frac{\\partial f_i(\\Omega,Z)}{\\partial\\Omega}
98 \\left( f_i(\\Omega,Z) - \\frac{dZ_i}{dt} \\right)
100 A stochastic gradient descent takes care of the rest.
101 """
102 self._fit(X, y, t=0, max_iter=max_iter,
103 learning_rate_init=learning_rate_init,
104 lr_schedule=lr_schedule, momentum=momentum,
105 power_t=power_t, verbose=verbose, early_th=early_th,
106 min_threshold=min_threshold, max_threshold=max_threshold)
107 return self
109 def _losses_sympy(self):
110 """
111 Builds the loss functions using :epkg:`sympy`,
112 one for each quantity.
113 """
114 clq = self.quantity_names
115 res = []
116 for name in clq:
117 sym = Symbol('d' + name)
118 eq = self._eq[name]
119 lo = (eq - sym) ** 2
120 la = self._lambdify_(f'loss-{name}', lo, derivative=True)
121 res.append((lo, la))
122 return res
124 def _grads_sympy(self):
125 """
126 Builds the gradient for every parameter,
127 exactly number of quantities multiplied by
128 the number of parameters.
129 """
130 losses = self._losses_sympy()
131 clq = self.quantity_names
132 prn = self.param_names
133 res = []
134 for name, eq in zip(clq, losses):
135 row = []
136 for pn in prn:
137 pns = Symbol(pn)
138 df = sympy_diff(eq[0], pns)
139 gr = self._lambdify_(f'grad-{name}/{pn}', df,
140 derivative=True)
141 row.append((df, gr))
142 res.append(row)
143 return res
145 def predict(self, X, t=0):
146 """
147 Predicts the derivative at time *t*.
149 :param X: known values for every quantity at time *t*,
150 every column is mapped to the list returned by
151 :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
152 :param t: implicit feature
153 :return: predictive derivative
154 """
155 N = self._check_fit_predict(X)[1]
156 err = abs(N - self['N']) / N
157 if err > 1e-4:
158 raise ValueError( # pragma: no cover
159 f"All rows must sum up to {self['N']} not {N}.")
160 n = X.shape[0]
161 C = X.shape[1]
162 pred = numpy.empty(X.shape, dtype=X.dtype)
163 x = self.vect(t=t)
164 for i in range(t, t + n):
165 x[-1] = i
166 x[:C] = X[i, :]
167 for c, f in enumerate(self._leqa):
168 pred[i, c] = f(*x)
169 return pred
171 def score(self, X, y, t=0):
172 """
173 Scores the predictions. Returns L2 norm
174 divided by the number of rows.
176 :param X: known values for every quantity at time *t*,
177 every column is mapped to the list returned by
178 :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
179 :param y: expected values
180 :param t: implicit feature
181 :return: predictive derivative
182 """
183 self._check_fit_predict(X, y)
184 pred = self.predict(X, t=t)
185 delta = (pred - y) ** 2
186 return numpy.sum(delta) / X.shape[0]
188 def score_l1(self, X, y, t=0):
189 """
190 Scores the predictions. Returns L1 norm
191 divided by the number of rows and the population.
193 :param X: known values for every quantity at time *t*,
194 every column is mapped to the list returned by
195 :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
196 :param y: expected values
197 :param t: implicit feature
198 :return: predictive derivative
199 """
200 self._check_fit_predict(X, y)
201 pred = self.predict(X, t=t)
202 delta = numpy.abs(pred - y)
203 return numpy.sum(delta) / (X.shape[0] * self['N'])
205 def _fit(self, X, y, t, max_iter,
206 learning_rate_init, lr_schedule,
207 momentum, power_t, early_th, min_threshold,
208 max_threshold, verbose):
209 '''
210 See method :meth:`fit
211 <aftercovid.models._base_sir_estimation.BaseSIREstimation.fit>`
212 and :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`.
213 '''
214 N = self._check_fit_predict(X, y)[1]
215 err = abs(N - self['N']) / N
216 if err > 1e-4:
217 raise ValueError( # pragma: no cover
218 f"All rows must sum up to {self['N']} not {N}.")
220 # loss and gradients functions
221 losses = self._losses_sympy()
222 grads = self._grads_sympy()
223 xy0 = self.vect(t=0, derivative=True)
225 def pformat(d): # pragma: no cover
226 nd = {str(k): (str(v), type(v), type(k)) for k, v in d.items()}
227 return pprint.pformat(nd)
229 def fct_loss(coef, X, y):
230 'Computes the loss function for every X and y.'
231 xy0[self._val_ind[1]:self._val_ind[2]] = coef
233 res = 0.
234 for i in range(X.shape[0]):
235 xy0[-1] = i
236 xy0[:X.shape[1]] = X[i, :]
237 xy0[-self._val_ind[1]:] = y[i, :]
238 for loss in losses:
239 res += loss[1](*xy0)
240 return res
242 def fct_grad(coef, x, y, i):
243 'Computes the gradient function for every X and y.'
244 xy0[:self._val_ind[1]] = x
245 xy0[self._val_ind[1]:self._val_ind[2]] = coef
246 xy0[-self._val_ind[1]:] = y
248 res = numpy.zeros((coef.shape[0], ), dtype=x.dtype)
249 for row in grads:
250 for i, g in enumerate(row):
251 res[i] += g[1](*xy0)
252 return res
254 pnames = self.param_names
255 coef = numpy.array([self[p] for p in pnames])
256 lrn = 1. / (N ** 1.5)
257 sgd = SGDOptimizer(
258 coef, learning_rate_init=learning_rate_init * lrn,
259 lr_schedule=lr_schedule, momentum=momentum,
260 power_t=power_t, min_threshold=min_threshold,
261 max_threshold=max_threshold)
263 sgd.train(X, y, fct_loss, fct_grad, max_iter=max_iter,
264 early_th=early_th, verbose=verbose)
266 # uses trained coefficients
267 coef = sgd.coef
268 for n, c in zip(pnames, coef):
269 self[n] = c
270 self.iter_ = sgd.iter_