Coverage for aftercovid/models/_base_sir_estimation.py: 100%

111 statements  

coverage.py v7.1.0, created at 2024-05-09 03:09 +0200

# coding: utf-8
"""
Common methods for training and predicting with :epkg:`SIR` models.
"""
import pprint
import numpy
from sympy import Symbol, diff as sympy_diff
from ..optim import SGDOptimizer


class BaseSIREstimation:
    """
    Common methods for training and predicting with :epkg:`SIR` models.
    """

    def _check_fit_predict(self, X, y=None):
        if not isinstance(X, numpy.ndarray):
            raise TypeError("X must be a numpy array.")
        if len(X.shape) != 2:
            raise ValueError("X must be a matrix.")
        clq = self.quantity_names
        if len(clq) != X.shape[1]:
            raise ValueError(f"Unexpected number of columns, got "
                             f"{X.shape[1]}, expected {len(clq)}.")
        sums = numpy.sum(X, axis=1)
        mi, ma = sums.min(), sums.max()
        df = abs(ma - mi)
        if df > abs(ma) * 1e-3:
            raise ValueError(
                "All rows must sum up to the same amount. Current "
                "range: [{}, {}].".format(mi, ma))
        if y is not None:
            if not isinstance(y, numpy.ndarray):
                raise TypeError("y must be a numpy array.")
            if y.shape != X.shape:
                raise ValueError(f"Unexpected shape of y, got {y.shape}, "
                                 f"expected {X.shape}.")
        return clq, ma

    def fit(self, X, y, t=0, max_iter=100,
            learning_rate_init=0.1, lr_schedule='constant',
            momentum=0.9, power_t=0.5, early_th=None,
            min_threshold=None, max_threshold=None,
            verbose=False):
        """
        Fits a model :class:`BaseSIR <aftercovid.models._base_sir.BaseSIR>`.

        :param X: known values for every quantity at time *t*,
            every column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: known derivatives for every quantity at time *t*,
            given in the same order as *X*,
            both *X* and *y* have the same shape
        :param t: implicit feature
        :param max_iter: number of iterations
        :param learning_rate_init: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param lr_schedule: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param momentum: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param power_t: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param early_th: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param verbose: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param min_threshold: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`
        :param max_threshold: see :class:`SGDOptimizer
            <aftercovid.optim.SGDOptimizer>`

        The training is done in two steps. The first one builds a training
        dataset. The second one estimates the coefficients with a
        stochastic gradient descent (see :class:`SGDOptimizer
        <aftercovid.optim.SGDOptimizer>`).
        Let's use a SIR model (see :class:`CovidSIR
        <aftercovid.models.CovidSIR>`) as an example.
        Let's denote the parameters as :math:`\\Omega`
        and :math:`Z_1=S`, ..., :math:`Z_4=R`.
        The model is defined by
        :math:`\\frac{dZ_i}{dt} = f_i(\\Omega, Z)`
        where :math:`Z=(Z_1, ..., Z_4)`.
        *y* is used to compute the expected derivatives
        :math:`\\frac{dZ_i}{dt}`. The loss function is defined as:

        .. math::

            L(\\Omega, Z) = \\sum_{i=1}^4 \\left( f_i(\\Omega, Z) -
            \\frac{dZ_i}{dt}\\right)^2

        Then the gradient is:

        .. math::

            \\frac{\\partial L(\\Omega, Z)}{\\partial\\Omega} =
            2 \\sum_{i=1}^4 \\frac{\\partial f_i(\\Omega, Z)}{\\partial\\Omega}
            \\left( f_i(\\Omega, Z) - \\frac{dZ_i}{dt} \\right)

        A stochastic gradient descent takes care of the rest.
        """
        self._fit(X, y, t=t, max_iter=max_iter,
                  learning_rate_init=learning_rate_init,
                  lr_schedule=lr_schedule, momentum=momentum,
                  power_t=power_t, verbose=verbose, early_th=early_th,
                  min_threshold=min_threshold, max_threshold=max_threshold)
        return self
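A minimal usage sketch of ``fit``, assuming ``CovidSIR`` can be built with
default parameters; the arrays are synthetic and only meant to satisfy the
shape and row-sum checks above:

import numpy
from aftercovid.models import CovidSIR

model = CovidSIR()   # assumption: default constructor
n, q = 20, len(model.quantity_names)
rnd = numpy.random.RandomState(0).uniform(0.1, 1., size=(n, q))
X = rnd / rnd.sum(axis=1, keepdims=True) * model['N']   # rows sum to N
y = model.predict(X)   # derivatives implied by the current parameters
model.fit(X, y, max_iter=10)
print({p: model[p] for p in model.param_names})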

    def _losses_sympy(self):
        """
        Builds the loss functions using :epkg:`sympy`,
        one for each quantity.
        """
        clq = self.quantity_names
        res = []
        for name in clq:
            sym = Symbol('d' + name)
            eq = self._eq[name]
            lo = (eq - sym) ** 2
            la = self._lambdify_(f'loss-{name}', lo, derivative=True)
            res.append((lo, la))
        return res
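A sketch of what one entry of ``_losses_sympy`` builds, with the ``S``
equation of a plain SIR model written by hand instead of ``self._eq``:

from sympy import Symbol

S, I, beta, N, dS = (Symbol(n) for n in ("S", "I", "beta", "N", "dS"))
eq = -beta * S * I / N    # f_S(Omega, Z) for a plain SIR model
loss = (eq - dS) ** 2     # same construction as lo = (eq - sym) ** 2
print(loss)               # the squared residual against the observed dS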

    def _grads_sympy(self):
        """
        Builds the gradient of every loss with respect to every
        parameter, i.e. as many expressions as the number of
        quantities multiplied by the number of parameters.
        """
        losses = self._losses_sympy()
        clq = self.quantity_names
        prn = self.param_names
        res = []
        for name, eq in zip(clq, losses):
            row = []
            for pn in prn:
                pns = Symbol(pn)
                df = sympy_diff(eq[0], pns)
                gr = self._lambdify_(f'grad-{name}/{pn}', df,
                                     derivative=True)
                row.append((df, gr))
            res.append(row)
        return res
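And the matching gradient cell from ``_grads_sympy``, again with a
hand-written SIR equation:

from sympy import Symbol, diff

S, I, beta, N, dS = (Symbol(n) for n in ("S", "I", "beta", "N", "dS"))
loss = (-beta * S * I / N - dS) ** 2   # loss for S, as in _losses_sympy
print(diff(loss, beta))   # -2*S*I/N * (-beta*S*I/N - dS), by the chain rule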

    def predict(self, X, t=0):
        """
        Predicts the derivatives at time *t*.

        :param X: known values for every quantity at time *t*,
            every column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param t: implicit feature
        :return: predicted derivatives
        """
        N = self._check_fit_predict(X)[1]
        err = abs(N - self['N']) / N
        if err > 1e-4:
            raise ValueError(  # pragma: no cover
                f"All rows must sum up to {self['N']} not {N}.")
        n = X.shape[0]
        C = X.shape[1]
        pred = numpy.empty(X.shape, dtype=X.dtype)
        x = self.vect(t=t)
        for i in range(n):
            x[-1] = t + i
            x[:C] = X[i, :]
            for c, f in enumerate(self._leqa):
                pred[i, c] = f(*x)
        return pred
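A short sketch of ``predict``, under the same ``CovidSIR`` assumption as
the ``fit`` example: the output has one derivative per quantity and per row.

import numpy
from aftercovid.models import CovidSIR

model = CovidSIR()   # assumption: default constructor
rnd = numpy.random.RandomState(1).uniform(
    0.1, 1., size=(5, len(model.quantity_names)))
X = rnd / rnd.sum(axis=1, keepdims=True) * model['N']
pred = model.predict(X)   # pred[i, c] = f_c(Omega, X[i, :]) at time t + i
assert pred.shape == X.shape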

    def score(self, X, y, t=0):
        """
        Scores the predictions. Returns the squared L2 norm of the
        error divided by the number of rows.

        :param X: known values for every quantity at time *t*,
            every column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: expected values
        :param t: implicit feature
        :return: score
        """
        self._check_fit_predict(X, y)
        pred = self.predict(X, t=t)
        delta = (pred - y) ** 2
        return numpy.sum(delta) / X.shape[0]

    def score_l1(self, X, y, t=0):
        """
        Scores the predictions. Returns the L1 norm of the error
        divided by the number of rows and by the population.

        :param X: known values for every quantity at time *t*,
            every column is mapped to the list returned by
            :meth:`quantity_names <aftercovid.models._base_sir.quantity_names>`
        :param y: expected values
        :param t: implicit feature
        :return: score
        """
        self._check_fit_predict(X, y)
        pred = self.predict(X, t=t)
        delta = numpy.abs(pred - y)
        return numpy.sum(delta) / (X.shape[0] * self['N'])
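Both scores are plain numpy reductions of the prediction error; a sketch
of the equivalence, under the same ``CovidSIR`` assumption:

import numpy
from aftercovid.models import CovidSIR

model = CovidSIR()   # assumption: default constructor
rnd = numpy.random.RandomState(2).uniform(
    0.1, 1., size=(5, len(model.quantity_names)))
X = rnd / rnd.sum(axis=1, keepdims=True) * model['N']
y = model.predict(X) + 1.   # shift so that the error is not zero
pred = model.predict(X)
l2 = numpy.sum((pred - y) ** 2) / X.shape[0]
l1 = numpy.sum(numpy.abs(pred - y)) / (X.shape[0] * model['N'])
assert abs(l2 - model.score(X, y)) < 1e-10
assert abs(l1 - model.score_l1(X, y)) < 1e-10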

    def _fit(self, X, y, t, max_iter,
             learning_rate_init, lr_schedule,
             momentum, power_t, early_th, min_threshold,
             max_threshold, verbose):
        """
        See method :meth:`fit
        <aftercovid.models._base_sir_estimation.BaseSIREstimation.fit>`
        and :class:`SGDOptimizer <aftercovid.optim.SGDOptimizer>`.
        """
        N = self._check_fit_predict(X, y)[1]
        err = abs(N - self['N']) / N
        if err > 1e-4:
            raise ValueError(  # pragma: no cover
                f"All rows must sum up to {self['N']} not {N}.")

        # loss and gradient functions
        losses = self._losses_sympy()
        grads = self._grads_sympy()
        xy0 = self.vect(t=0, derivative=True)

        def pformat(d):  # pragma: no cover
            nd = {str(k): (str(v), type(v), type(k)) for k, v in d.items()}
            return pprint.pformat(nd)

        def fct_loss(coef, X, y):
            'Computes the loss function for every X and y.'
            xy0[self._val_ind[1]:self._val_ind[2]] = coef

            res = 0.
            for i in range(X.shape[0]):
                xy0[-1] = i
                xy0[:X.shape[1]] = X[i, :]
                xy0[-self._val_ind[1]:] = y[i, :]
                for loss in losses:
                    res += loss[1](*xy0)
            return res

        def fct_grad(coef, x, y, i):
            'Computes the gradient function for every X and y.'
            xy0[:self._val_ind[1]] = x
            xy0[self._val_ind[1]:self._val_ind[2]] = coef
            xy0[-self._val_ind[1]:] = y

            res = numpy.zeros((coef.shape[0], ), dtype=x.dtype)
            for row in grads:
                for k, g in enumerate(row):
                    res[k] += g[1](*xy0)
            return res

        pnames = self.param_names
        coef = numpy.array([self[p] for p in pnames])
        # scale the learning rate down with the population size:
        # gradient magnitudes grow with the magnitude of X and y
        lrn = 1. / (N ** 1.5)
        sgd = SGDOptimizer(
            coef, learning_rate_init=learning_rate_init * lrn,
            lr_schedule=lr_schedule, momentum=momentum,
            power_t=power_t, min_threshold=min_threshold,
            max_threshold=max_threshold)

        sgd.train(X, y, fct_loss, fct_grad, max_iter=max_iter,
                  early_th=early_th, verbose=verbose)

        # uses the trained coefficients
        coef = sgd.coef
        for n, c in zip(pnames, coef):
            self[n] = c
        self.iter_ = sgd.iter_
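Finally, a toy sketch of the ``SGDOptimizer`` contract ``_fit`` relies on,
using only the constructor and ``train`` arguments visible above; the
optimizer's remaining defaults and the exact per-sample callback shapes are
assumptions:

import numpy
from aftercovid.optim import SGDOptimizer

# toy problem: recover a in f(x) = a * x from 10 points with a = 3
X = numpy.linspace(0., 1., 10).reshape((-1, 1))
y = 3. * X

def fct_loss(coef, X, y):
    'Full loss over the dataset, same role as fct_loss in _fit.'
    return float(numpy.sum((X * coef[0] - y) ** 2))

def fct_grad(coef, x, y, i):
    'Gradient for one sample, same signature as fct_grad in _fit.'
    return numpy.array([2. * x[0] * (x[0] * coef[0] - y[0])])

sgd = SGDOptimizer(numpy.array([0.]), learning_rate_init=0.1)
sgd.train(X, y, fct_loss, fct_grad, max_iter=100)
print(sgd.coef)  # should end up close to [3.]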