Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements a connex split between train and test. 

5""" 

6from collections import Counter 

7import pandas 

8import numpy 

9from sklearn.model_selection import train_test_split 

10from .dataframe_helpers import dataframe_shuffle 

11 

12 

class ImbalancedSplitException(Exception):
    """Exception raised whenever a train/test split turns out to be
    more imbalanced than the tolerated threshold."""

18 

19 

def train_test_split_weights(df, weights=None, test_size=0.25, train_size=None,
                             shuffle=True, fail_imbalanced=0.05, random_state=None):
    """
    Splits a database in train/test given, every row
    can have a different weight.

    @param df :epkg:`pandas:DataFrame` or @see cl StreamingDataFrame
    @param weights None or weights or weights column name
    @param test_size ratio for the test partition (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param shuffle shuffles before the split
    @param fail_imbalanced raises an exception if relative weights difference is higher than this value
    @param random_state seed for random generators
    @return train and test :epkg:`pandas:DataFrame`

    If the dataframe is not shuffled first, the function
    will produce two datasets which are unlikely to be randomized
    as the function tries to keep equal weights among both paths
    without using randomness.
    """
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        # NOTE(review): if *weights* is passed as a Series or list computed on
        # the unshuffled dataframe, its order no longer matches *df* after this
        # shuffle -- pass a column name to stay consistent. TODO confirm intent.
        df = dataframe_shuffle(df, random_state=random_state)
    if weights is None:
        if test_size == 0 or train_size == 0:
            raise ValueError(
                "test_size={0} or train_size={1} cannot be null (1)."
                "".format(test_size, train_size))
        # No weights: delegate to scikit-learn's regular splitter.
        return train_test_split(df, test_size=test_size,
                                train_size=train_size,
                                random_state=random_state)

    if isinstance(weights, pandas.Series):
        weights = list(weights)
    elif isinstance(weights, str):
        weights = list(df[weights])
    if len(weights) != df.shape[0]:
        raise ValueError(
            "Dimension mismatch between weights and dataframe "
            "{0} != {1}".format(df.shape[0], len(weights)))

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(
            "test_size={0} or train_size={1} cannot be null (2)."
            "".format(test_size, train_size))
    # Weight of a train row relative to a test row, so that both sides
    # accumulate comparable weighted mass.
    ratio = test_size / p

    if random_state is None:
        randint = numpy.random.randint
    else:
        state = numpy.random.RandomState(random_state)
        randint = state.randint

    # Greedy balancing: *balance* tracks (test mass - scaled train mass);
    # each row goes to whichever side is currently lighter.
    balance = 0
    train_ids = []
    test_ids = []
    test_weights = 0
    train_weights = 0
    for i in range(0, df.shape[0]):
        w = weights[i]
        if balance == 0:
            # numpy's randint excludes the upper bound: randint(0, 2) draws
            # 0 or 1.  The original randint(0, 1) always returned 0, which
            # deterministically sent every tie to the test side.
            h = randint(0, 2)
            totest = h == 0
        else:
            totest = balance < 0
        if totest:
            test_ids.append(i)
            balance += w
            test_weights += w
        else:
            train_ids.append(i)
            balance -= w * ratio
            train_weights += w * ratio

    # Relative imbalance between the two weighted halves.
    r = abs(train_weights - test_weights) / \
        (1.0 * (train_weights + test_weights))
    if r >= fail_imbalanced:
        raise ImbalancedSplitException(  # pragma: no cover
            "Split is imbalanced: train_weights={0} test_weights={1} r={2}."
            "".format(train_weights, test_weights, r))

    return df.iloc[train_ids, :], df.iloc[test_ids, :]

111 

112 

def train_test_connex_split(df, groups, test_size=0.25, train_size=None,
                            stratify=None, hash_size=9, unique_rows=False,
                            shuffle=True, fail_imbalanced=0.05, keep_balance=None,
                            stop_if_bigger=None, return_cnx=False,
                            must_groups=None, random_state=None, fLOG=None):
    """
    This split is for a specific case where data is linked
    in many ways. Let's assume we have three ids as we have
    for online sales: *(product id, user id, card id)*.
    As we may need to compute aggregated features,
    we need every id not to be present in both train and
    test set. The function computes the connected components
    and breaks each of them in two parts for train and test.

    @param df :epkg:`pandas:DataFrame`
    @param groups columns name for the ids
    @param test_size ratio for the test partition (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param stratify column holding the stratification
    @param hash_size size of the hash to cache information about partition
    @param unique_rows ensures that rows are unique
    @param shuffle shuffles before the split
    @param fail_imbalanced raises an exception if relative weights difference
        is higher than this value
    @param stop_if_bigger (float) stops a connected components from being
        bigger than this ratio of elements, this should not be used
        unless a big components emerges, the algorithm stops merging
        but does not guarantee it returns the best cut,
        the value should be close to 0
    @param keep_balance (float), if not None, does not merge connected components
        if their relative sizes are too different, the value should be
        close to 1
    @param return_cnx returns connected components as a third results
    @param must_groups column name for ids which must not be shared by
        train/test partitions
    @param random_state seed for random generator
    @param fLOG logging function
    @return Two @see cl StreamingDataFrame, one
        for train, one for test.

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.

    .. exref::
        :title: Splits a dataframe, keep ids in separate partitions
        :tag: dataframe

        In some data science problems, rows are not independant
        and share common value, most of the time ids. In some
        specific case, multiple ids from different columns are
        connected and must appear in the same partition.
        Testing that each id column is evenly split and do not
        appear in both sets in not enough. Connected components
        are needed.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import train_test_connex_split

            df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                            dict(user="UA", prod="PB", card="C1"),
                            dict(user="UB", prod="PC", card="C2"),
                            dict(user="UB", prod="PD", card="C2"),
                            dict(user="UC", prod="PAA", card="C3"),
                            dict(user="UC", prod="PF", card="C4"),
                            dict(user="UD", prod="PG", card="C5"),
                            ])

            train, test = train_test_connex_split(
                df, test_size=0.5, groups=['user', 'prod', 'card'],
                fail_imbalanced=0.6)

            print(train)
            print(test)

    If *return_cnx* is True, the third results contains:

    * connected components for each id
    * the dataframe with connected components as a new column

    .. runpython::
        :showcode:

        from pandas import DataFrame
        from pandas_streaming.df import train_test_connex_split

        df = DataFrame([dict(user="UA", prod="PAA", card="C1"),
                        dict(user="UA", prod="PB", card="C1"),
                        dict(user="UB", prod="PC", card="C2"),
                        dict(user="UB", prod="PD", card="C2"),
                        dict(user="UC", prod="PAA", card="C3"),
                        dict(user="UC", prod="PF", card="C4"),
                        dict(user="UD", prod="PG", card="C5"),
                        ])

        train, test, cnx = train_test_connex_split(
            df, test_size=0.5, groups=['user', 'prod', 'card'],
            fail_imbalanced=0.6, return_cnx=True)

        print(cnx[0])
        print(cnx[1])
    """
    if stratify is not None:
        raise NotImplementedError(  # pragma: no cover
            "Option stratify is not implemented.")
    if groups is None or len(groups) == 0:
        raise ValueError(  # pragma: no cover
            "groups is empty. Use regular train_test_split.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(  # pragma: no cover
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError(  # pragma: no cover
            "Not implemented on numpy arrays.")
    if shuffle:
        df = dataframe_shuffle(df, random_state=random_state)

    dfids = df[groups].copy()
    if must_groups is not None:
        dfids_must = df[must_groups].copy()

    # Pick helper column names that cannot collide with existing columns.
    name = "connex"
    while name in dfids.columns:
        name += "_"
    one = "weight"
    while one in dfids.columns:
        one += "_"

    # Connected components: every row starts as its own component.
    elements = list(range(dfids.shape[0]))
    counts_cnx = {i: {i} for i in elements}
    connex = {}          # (column, id value) -> component representative
    avoids_merge = {}    # pairs of components deliberately kept separate

    def do_connex_components(dfrows, local_groups, kb, sib):
        "run connected components algorithms"
        itern = 0
        modif = 1

        # Iterate until a fixed point: no component merge happened.
        while modif > 0 and itern < len(elements):
            if fLOG and df.shape[0] > 10000:
                # fix: the original formatted the builtin *iter* instead of
                # the iteration counter *itern*.
                fLOG("[train_test_connex_split] iteration={0}-#nb connect={1} - "
                     "modif={2}".format(itern, len(set(elements)), modif))
            modif = 0
            itern += 1
            for i, row in enumerate(dfrows.itertuples(index=False, name=None)):
                # Ignore NaN ids: they do not connect rows together.
                vals = [val for val in zip(local_groups, row) if not isinstance(
                    val[1], float) or not numpy.isnan(val[1])]

                c = elements[i]

                for val in vals:
                    if val not in connex:
                        connex[val] = c
                        modif += 1

                # All components touched by this row collapse into the
                # smallest representative, unless a guard below refuses.
                set_c = set(connex[val] for val in vals)
                set_c.add(c)
                new_c = min(set_c)

                add_pair_c = []
                for c in set_c:
                    if c == new_c or (new_c, c) in avoids_merge:
                        continue
                    if kb is not None:
                        # keep_balance guard: skip merges between components
                        # of too different sizes.
                        maxi = min(len(counts_cnx[new_c]), len(counts_cnx[c]))
                        if maxi > 5:
                            diff = len(counts_cnx[new_c]) + \
                                len(counts_cnx[c]) - maxi
                            r = diff / float(maxi)
                            if r > kb:
                                if fLOG:  # pragma: no cover
                                    fLOG('[train_test_connex_split] balance '
                                         'r={0:0.00000}>{1:0.00}, #[{2}]={3}, '
                                         '#[{4}]={5}'.format(r, kb, new_c,
                                                             len(counts_cnx[new_c]),
                                                             c, len(counts_cnx[c])))
                                continue

                    if sib is not None:
                        # stop_if_bigger guard: refuse merges that would create
                        # a component above the given ratio of all elements.
                        r = (len(counts_cnx[new_c]) +
                             len(counts_cnx[c])) / float(len(elements))
                        if r > sib:
                            if fLOG:  # pragma: no cover
                                fLOG('[train_test_connex_split] no merge '
                                     'r={0:0.00000}>{1:0.00}, #[{2}]={3}, #[{4}]={5}'
                                     ''.format(r, sib, new_c, len(counts_cnx[new_c]),
                                               c, len(counts_cnx[c])))
                            avoids_merge[new_c, c] = i
                            continue

                    add_pair_c.append(c)

                if len(add_pair_c) > 0:
                    for c in add_pair_c:
                        # Absorb component c into new_c.
                        modif += len(counts_cnx[c])
                        for ii in counts_cnx[c]:
                            elements[ii] = new_c
                        counts_cnx[new_c] = counts_cnx[new_c].union(
                            counts_cnx[c])
                        counts_cnx[c] = set()

                        # Re-point the ids of the absorbed component to the
                        # new representative.
                        keys = list(vals)
                        for val in keys:
                            if connex[val] == c:
                                connex[val] = new_c
                                modif += 1

    if must_groups:
        do_connex_components(dfids_must, must_groups, None, None)
    do_connex_components(dfids, groups, keep_balance, stop_if_bigger)

    # final
    dfids[name] = elements
    dfids[one] = 1
    grsum = dfids[[name, one]].groupby(name, as_index=False).sum()
    if fLOG:
        for g in groups:
            fLOG("[train_test_connex_split] #nb in '{0}': {1}".format(
                g, len(set(dfids[g]))))
        fLOG(
            "[train_test_connex_split] #connex {0}/{1}".format(
                grsum.shape[0], dfids.shape[0]))
    if grsum.shape[0] <= 1:
        raise ValueError(  # pragma: no cover
            "Every element is in the same connected components.")

    # Statistics: top connected components
    if fLOG:
        # Global statistics
        counts = Counter(elements)
        cl = [(v, k) for k, v in counts.items()]
        cum = 0
        maxc = None
        fLOG("[train_test_connex_split] number of connected components: {0}"
             "".format(len(set(elements))))
        for i, (v, k) in enumerate(sorted(cl, reverse=True)):
            if i == 0:
                maxc = k, v
            if i >= 10:
                break
            cum += v
            fLOG("[train_test_connex_split] c={0} #elements={1} cumulated"
                 "={2}/{3}".format(k, v, cum, len(elements)))

        # Most important component
        fLOG('[train_test_connex_split] first row of the biggest component '
             '{0}'.format(maxc))
        tdf = dfids[dfids[name] == maxc[0]]
        fLOG('[train_test_connex_split] \n{0}'.format(tdf.head(n=10)))

    # Splits whole components between train and test, weighting each
    # component by its number of rows.
    train, test = train_test_split_weights(
        grsum, weights=one, test_size=test_size, train_size=train_size,
        shuffle=shuffle, fail_imbalanced=fail_imbalanced,
        random_state=random_state)
    train.drop(one, inplace=True, axis=1)
    test.drop(one, inplace=True, axis=1)

    # We compute the final dataframe.
    def double_merge(d):
        "merge twice"
        merge1 = dfids.merge(d, left_on=name, right_on=name)
        merge2 = df.merge(merge1, left_on=groups, right_on=groups)
        return merge2

    train_f = double_merge(train)
    test_f = double_merge(test)
    if return_cnx:
        return train_f, test_f, (connex, dfids)
    else:
        return train_f, test_f

387 

388 

def train_test_apart_stratify(df, group, test_size=0.25, train_size=None,
                              stratify=None, force=False, random_state=None,
                              fLOG=None):
    """
    This split is for a specific case where data is linked
    in one way. Let's assume we have two ids as we have
    for online sales: *(product id, category id)*.
    A product can have multiple categories. We need to have
    distinct products on train and test but common categories
    on both sides.

    @param df :epkg:`pandas:DataFrame`
    @param group columns name for the ids
    @param test_size ratio for the test partition
        (if *train_size* is not specified)
    @param train_size ratio for the train partition
    @param stratify column holding the stratification
    @param force if True, tries to get at least one example on the test side
        for each value of the column *stratify*
    @param random_state seed for random generators
    @param fLOG logging function
    @return Two @see cl StreamingDataFrame, one
        for train, one for test.

    .. index:: multi-label

    The list of ids must hold in memory.
    There is no streaming implementation for the ids.
    This split was implemented for a case of a multi-label
    classification. A category (*stratify*) is not exclusive
    and an observation can be assigned to multiple
    categories. In that particular case, the method
    `train_test_split <http://scikit-learn.org/stable/modules/generated/
    sklearn.model_selection.train_test_split.html>`_
    can not directly be used.

    .. runpython::
        :showcode:

        import pandas
        from pandas_streaming.df import train_test_apart_stratify

        df = pandas.DataFrame([dict(a=1, b="e"),
                               dict(a=1, b="f"),
                               dict(a=2, b="e"),
                               dict(a=2, b="f")])

        train, test = train_test_apart_stratify(
            df, group="a", stratify="b", test_size=0.5)
        print(train)
        print('-----------')
        print(test)
    """
    if stratify is None:
        raise ValueError(  # pragma: no cover
            "stratify must be specified.")
    if group is None:
        raise ValueError(  # pragma: no cover
            "group must be specified.")
    if hasattr(df, 'iter_creation'):
        raise NotImplementedError(
            'Not implemented yet for StreamingDataFrame.')
    if isinstance(df, numpy.ndarray):
        raise NotImplementedError("Not implemented on numpy arrays.")

    p = (1 - test_size) if test_size else None
    if train_size is not None:
        p = train_size
        test_size = 1 - p
    if p is None or min(test_size, p) <= 0:
        raise ValueError(  # pragma: no cover
            "test_size={0} or train_size={1} cannot be null".format(
                test_size, train_size))

    # Map every stratification value to the set of group ids carrying it.
    couples = df[[group, stratify]].itertuples(name=None, index=False)
    hist = Counter(df[stratify])
    sorted_hist = [(v, k) for k, v in hist.items()]
    sorted_hist.sort()
    ids = {c: set() for c in hist}

    for g, s in couples:
        ids[s].add(g)

    if random_state is None:
        permutation = numpy.random.permutation
    else:
        state = numpy.random.RandomState(random_state)
        permutation = state.permutation

    # Assign group ids to test (1) or train (0), processing the rarest
    # stratification values first so they get their test share.
    split = {}
    for _, k in sorted_hist:
        not_assigned = [c for c in ids[k] if c not in split]
        if len(not_assigned) == 0:
            continue
        assigned = [c for c in ids[k] if c in split]
        nb_test = sum(split[c] for c in assigned)
        expected = min(len(ids[k]), int(
            test_size * len(ids[k]) + 0.5)) - nb_test
        if force and expected == 0 and nb_test == 0:
            nb_train = len(assigned) - nb_test
            if nb_train > 0 or len(not_assigned) > 1:
                expected = min(1, len(not_assigned))
        if expected > 0:
            # fix: numpy's permutation returns a shuffled *copy* and leaves
            # its argument untouched; the original code discarded the result,
            # so the choice of test ids was never randomized.
            not_assigned = list(permutation(not_assigned))
            for e in not_assigned[:expected]:
                split[e] = 1
            for e in not_assigned[expected:]:
                split[e] = 0
        else:
            for c in not_assigned:
                split[c] = 0

    train_set = set(k for k, v in split.items() if v == 0)
    test_set = set(k for k, v in split.items() if v == 1)
    train_df = df[df[group].isin(train_set)]
    test_df = df[df[group].isin(test_set)]
    return train_df, test_df