Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Helpers for dataframes. 

5""" 

6import hashlib 

7import struct 

8import warnings 

9import numpy 

10from pandas import DataFrame, Index 

11 

12 

def numpy_types():
    """
    Returns the list of :epkg:`numpy` available types.

    :return: list of types

    .. note::
        ``numpy.float_`` and ``numpy.complex_`` were removed in
        :epkg:`numpy` 2.0 (NEP 52). They were aliases of
        ``numpy.float64`` and ``numpy.complex128``, which are used
        here instead so the list keeps the same length and order
        on every :epkg:`numpy` version.
    """
    return [numpy.bool_,
            numpy.int_,
            numpy.intc,
            numpy.intp,
            numpy.int8,
            numpy.int16,
            numpy.int32,
            numpy.int64,
            numpy.uint8,
            numpy.uint16,
            numpy.uint32,
            numpy.uint64,
            numpy.float64,  # was numpy.float_, removed in numpy 2.0
            numpy.float16,
            numpy.float32,
            numpy.float64,
            numpy.complex128,  # was numpy.complex_, removed in numpy 2.0
            numpy.complex64,
            numpy.complex128]

39 

40 

def hash_str(c, hash_length):
    """
    Hashes a string.

    @param      c               value to hash (a string, or ``numpy.nan``)
    @param      hash_length     maximum length of the returned hash
    @return                     hashed string (or ``numpy.nan`` unchanged)

    Raises :epkg:`ValueError` if *c* is a float other than ``numpy.nan``.
    """
    # Missing values pass through untouched; any other float is a misuse.
    if isinstance(c, float):
        if not numpy.isnan(c):
            raise ValueError("numpy.nan expected, not {0}".format(c))
        return c
    digest = hashlib.sha256(c.encode("utf-8")).hexdigest()
    if len(digest) >= hash_length:
        return digest[:hash_length]
    return digest

59 

60 

def hash_int(c, hash_length):
    """
    Hashes an integer into an integer.

    @param      c               value to hash (an int, or ``numpy.nan``)
    @param      hash_length     number of hexadecimal digits kept from the digest
    @return                     int in ``[0, 10**8)`` (or ``numpy.nan`` unchanged)

    Raises :epkg:`ValueError` if *c* is a float other than ``numpy.nan``.

    Integers fitting in 32 bits are packed with ``struct.pack("i", c)``
    (unchanged from the original behaviour, so existing hashes are stable);
    larger integers — which previously raised ``struct.error`` — now fall
    back to a variable-length little-endian signed encoding.
    """
    # Missing values pass through untouched; any other float is a misuse.
    if isinstance(c, float):
        if numpy.isnan(c):
            return c
        raise ValueError("numpy.nan expected, not {0}".format(c))
    try:
        b = struct.pack("i", c)
    except struct.error:
        # Value does not fit in a signed 32-bit int: use an unambiguous
        # wider encoding instead of failing.
        b = c.to_bytes((c.bit_length() // 8) + 1, byteorder="little", signed=True)
    m = hashlib.sha256()
    m.update(b)
    r = m.hexdigest()
    if len(r) >= hash_length:
        r = r[:hash_length]
    return int(r, 16) % (10 ** 8)

82 

83 

def hash_float(c, hash_length):
    """
    Hashes a float into a float.

    @param      c               value to hash
    @param      hash_length     number of hexadecimal digits kept from the digest
    @return                     float in ``[0, 2**53)`` (or ``numpy.nan`` unchanged)

    The result stays below ``2**53`` so it is exactly representable
    as a double.
    """
    # NaN is the marker for missing values: leave it as is.
    if numpy.isnan(c):
        return c
    packed = struct.pack("d", c)
    digest = hashlib.sha256(packed).hexdigest()
    if len(digest) >= hash_length:
        digest = digest[:hash_length]
    return float(int(digest, 16) % (2 ** 53))

103 

104 

def dataframe_hash_columns(df, cols=None, hash_length=10, inplace=False):
    """
    Hashes a set of columns in a dataframe.
    Keeps the same type. Skips missing values.

    @param      df              dataframe
    @param      cols            columns to hash or None for all.
    @param      hash_length     for strings only, length of the hash
    @param      inplace         modifies inplace
    @return                     new dataframe

    This might be useful to anonymize data before
    making it public.

    .. exref::
        :title: Hashes a set of columns in a dataframe
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            from pandas_streaming.df import dataframe_hash_columns
            df = pandas.DataFrame([dict(a=1, b="e", c=5.6, ind="a1", ai=1),
                                   dict(b="f", c=5.7, ind="a2", ai=2),
                                   dict(a=4, b="g", ind="a3", ai=3),
                                   dict(a=8, b="h", c=5.9, ai=4),
                                   dict(a=16, b="i", c=6.2, ind="a5", ai=5)])
            print(df)
            print('--------------')
            df2 = dataframe_hash_columns(df)
            print(df2)
    """
    if cols is None:
        cols = list(df.columns)
    if not inplace:
        df = df.copy()

    def _int_hash(value):
        "hash one integer"
        return hash_int(value, hash_length)

    def _str_hash(value):
        "hash one string"
        return hash_str(value, hash_length)

    def _float_hash(value):
        "hash one float"
        return hash_float(value, hash_length)

    column_types = dict(zip(df.columns, df.dtypes))
    for name in cols:
        dtype = column_types[name]
        # The order of the checks matters: on most platforms
        # ``dtype == int`` already matches int64 columns.
        if dtype == int:
            df[name] = df[name].apply(_int_hash)
        elif dtype == numpy.int64:
            df[name] = df[name].apply(lambda v: numpy.int64(_int_hash(v)))
        elif dtype == float:
            df[name] = df[name].apply(_float_hash)
        elif dtype == object:
            df[name] = df[name].apply(_str_hash)
        else:
            raise NotImplementedError(
                "Conversion of type {0} in column '{1}' is not implemented".format(dtype, name))

    return df

173 

174 

def dataframe_unfold(df, col, new_col=None, sep=","):
    """
    One column may contain concatenated values.
    This function splits these values and multiplies the
    rows for each split value.

    @param      df          dataframe
    @param      col         column with the concatenated values (strings)
    @param      new_col     new column name, if None, use default value.
    @param      sep         separator
    @return                 a new dataframe

    .. exref::
        :title: Unfolds a column of a dataframe.
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            import numpy
            from pandas_streaming.df import dataframe_unfold

            df = pandas.DataFrame([dict(a=1, b="e,f"),
                                   dict(a=2, b="g"),
                                   dict(a=3)])
            print(df)
            df2 = dataframe_unfold(df, "b")
            print('----------')
            print(df2)

            # To fold:
            folded = df2.groupby('a').apply(lambda row: ','.join(row['b_unfold'].dropna()) \\
                     if len(row['b_unfold'].dropna()) > 0 else numpy.nan)
            print('----------')
            print(folded)
    """
    col_name = col + "_unfold" if new_col is None else new_col
    # Pick a helper column name guaranteed not to collide with an
    # existing column; it carries the original row position.
    temp_col = '__index__'
    while temp_col in df.columns:
        temp_col += "_"
    rows = []
    for position, value in enumerate(df[col]):
        if isinstance(value, str):
            rows.extend({col: value, col_name: part, temp_col: position}
                        for part in value.split(sep))
        else:
            # Non-string cells (typically NaN) yield a single row unchanged.
            rows.append({col: value, col_name: value, temp_col: position})
    work = df.copy()
    work[temp_col] = list(range(work.shape[0]))
    unfolded = work.merge(DataFrame(rows), on=[col, temp_col])
    return unfolded.drop(temp_col, axis=1).copy()

232 

233 

def dataframe_shuffle(df, random_state=None):
    """
    Shuffles a dataframe.

    :param df: :epkg:`pandas:DataFrame`
    :param random_state: seed
    :return: new :epkg:`pandas:DataFrame`

    .. exref::
        :title: Shuffles the rows of a dataframe
        :tag: dataframe

        .. runpython::
            :showcode:

            import pandas
            from pandas_streaming.df import dataframe_shuffle

            df = pandas.DataFrame([dict(a=1, b="e", c=5.6, ind="a1"),
                                   dict(a=2, b="f", c=5.7, ind="a2"),
                                   dict(a=4, b="g", c=5.8, ind="a3"),
                                   dict(a=8, b="h", c=5.9, ind="a4"),
                                   dict(a=16, b="i", c=6.2, ind="a5")])
            print(df)
            print('----------')

            shuffled = dataframe_shuffle(df, random_state=0)
            print(shuffled)
    """
    if random_state is None:
        permutation = numpy.random.permutation
    else:
        permutation = numpy.random.RandomState(random_state).permutation
    original_cols = list(df.columns)
    col_set = set(original_cols)

    # Move the index into regular columns so it gets shuffled along
    # with the data, then restore it afterwards.
    flat = no_index = df.reset_index(drop=False)
    index_cols = [name for name in no_index.columns if name not in col_set]
    shuffled = flat.iloc[permutation(flat.index), :]
    res = shuffled.set_index(index_cols)[original_cols]
    res.index.names = df.index.names
    return res

279 

280 

def pandas_fillna(df, by, hasna=None, suffix=None):
    """
    Replaces the :epkg:`nan` values for something not :epkg:`nan`.
    Mostly used by @see fn pandas_groupby_nan.

    :param df: dataframe
    :param by: list of columns for which we need to replace nan
    :param hasna: None or list of columns for which we need to replace NaN
    :param suffix: use a prefix for the NaN value
    :return: list of values chosen for each column, new dataframe (new copy)

    The replacement value is chosen so it cannot collide with an
    existing value of the column: for strings the *suffix* is appended
    until the value is unseen, for numbers ``abs(max) + abs(min)``
    (adjusted upward if needed) is used.
    """
    suffix = suffix if suffix else "²nan"
    df = df.copy()
    rep = {}
    for c in by:
        if hasna is not None and c not in hasna:
            continue
        if df[c].dtype in (str, bytes, object):
            se = set(df[c].dropna())
            if not se:
                # Column contains only missing values: the popped sample
                # used for type detection does not exist, so any string
                # constant is safe (previously raised KeyError).
                val = suffix
            else:
                # Pop one value only to detect whether the column holds
                # str or bytes; the value itself is discarded for str.
                val = se.pop()
                if isinstance(val, str):
                    cst = suffix
                    val = ""
                elif isinstance(val, bytes):
                    cst = b"_"
                else:
                    raise TypeError(  # pragma: no cover
                        "Unable to determine a constant for type='{0}' dtype='{1}'".format(
                            val, df[c].dtype))
                val += cst
            while val in se:
                val += suffix
            # Column reassignment instead of chained `inplace=True` fillna:
            # the latter is a no-op under pandas Copy-on-Write.
            df[c] = df[c].fillna(val)
            rep[c] = val
        else:
            dr = df[c].dropna()
            mi = abs(dr.min())
            ma = abs(dr.max())
            val = ma + mi
            if val == ma and not isinstance(val, str):
                val += ma + 1.
            if val <= ma:
                raise ValueError(  # pragma: no cover
                    "Unable to find a different value for column '{}' v='{}: "
                    "min={} max={}".format(c, val, mi, ma))
            df[c] = df[c].fillna(val)
            rep[c] = val
    return rep, df

329 

330 

def pandas_groupby_nan(df, by, axis=0, as_index=False, suffix=None, nanback=True, **kwargs):
    """
    Does a *groupby* including keeping missing values (:epkg:`nan`).

    :param df: dataframe
    :param by: column or list of columns
    :param axis: only 0 is allowed
    :param as_index: should be False
    :param suffix: None or a string
    :param nanback: put :epkg:`nan` back in the index,
        otherwise it leaves a replacement for :epkg:`nan`.
        (does not work when grouping by multiple columns)
    :param kwargs: other parameters sent to
        `groupby <http://pandas.pydata.org/pandas-docs/stable/
        generated/pandas.DataFrame.groupby.html>`_
    :return: groupby results

    See `groupby and missing values <http://pandas-docs.github.io/
    pandas-docs-travis/groupby.html#na-and-nat-group-handling>`_.
    If no :epkg:`nan` is detected, the function falls back in regular
    :epkg:`pandas:DataFrame:groupby` which has the following
    behavior.

    .. exref::
        :title: Group a dataframe by one column including nan values
        :tag: dataframe

        The regular :epkg:`pandas:dataframe:GroupBy` of a
        :epkg:`pandas:DataFrame` removes every :epkg:`nan`
        values from the index.

        .. runpython::
            :showcode:

            from pandas import DataFrame

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            print(df)
            gr = df.groupby(["ind"]).sum()
            print(gr)

        Function @see fn pandas_groupby_nan modifies keeps them.

        .. runpython::
            :showcode:

            from pandas import DataFrame
            from pandas_streaming.df import pandas_groupby_nan

            data = [dict(a=2, ind="a", n=1),
                    dict(a=2, ind="a"),
                    dict(a=3, ind="b"),
                    dict(a=30)]
            df = DataFrame(data)
            gr2 = pandas_groupby_nan(df, ["ind"]).sum()
            print(gr2)
    """
    if axis != 0:
        raise NotImplementedError("axis should be 0")
    if as_index:
        raise NotImplementedError("as_index must be False")
    if isinstance(by, tuple):
        raise TypeError("by should be of list not tuple")
    if not isinstance(by, list):
        by = [by]
    # Detect which of the grouping columns actually contain NaN.
    hasna = {}
    for b in by:
        h = df[b].isnull().values.any()
        if h:
            hasna[b] = True
    if len(hasna) > 0:
        # Replace NaN with a unique sentinel value per column (see
        # pandas_fillna), group on the copy, then optionally patch the
        # grouper internals to show NaN again in the result index.
        rep, df_copy = pandas_fillna(df, by, hasna, suffix=suffix)
        res = df_copy.groupby(by, axis=axis, as_index=as_index, **kwargs)
        if len(by) == 1:
            if not nanback:
                # Caller keeps the sentinel; warn when the column is not
                # of object dtype (compared against a dummy str column),
                # because the sentinel then changed the column's dtype.
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = {c: t for c, t in zip(  # pylint: disable=R1721
                    df.columns, df.dtypes)}  # pylint: disable=R1721
                if typ[by[0]] != do:
                    warnings.warn(  # pragma: no cover
                        "[pandas_groupby_nan] NaN value: {0}".format(rep))
                return res
            # nanback=True: rewrite the pandas grouper internals so the
            # sentinel appears as NaN in groups, index and groupings.
            # WARNING: this relies on private pandas attributes
            # (res.grouper.*) and is version-sensitive.
            for b in by:
                fnan = rep[b]
                if fnan in res.grouper.groups:
                    # Re-key the sentinel group under NaN.
                    res.grouper.groups[numpy.nan] = res.grouper.groups[fnan]
                    del res.grouper.groups[fnan]
                new_val = list((numpy.nan if b == fnan else b)
                               for b in res.grouper.result_index)
                res.grouper.groupings[0]._group_index = Index(new_val)
                res.grouper.groupings[0].obj[b].replace(
                    fnan, numpy.nan, inplace=True)
                if hasattr(res.grouper, 'grouping'):
                    # Older pandas: grouping values stored in `.grouper`.
                    if isinstance(res.grouper.groupings[0].grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouper = arr
                        if (hasattr(res.grouper.groupings[0], '_cache') and
                                'result_index' in res.grouper.groupings[0]._cache):
                            # Drop the cached index so it is rebuilt from arr.
                            del res.grouper.groupings[0]._cache['result_index']
                    else:
                        raise NotImplementedError("Not implemented for type: {0}".format(
                            type(res.grouper.groupings[0].grouper)))
                else:
                    # Newer pandas: grouping values live in `.grouping_vector`.
                    grouper = res.grouper._get_grouper()
                    if isinstance(grouper, numpy.ndarray):
                        arr = numpy.array(new_val)
                        res.grouper.groupings[0].grouping_vector = arr
                        if (hasattr(res.grouper.groupings[0], '_cache') and
                                'result_index' in res.grouper.groupings[0]._cache):
                            index = res.grouper.groupings[0]._cache['result_index']
                            if len(rep) == 1:
                                # Patch the cached index in place: swap the
                                # sentinel back to NaN, keep the index class.
                                key = list(rep.values())[0]
                                new_index = numpy.array(index)
                                for i in range(0, len(new_index)):  # pylint: disable=C0200
                                    if new_index[i] == key:
                                        new_index[i] = numpy.nan
                                res.grouper.groupings[0]._cache['result_index'] = (
                                    index.__class__(new_index))
                            else:
                                raise NotImplementedError(
                                    "NaN values not implemented for multiindex.")
                    else:
                        raise NotImplementedError(
                            "Not implemented for type: {0}".format(
                                type(res.grouper.groupings[0].grouper)))
            res.grouper._cache['result_index'] = res.grouper.groupings[0]._group_index
        else:
            # Grouping by several columns: restoring NaN in a MultiIndex
            # is not supported, only the sentinel-keeping path works.
            if not nanback:
                dummy = DataFrame([{"a": "a"}])
                do = dummy.dtypes[0]
                typ = {c: t for c, t in zip(  # pylint: disable=R1721
                    df.columns, df.dtypes)}  # pylint: disable=R1721
                for b in by:
                    if typ[b] != do:
                        warnings.warn(  # pragma: no cover
                            "[pandas_groupby_nan] NaN values: {0}".format(rep))
                        break
                return res
            raise NotImplementedError(
                "Not yet implemented. Replacing pseudo nan values by real nan "
                "values is not as easy as it looks. Use nanback=False")
        # NOTE(review): a long commented-out experimental implementation for
        # the multi-column nanback case was removed here for clarity.
        return res
    else:
        # No NaN anywhere in the grouping columns: plain pandas groupby.
        return df.groupby(by, axis=axis, **kwargs)