Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Defines a streaming dataframe. 

5""" 

6import pickle 

7import os 

8from io import StringIO, BytesIO 

9from inspect import isfunction 

10import numpy 

11import numpy.random as nrandom 

12import pandas 

13from pandas.testing import assert_frame_equal 

14from pandas.io.json import json_normalize 

15from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming 

16from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream 

17 

18 

19class StreamingDataFrameSchemaError(Exception): 

20 """ 

21 Reveals an issue with inconsistant schemas. 

22 """ 

23 pass 

24 

25 

26class StreamingDataFrame: 

27 """ 

28 Defines a streaming dataframe. 

29 The goal is to reduce the memory footprint. 

30 The class takes a function which creates an iterator 

31 on :epkg:`dataframe`. We assume this function can 

32 be called multiple time. As a matter of fact, the 

33 function is called every time the class needs to walk 

34 through the stream with the following loop: 

35 

36 :: 

37 

38 for df in self: # self is a StreamingDataFrame 

39 # ... 

40 

41 The constructor cannot receive an iterator otherwise 

42 this class would be able to walk through the data 

43 only once. The main reason is it is impossible to 

44 :epkg:`*py:pickle` (or :epkg:`dill`) 

45 an iterator: it cannot be replicated. 

46 Instead, the class takes a function which generates 

47 an iterator on :epkg:`DataFrame`. 

48 Most of the methods returns either a :epkg:`DataFrame` 

49 either a @see cl StreamingDataFrame. In the second case, 

50 methods can be chained. 

51 

52 By default, the object checks that the schema remains 

53 the same between two chunks. This can be disabled 

54 by setting *check_schema=False* in the constructor. 

55 

56 The user should expect the data to remain stable. 

57 Every loop should produce the same data. However, 

58 in some situations, it is more efficient not to keep 

59 that constraints. Draw a random @see me sample 

60 is one of these cases. 

61 

62 :param iter_creation: function which creates an iterator or an 

63 instance of @see cl StreamingDataFrame 

64 :param check_schema: checks that the schema is the same 

65 for every :epkg:`dataframe` 

66 :param stable: indicates if the :epkg:`dataframe` remains the same 

67 whenever it is walked through 

68 """ 

69 

70 def __init__(self, iter_creation, check_schema=True, stable=True): 

71 self._delete_ = [] 

72 if isinstance(iter_creation, (pandas.DataFrame, dict, 

73 numpy.ndarray, str)): 

74 raise TypeError( 

75 "Unexpected type %r for iter_creation. It must " 

76 "be an iterator." % type(iter_creation)) 

77 if isinstance(iter_creation, StreamingDataFrame): 

78 self.iter_creation = iter_creation.iter_creation 

79 self.stable = iter_creation.stable 

80 else: 

81 self.iter_creation = iter_creation 

82 self.stable = stable 

83 self.check_schema = check_schema 

84 

85 def is_stable(self, do_check=False, n=10): 

86 """ 

87 Tells if the :epkg:`dataframe` is supposed to be stable. 

88 

89 @param do_check do not trust the value sent to the constructor 

90 @param n number of rows used to check the stability, 

91 None for all rows 

92 @return boolean 

93 

94 *do_check=True* means the methods checks the first 

95 *n* rows remains the same for two iterations. 

96 """ 

97 if do_check: 

98 for i, (a, b) in enumerate(zip(self, self)): 

99 if n is not None and i >= n: 

100 break 

101 try: 

102 assert_frame_equal(a, b) 

103 except AssertionError: # pragma: no cover 

104 return False 

105 return True 

106 else: 

107 return self.stable 

108 

109 def get_kwargs(self): 

110 """ 

111 Returns the parameters used to call the constructor. 

112 """ 

113 return dict(check_schema=self.check_schema) 

114 

115 def train_test_split(self, path_or_buf=None, export_method="to_csv", 

116 names=None, streaming=True, partitions=None, 

117 **kwargs): 

118 """ 

119 Randomly splits a :epkg:`dataframe` into smaller pieces. 

120 The function returns streams of file names. 

121 It chooses one of the options from module 

122 :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`. 

123 

124 @param path_or_buf a string, a list of strings or buffers, if it is a 

125 string, it must contain ``{}`` like ``partition{}.txt``, 

126 if None, the function returns strings. 

127 @param export_method method used to store the partitions, by default 

128 :epkg:`pandas:DataFrame:to_csv`, additional parameters 

129 will be given to that function 

130 @param names partitions names, by default ``('train', 'test')`` 

131 @param kwargs parameters for the export function and 

132 :epkg:`sklearn:model_selection:train_test_split`. 

133 @param streaming the function switches to a 

134 streaming version of the algorithm. 

135 @param partitions splitting partitions 

136 @return outputs of the exports functions or two 

137 @see cl StreamingDataFrame if path_or_buf is None. 

138 

139 The streaming version of this algorithm is implemented by function 

140 @see fn sklearn_train_test_split_streaming. Its documentation 

141 indicates the limitation of the streaming version and gives some 

142 insights about the additional parameters. 

143 """ 

144 if streaming: 

145 if partitions is not None: 

146 if len(partitions) != 2: 

147 raise NotImplementedError( # pragma: no cover 

148 "Only train and test split is allowed, *partitions* " 

149 "must be of length 2.") 

150 kwargs = kwargs.copy() 

151 kwargs['train_size'] = partitions[0] 

152 kwargs['test_size'] = partitions[1] 

153 return sklearn_train_test_split_streaming(self, **kwargs) 

154 return sklearn_train_test_split(self, path_or_buf=path_or_buf, 

155 export_method=export_method, 

156 names=names, **kwargs) 

157 

158 @staticmethod 

159 def _process_kwargs(kwargs): 

160 """ 

161 Filters out parameters for the constructor of this class. 

162 """ 

163 kw = {} 

164 for k in ['check_schema']: 

165 if k in kwargs: 

166 kw[k] = kwargs[k] 

167 del kwargs[k] 

168 return kw 

169 

170 @staticmethod 

171 def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDataFrame': 

172 """ 

173 Reads a :epkg:`json` file or buffer as an iterator 

174 on :epkg:`DataFrame`. The signature is the same as 

175 :epkg:`pandas:read_json`. The important parameter is 

176 *chunksize* which defines the number 

177 of rows to parse in a single bloc 

178 and it must be defined to return an iterator. 

179 If *lines* is True, the function falls back into 

180 :epkg:`pandas:read_json`, otherwise it used 

181 @see fn enumerate_json_items. If *lines* is ``'stream'``, 

182 *enumerate_json_items* is called with parameter 

183 ``lines=True``. 

184 Parameter *flatten* uses the trick described at 

185 `Flattening JSON objects in Python 

186 <https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10>`_. 

187 Examples: 

188 

189 .. runpython:: 

190 :showcode: 

191 

192 from io import BytesIO 

193 from pandas_streaming.df import StreamingDataFrame 

194 

195 data = b'''{"a": 1, "b": 2} 

196 {"a": 3, "b": 4}''' 

197 it = StreamingDataFrame.read_json(BytesIO(data), lines=True) 

198 dfs = list(it) 

199 print(dfs) 

200 

201 .. runpython:: 

202 :showcode: 

203 

204 from io import BytesIO 

205 from pandas_streaming.df import StreamingDataFrame 

206 

207 data = b'''[{"a": 1, 

208 "b": 2}, 

209 {"a": 3, 

210 "b": 4}]''' 

211 

212 it = StreamingDataFrame.read_json(BytesIO(data)) 

213 dfs = list(it) 

214 print(dfs) 

215 

216 .. index:: IncompleteJSONError 

217 

218 The parsed json must have an empty line at the end otherwise 

219 the following exception is raised: 

220 `ijson.common.IncompleteJSONError: ` 

221 `parse error: unallowed token at this point in JSON text`. 

222 """ 

223 if not isinstance(chunksize, int) or chunksize <= 0: 

224 raise ValueError( # pragma: no cover 

225 'chunksize must be a positive integer') 

226 kwargs_create = StreamingDataFrame._process_kwargs(kwargs) 

227 

228 if isinstance(args[0], (list, dict)): 

229 if flatten: 

230 return StreamingDataFrame.read_df( 

231 json_normalize(args[0]), **kwargs_create) 

232 return StreamingDataFrame.read_df(args[0], **kwargs_create) 

233 

234 if kwargs.get('lines', None) == 'stream': 

235 del kwargs['lines'] 

236 

237 def localf(a0=args[0]): 

238 if hasattr(a0, 'seek'): 

239 a0.seek(0) 

240 return enumerate_json_items( 

241 a0, encoding=kwargs.get('encoding', None), lines=True, 

242 flatten=flatten) 

243 

244 st = JsonIterator2Stream(localf) 

245 args = args[1:] 

246 

247 if chunksize is None: 

248 return StreamingDataFrame( 

249 lambda: pandas.read_json( 

250 st, *args, chunksize=None, lines=True, **kwargs), 

251 **kwargs_create) 

252 

253 def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): 

254 st.seek(0) 

255 for r in pandas.read_json( 

256 st, *args, chunksize=chunksize, nrows=chunksize, 

257 lines=True, **kw): 

258 yield r 

259 

260 return StreamingDataFrame(fct1, **kwargs_create) 

261 

262 if kwargs.get('lines', False): 

263 if flatten: 

264 raise NotImplementedError( 

265 "flatten==True is implemented with option lines='stream'") 

266 if chunksize is None: 

267 return StreamingDataFrame( 

268 lambda: pandas.read_json(*args, chunksize=None, **kwargs), 

269 **kwargs_create) 

270 

271 def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()): 

272 for r in pandas.read_json( 

273 *args, chunksize=chunksize, nrows=chunksize, **kw): 

274 yield r 

275 return StreamingDataFrame(fct2, **kwargs_create) 

276 

277 st = JsonIterator2Stream( 

278 lambda a0=args[0]: enumerate_json_items( 

279 a0, encoding=kwargs.get('encoding', None), flatten=flatten)) 

280 args = args[1:] 

281 if 'lines' in kwargs: 

282 del kwargs['lines'] 

283 

284 if chunksize is None: 

285 return StreamingDataFrame( 

286 lambda: pandas.read_json( 

287 st, *args, chunksize=chunksize, lines=True, **kwargs), 

288 **kwargs_create) 

289 

290 def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): 

291 if hasattr(st, 'seek'): 

292 st.seek(0) 

293 for r in pandas.read_json( 

294 st, *args, chunksize=chunksize, nrows=chunksize, 

295 lines=True, **kw): 

296 yield r 

297 return StreamingDataFrame(fct3, **kwargs_create) 

298 

299 @staticmethod 

300 def read_csv(*args, **kwargs) -> 'StreamingDataFrame': 

301 """ 

302 Reads a :epkg:`csv` file or buffer 

303 as an iterator on :epkg:`DataFrame`. 

304 The signature is the same as :epkg:`pandas:read_csv`. 

305 The important parameter is *chunksize* which defines the number 

306 of rows to parse in a single bloc. If not specified, 

307 it will be equal to 100000. 

308 """ 

309 if not kwargs.get('iterator', True): 

310 raise ValueError("If specified, iterator must be True.") 

311 if not kwargs.get('chunksize', 100000): 

312 raise ValueError("If specified, chunksize must not be None.") 

313 kwargs_create = StreamingDataFrame._process_kwargs(kwargs) 

314 kwargs['iterator'] = True 

315 if 'chunksize' not in kwargs: 

316 kwargs['chunksize'] = 100000 

317 return StreamingDataFrame(lambda: pandas.read_csv(*args, **kwargs), **kwargs_create) 

318 

319 @staticmethod 

320 def read_str(text, **kwargs) -> 'StreamingDataFrame': 

321 """ 

322 Reads a :epkg:`DataFrame` as an iterator on :epkg:`DataFrame`. 

323 The signature is the same as :epkg:`pandas:read_csv`. 

324 The important parameter is *chunksize* which defines the number 

325 of rows to parse in a single bloc. 

326 """ 

327 if not kwargs.get('iterator', True): 

328 raise ValueError("If specified, iterator must be True.") 

329 if not kwargs.get('chunksize', 100000): 

330 raise ValueError("If specified, chunksize must not be None.") 

331 kwargs_create = StreamingDataFrame._process_kwargs(kwargs) 

332 kwargs['iterator'] = True 

333 if 'chunksize' not in kwargs: 

334 kwargs['chunksize'] = 100000 

335 if isinstance(text, str): 

336 buffer = StringIO(text) 

337 else: 

338 buffer = BytesIO(text) 

339 return StreamingDataFrame( 

340 lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create) 

341 

342 @staticmethod 

343 def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame': 

344 """ 

345 Splits a :epkg:`DataFrame` into small chunks mostly for 

346 unit testing purposes. 

347 

348 @param df :epkg:`DataFrame` 

349 @param chunksize number rows per chunks (// 10 by default) 

350 @param check_schema check schema between two iterations 

351 @return iterator on @see cl StreamingDataFrame 

352 """ 

353 if chunksize is None: 

354 if hasattr(df, 'shape'): 

355 chunksize = df.shape[0] 

356 else: 

357 raise NotImplementedError( 

358 "Cannot retrieve size to infer chunksize for type={0}" 

359 ".".format(type(df))) 

360 

361 if hasattr(df, 'shape'): 

362 size = df.shape[0] 

363 else: 

364 raise NotImplementedError( # pragma: no cover 

365 "Cannot retrieve size for type={0}.".format(type(df))) 

366 

367 def local_iterator(): 

368 "local iterator" 

369 for i in range(0, size, chunksize): 

370 end = min(size, i + chunksize) 

371 yield df[i:end].copy() 

372 return StreamingDataFrame(local_iterator, check_schema=check_schema) 

373 

374 def __iter__(self): 

375 """ 

376 Iterator on a large file with a sliding window. 

377 Each windows is a :epkg:`DataFrame`. 

378 The method stores a copy of the initial iterator 

379 and restores it after the end of the iterations. 

380 If *check_schema* was enabled when calling the constructor, 

381 the method checks that every :epkg:`DataFrame` 

382 follows the same schema as the first chunck. 

383 

384 Even with a big chunk size, it might happen 

385 that consecutive chunks might detect different type 

386 for one particular column. An error message shows up 

387 saying ``Column types are different after row`` 

388 with more information about the column which failed. 

389 In that case, :epkg:`pandas:DataFrame.read_csv` can overwrite 

390 the type on one column by specifying 

391 ``dtype={column_name: new_type}``. It frequently happens 

392 when a string column has many missing values. 

393 """ 

394 iters = self.iter_creation() 

395 sch = None 

396 rows = 0 

397 for it in iters: 

398 if sch is None: 

399 sch = (list(it.columns), list(it.dtypes)) 

400 elif self.check_schema: 

401 if list(it.columns) != sch[0]: # pylint: disable=E1136 

402 raise StreamingDataFrameSchemaError( # pragma: no cover 

403 'Column names are different after row {0}\nFirst chunk: {1}' 

404 '\nCurrent chunk: {2}'.format( 

405 rows, sch[0], list(it.columns))) # pylint: disable=E1136 

406 if list(it.dtypes) != sch[1]: # pylint: disable=E1136 

407 errdf = pandas.DataFrame( 

408 dict(names=sch[0], schema1=sch[1], # pylint: disable=E1136 

409 schema2=list(it.dtypes))) # pylint: disable=E1136 

410 tdf = StringIO() 

411 errdf['diff'] = errdf['schema2'] != errdf['schema1'] 

412 errdf = errdf[errdf['diff']] 

413 errdf.to_csv(tdf, sep=",", index=False) 

414 raise StreamingDataFrameSchemaError( 

415 'Column types are different after row {0}. You may use option ' 

416 'dtype={{"column_name": str}} to force the type on this column.' 

417 '\n---\n{1}'.format(rows, tdf.getvalue())) 

418 

419 rows += it.shape[0] 

420 yield it 

421 

422 @property 

423 def shape(self): 

424 """ 

425 This is the kind of operations you do not want to do 

426 when a file is large because it goes through the whole 

427 stream just to get the number of rows. 

428 """ 

429 nl, nc = 0, 0 

430 for it in self: 

431 nc = max(it.shape[1], nc) 

432 nl += it.shape[0] 

433 return nl, nc 

434 

435 @property 

436 def columns(self): 

437 """ 

438 See :epkg:`pandas:DataFrame:columns`. 

439 """ 

440 for it in self: 

441 return it.columns 

442 # The dataframe is empty. 

443 return [] 

444 

445 @property 

446 def dtypes(self): 

447 """ 

448 See :epkg:`pandas:DataFrame:dtypes`. 

449 """ 

450 for it in self: 

451 return it.dtypes 

452 

453 def to_csv(self, path_or_buf=None, **kwargs) -> 'StreamingDataFrame': 

454 """ 

455 Saves the :epkg:`DataFrame` into string. 

456 See :epkg:`pandas:DataFrame.to_csv`. 

457 """ 

458 if path_or_buf is None: 

459 st = StringIO() 

460 close = False 

461 elif isinstance(path_or_buf, str): 

462 st = open( # pylint: disable=R1732 

463 path_or_buf, "w", encoding=kwargs.get('encoding')) 

464 close = True 

465 else: 

466 st = path_or_buf 

467 close = False 

468 

469 for df in self: 

470 df.to_csv(st, **kwargs) 

471 kwargs['header'] = False 

472 

473 if close: 

474 st.close() 

475 if isinstance(st, StringIO): 

476 return st.getvalue() 

477 return path_or_buf 

478 

479 def to_dataframe(self) -> pandas.DataFrame: 

480 """ 

481 Converts everything into a single :epkg:`DataFrame`. 

482 """ 

483 return pandas.concat(self, axis=0) 

484 

485 def to_df(self) -> pandas.DataFrame: 

486 """ 

487 Converts everything into a single :epkg:`DataFrame`. 

488 """ 

489 return self.to_dataframe() 

490 

491 def iterrows(self): 

492 """ 

493 See :epkg:`pandas:DataFrame:iterrows`. 

494 """ 

495 for df in self: 

496 for it in df.iterrows(): 

497 yield it 

498 

499 def head(self, n=5) -> pandas.DataFrame: 

500 """ 

501 Returns the first rows as a :epkg:`DataFrame`. 

502 """ 

503 st = [] 

504 total = 0 

505 for df in self: 

506 h = df.head(n=n) 

507 total += h.shape[0] 

508 st.append(h) 

509 if total >= n: 

510 break 

511 n -= h.shape[0] 

512 if len(st) == 1: 

513 return st[0] 

514 if len(st) == 0: 

515 return None 

516 return pandas.concat(st, axis=0) 

517 

518 def tail(self, n=5) -> pandas.DataFrame: 

519 """ 

520 Returns the last rows as a :epkg:`DataFrame`. 

521 The size of chunks must be greater than ``n`` to 

522 get ``n`` lines. This method is not efficient 

523 because the whole dataset must be walked through. 

524 """ 

525 for df in self: 

526 h = df.tail(n=n) 

527 return h 

528 

529 def where(self, *args, **kwargs) -> 'StreamingDataFrame': 

530 """ 

531 Applies :epkg:`pandas:DataFrame:where`. 

532 *inplace* must be False. 

533 This function returns a @see cl StreamingDataFrame. 

534 """ 

535 kwargs['inplace'] = False 

536 return StreamingDataFrame( 

537 lambda: map(lambda df: df.where(*args, **kwargs), self), 

538 **self.get_kwargs()) 

539 

540 def sample(self, reservoir=False, cache=False, **kwargs) -> 'StreamingDataFrame': 

541 """ 

542 See :epkg:`pandas:DataFrame:sample`. 

543 Only *frac* is available, otherwise choose 

544 @see me reservoir_sampling. 

545 This function returns a @see cl StreamingDataFrame. 

546 

547 @param reservoir use `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_ 

548 @param cache cache the sample 

549 @param kwargs additional parameters for :epkg:`pandas:DataFrame:sample` 

550 

551 If *cache* is True, the sample is cached (assuming it holds in memory). 

552 The second time an iterator walks through the 

553 """ 

554 if reservoir or 'n' in kwargs: 

555 if 'frac' in kwargs: 

556 raise ValueError( 

557 'frac cannot be specified for reservoir sampling.') 

558 return self._reservoir_sampling(cache=cache, n=kwargs['n'], random_state=kwargs.get('random_state')) 

559 if cache: 

560 sdf = self.sample(cache=False, **kwargs) 

561 df = sdf.to_df() 

562 return StreamingDataFrame.read_df(df, chunksize=df.shape[0]) 

563 return StreamingDataFrame(lambda: map(lambda df: df.sample(**kwargs), self), **self.get_kwargs(), stable=False) 

564 

565 def _reservoir_sampling(self, cache=True, n=1000, random_state=None) -> 'StreamingDataFrame': 

566 """ 

567 Uses the `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_ 

568 algorithm to draw a random sample with exactly *n* samples. 

569 

570 @param cache cache the sample 

571 @param n number of observations to keep 

572 @param random_state sets the random_state 

573 @return @see cl StreamingDataFrame 

574 

575 .. warning:: 

576 The sample is split by chunks of size 1000. 

577 This parameter is not yet exposed. 

578 """ 

579 if not cache: 

580 raise ValueError( 

581 "cache=False is not available for reservoir sampling.") 

582 indices = [] 

583 seen = 0 

584 for i, df in enumerate(self): 

585 for ir, _ in enumerate(df.iterrows()): 

586 seen += 1 

587 if len(indices) < n: 

588 indices.append((i, ir)) 

589 else: 

590 x = nrandom.random() # pylint: disable=E1101 

591 if x * n < (seen - n): 

592 k = nrandom.randint(0, len(indices) - 1) 

593 indices[k] = (i, ir) # pylint: disable=E1126 

594 indices = set(indices) 

595 

596 def reservoir_iterate(sdf, indices, chunksize): 

597 "iterator" 

598 buffer = [] 

599 for i, df in enumerate(self): 

600 for ir, row in enumerate(df.iterrows()): 

601 if (i, ir) in indices: 

602 buffer.append(row) 

603 if len(buffer) >= chunksize: 

604 yield pandas.DataFrame(buffer) 

605 buffer.clear() 

606 if len(buffer) > 0: 

607 yield pandas.DataFrame(buffer) 

608 

609 return StreamingDataFrame( 

610 lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000)) 

611 

612 def apply(self, *args, **kwargs) -> 'StreamingDataFrame': 

613 """ 

614 Applies :epkg:`pandas:DataFrame:apply`. 

615 This function returns a @see cl StreamingDataFrame. 

616 """ 

617 return StreamingDataFrame( 

618 lambda: map(lambda df: df.apply(*args, **kwargs), self), 

619 **self.get_kwargs()) 

620 

621 def applymap(self, *args, **kwargs) -> 'StreamingDataFrame': 

622 """ 

623 Applies :epkg:`pandas:DataFrame:applymap`. 

624 This function returns a @see cl StreamingDataFrame. 

625 """ 

626 return StreamingDataFrame( 

627 lambda: map(lambda df: df.applymap(*args, **kwargs), self), 

628 **self.get_kwargs()) 

629 

630 def merge(self, right, **kwargs) -> 'StreamingDataFrame': 

631 """ 

632 Merges two @see cl StreamingDataFrame and returns @see cl StreamingDataFrame. 

633 *right* can be either a @see cl StreamingDataFrame or simply 

634 a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in 

635 a double loop, loop on *self*, loop on *right*. 

636 """ 

637 if isinstance(right, pandas.DataFrame): 

638 return self.merge(StreamingDataFrame.read_df(right, chunksize=right.shape[0]), **kwargs) 

639 

640 def iterator_merge(sdf1, sdf2, **kw): 

641 "iterate on dataframes" 

642 for df1 in sdf1: 

643 for df2 in sdf2: 

644 df = df1.merge(df2, **kw) 

645 yield df 

646 

647 return StreamingDataFrame( 

648 lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs()) 

649 

650 def concat(self, others, axis=0) -> 'StreamingDataFrame': 

651 """ 

652 Concatenates :epkg:`dataframes`. The function ensures all :epkg:`pandas:DataFrame` 

653 or @see cl StreamingDataFrame share the same columns (name and type). 

654 Otherwise, the function fails as it cannot guess the schema without 

655 walking through all :epkg:`dataframes`. 

656 

657 :param others: list, enumeration, :epkg:`pandas:DataFrame` 

658 :param axis: concatenate by rows (0) or by columns (1) 

659 :return: @see cl StreamingDataFrame 

660 """ 

661 if axis == 1: 

662 return self._concath(others) 

663 if axis == 0: 

664 return self._concatv(others) 

665 raise ValueError("axis must be 0 or 1") # pragma: no cover 

666 

667 def _concath(self, others): 

668 if not isinstance(others, list): 

669 others = [others] 

670 

671 def iterateh(self, others): 

672 cols = tuple([self] + others) 

673 for dfs in zip(*cols): 

674 nrows = [_.shape[0] for _ in dfs] 

675 if min(nrows) != max(nrows): 

676 raise RuntimeError( 

677 "StreamingDataFram cannot merge DataFrame with different size or chunksize") 

678 yield pandas.concat(list(dfs), axis=1) 

679 

680 return StreamingDataFrame(lambda: iterateh(self, others), **self.get_kwargs()) 

681 

682 def _concatv(self, others): 

683 

684 def iterator_concat(this, lothers): 

685 "iterator on dataframes" 

686 columns = None 

687 dtypes = None 

688 for df in this: 

689 if columns is None: 

690 columns = df.columns 

691 dtypes = df.dtypes 

692 yield df 

693 for obj in lothers: 

694 check = True 

695 for i, df in enumerate(obj): 

696 if check: 

697 if list(columns) != list(df.columns): 

698 raise ValueError( 

699 "Frame others[{0}] do not have the same column names or the same order.".format(i)) 

700 if list(dtypes) != list(df.dtypes): 

701 raise ValueError( 

702 "Frame others[{0}] do not have the same column types.".format(i)) 

703 check = False 

704 yield df 

705 

706 if isinstance(others, pandas.DataFrame): 

707 others = [others] 

708 elif isinstance(others, StreamingDataFrame): 

709 others = [others] 

710 

711 def change_type(obj): 

712 "change column type" 

713 if isinstance(obj, pandas.DataFrame): 

714 return StreamingDataFrame.read_df(obj, obj.shape[0]) 

715 else: 

716 return obj 

717 

718 others = list(map(change_type, others)) 

719 return StreamingDataFrame( 

720 lambda: iterator_concat(self, others), **self.get_kwargs()) 

721 

722 def groupby(self, by=None, lambda_agg=None, lambda_agg_agg=None, 

723 in_memory=True, **kwargs) -> pandas.DataFrame: 

724 """ 

725 Implements the streaming :epkg:`pandas:DataFrame:groupby`. 

726 We assume the result holds in memory. The out-of-memory is 

727 not implemented yet. 

728 

729 @param by see :epkg:`pandas:DataFrame:groupby` 

730 @param in_memory in-memory algorithm 

731 @param lambda_agg aggregation function, *sum* by default 

732 @param lambda_agg_agg to aggregate the aggregations, *sum* by default 

733 @param kwargs additional parameters for :epkg:`pandas:DataFrame:groupby` 

734 @return :epkg:`pandas:DataFrame` 

735 

736 As the input @see cl StreamingDataFrame does not necessarily hold 

737 in memory, the aggregation must be done at every iteration. 

738 There are two levels of aggregation: one to reduce every iterated 

739 :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. 

740 This second one is always a **sum**. 

741 As a consequence, this function should not compute any *mean* or *count*, 

742 only *sum* because we do not know the size of each iterated 

743 :epkg:`dataframe`. To compute an average, sum and weights must be 

744 aggregated. 

745 

746 Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. 

747 It could also be ``lambda gr: gr.max()`` or 

748 ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` 

749 as it would lead to incoherent results. 

750 

751 .. exref:: 

752 :title: StreamingDataFrame and groupby 

753 :tag: streaming 

754 

755 Here is an example which shows how to write a simple *groupby* 

756 with :epkg:`pandas` and @see cl StreamingDataFrame. 

757 

758 .. runpython:: 

759 :showcode: 

760 

761 from pandas import DataFrame 

762 from pandas_streaming.df import StreamingDataFrame 

763 

764 df = DataFrame(dict(A=[3, 4, 3], B=[5,6, 7])) 

765 sdf = StreamingDataFrame.read_df(df) 

766 

767 # The following: 

768 print(sdf.groupby("A", lambda gr: gr.sum())) 

769 

770 # Is equivalent to: 

771 print(df.groupby("A").sum()) 

772 """ 

773 if not in_memory: 

774 raise NotImplementedError( 

775 "Out-of-memory group by is not implemented.") 

776 if lambda_agg is None: 

777 def lambda_agg_(gr): 

778 "sum" 

779 return gr.sum() 

780 lambda_agg = lambda_agg_ 

781 if lambda_agg_agg is None: 

782 def lambda_agg_agg_(gr): 

783 "sum" 

784 return gr.sum() 

785 lambda_agg_agg = lambda_agg_agg_ 

786 ckw = kwargs.copy() 

787 ckw["as_index"] = False 

788 

789 agg = [] 

790 for df in self: 

791 gr = df.groupby(by=by, **ckw) 

792 agg.append(lambda_agg(gr)) 

793 conc = pandas.concat(agg, sort=False) 

794 return lambda_agg_agg(conc.groupby(by=by, **kwargs)) 

795 

796 def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, 

797 strategy='cum', **kwargs) -> pandas.DataFrame: 

798 """ 

799 Implements the streaming :epkg:`pandas:DataFrame:groupby`. 

800 We assume the result holds in memory. The out-of-memory is 

801 not implemented yet. 

802 

803 :param by: see :epkg:`pandas:DataFrame:groupby` 

804 :param in_memory: in-memory algorithm 

805 :param lambda_agg: aggregation function, *sum* by default 

806 :param lambda_agg_agg: to aggregate the aggregations, *sum* by default 

807 :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` 

808 :param strategy: ``'cum'``, or ``'streaming'``, see below 

809 :return: :epkg:`pandas:DataFrame` 

810 

811 As the input @see cl StreamingDataFrame does not necessarily hold 

812 in memory, the aggregation must be done at every iteration. 

813 There are two levels of aggregation: one to reduce every iterated 

814 :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. 

815 This second one is always a **sum**. 

816 As a consequence, this function should not compute any *mean* or *count*, 

817 only *sum* because we do not know the size of each iterated 

818 :epkg:`dataframe`. To compute an average, sum and weights must be 

819 aggregated. 

820 

821 Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. 

822 It could also be ``lambda gr: gr.max()`` or 

823 ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` 

824 as it would lead to incoherent results. 

825 

826 Parameter *strategy* allows three scenarios. 

827 First one if ``strategy is None`` goes through 

828 the whole datasets to produce a final :epkg:`DataFrame`. 

829 Second if ``strategy=='cum'`` returns a 

830 @see cl StreamingDataFrame, each iteration produces 

831 the current status of the *group by*. Last case, 

832 ``strategy=='streaming'`` produces :epkg:`DataFrame` 

833 which must be concatenated into a single :epkg:`DataFrame` 

834 and grouped again to get the results. 

835 

836 .. exref:: 

837 :title: StreamingDataFrame and groupby 

838 :tag: streaming 

839 

840 Here is an example which shows how to write a simple *groupby* 

841 with :epkg:`pandas` and @see cl StreamingDataFrame. 

842 

843 .. runpython:: 

844 :showcode: 

845 

846 from pandas import DataFrame 

847 from pandas_streaming.df import StreamingDataFrame 

848 from pandas_streaming.data import dummy_streaming_dataframe 

849 

850 df20 = dummy_streaming_dataframe(20).to_dataframe() 

851 df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0) 

852 sdf20 = StreamingDataFrame.read_df(df20, chunksize=5) 

853 sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(), 

854 strategy='cum', as_index=False) 

855 for gr in sgr: 

856 print() 

857 print(gr) 

858 """ 

859 if not in_memory: 

860 raise NotImplementedError( 

861 "Out-of-memory group by is not implemented.") 

862 if lambda_agg is None: 

863 def lambda_agg_(gr): 

864 "sum" 

865 return gr.sum() 

866 lambda_agg = lambda_agg_ 

867 if lambda_agg_agg is None: 

868 def lambda_agg_agg_(gr): 

869 "sum" 

870 return gr.sum() 

871 lambda_agg_agg = lambda_agg_agg_ 

872 ckw = kwargs.copy() 

873 ckw["as_index"] = False 

874 

875 if strategy == 'cum': 

876 def iterate_cum(): 

877 agg = None 

878 for df in self: 

879 gr = df.groupby(by=by, **ckw) 

880 gragg = lambda_agg(gr) 

881 if agg is None: 

882 yield lambda_agg_agg(gragg.groupby(by=by, **kwargs)) 

883 agg = gragg 

884 else: 

885 lagg = pandas.concat([agg, gragg], sort=False) 

886 yield lambda_agg_agg(lagg.groupby(by=by, **kwargs)) 

887 agg = lagg 

888 return StreamingDataFrame(lambda: iterate_cum(), **self.get_kwargs()) 

889 

890 if strategy == 'streaming': 

891 def iterate_streaming(): 

892 for df in self: 

893 gr = df.groupby(by=by, **ckw) 

894 gragg = lambda_agg(gr) 

895 yield lambda_agg(gragg.groupby(by=by, **kwargs)) 

896 return StreamingDataFrame(lambda: iterate_streaming(), **self.get_kwargs()) 

897 

898 raise ValueError( # pragma: no cover 

899 "Unknown strategy '{0}'".format(strategy)) 

900 

901 def ensure_dtype(self, df, dtypes): 

902 """ 

903 Ensures the :epkg:`dataframe` *df* has types indicated in dtypes. 

904 Changes it if not. 

905 

906 :param df: dataframe 

907 :param dtypes: list of types 

908 :return: updated? 

909 """ 

910 ch = False 

911 cols = df.columns 

912 for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)): 

913 if has != exp: 

914 name = cols[i] 

915 df[name] = df[name].astype(exp) 

916 ch = True 

917 return ch 

918 

919 def __getitem__(self, *args): 

920 """ 

921 Implements some of the functionalities :epkg:`pandas` 

922 offers for the operator ``[]``. 

923 """ 

924 if len(args) != 1: 

925 raise NotImplementedError("Only a list of columns is supported.") 

926 cols = args[0] 

927 if isinstance(cols, str): 

928 # One column. 

929 iter_creation = self.iter_creation 

930 

931 def iterate_col(): 

932 "iterate on one column" 

933 one_col = [cols] 

934 for df in iter_creation(): 

935 yield df[one_col] 

936 return StreamingSeries(iterate_col, **self.get_kwargs()) 

937 

938 if not isinstance(cols, list): 

939 raise NotImplementedError("Only a list of columns is supported.") 

940 

941 def iterate_cols(sdf): 

942 """Iterate on columns.""" 

943 for df in sdf: 

944 yield df[cols] 

945 

946 return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs()) 

947 

948 def __setitem__(self, index, value): 

949 """ 

950 Limited set of operators are supported. 

951 """ 

952 if not isinstance(index, str): 

953 raise ValueError( 

954 "Only column affected are supported but index=%r." % index) 

955 if isinstance(value, (int, float, numpy.number, str)): 

956 # Is is equivalent to add_column. 

957 iter_creation = self.iter_creation 

958 

959 def iterate_fct(): 

960 "iterate on rows" 

961 iters = iter_creation() 

962 for df in iters: 

963 dfc = df.copy() 

964 dfc[index] = value 

965 yield dfc 

966 

967 self.iter_creation = iterate_fct 

968 

969 elif isinstance(value, StreamingSeries): 

970 iter_creation = self.iter_creation 

971 

972 def iterate_fct(): 

973 "iterate on rows" 

974 iters = iter_creation() 

975 for df, dfs in zip(iters, value): 

976 if df.shape[0] != dfs.shape[0]: 

977 raise RuntimeError( 

978 "Chunksize or shape are different when " 

979 "iterating on two StreamDataFrame at the same " 

980 "time: %r != %r." % (df.shape[0], dfs.shape[0])) 

981 dfc = df.copy() 

982 dfc[index] = dfs 

983 yield dfc 

984 

985 self.iter_creation = iterate_fct 

986 else: 

987 raise NotImplementedError( 

988 "Not implemented for type(index)=%r and type(value)=%r." % ( 

989 type(index), type(value))) 

990 

991 def add_column(self, col, value): 

992 """ 

993 Implements some of the functionalities :epkg:`pandas` 

994 offers for the operator ``[]``. 

995 

996 @param col new column 

997 @param value @see cl StreamingDataFrame or a lambda function 

998 @return @see cl StreamingDataFrame 

999 

1000 ..note:: 

1001 

1002 If value is a @see cl StreamingDataFrame, 

1003 *chunksize* must be the same for both. 

1004 

1005 .. exref:: 

1006 :title: Add a new column to a StreamingDataFrame 

1007 :tag: streaming 

1008 

1009 .. runpython:: 

1010 :showcode: 

1011 

1012 from pandas import DataFrame 

1013 from pandas_streaming.df import StreamingDataFrame 

1014 

1015 df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"])) 

1016 sdf = StreamingDataFrame.read_df(df) 

1017 sdf2 = sdf.add_column("d", lambda row: int(1)) 

1018 print(sdf2.to_dataframe()) 

1019 

1020 sdf2 = sdf.add_column("d", lambda row: int(1)) 

1021 print(sdf2.to_dataframe()) 

1022 

1023 """ 

1024 if not isinstance(col, str): 

1025 raise NotImplementedError( 

1026 "Only a column as a string is supported.") 

1027 

1028 if isfunction(value): 

1029 def iterate_fct(self, value, col): 

1030 "iterate on rows" 

1031 for df in self: 

1032 dfc = df.copy() 

1033 dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1)) 

1034 yield dfc 

1035 

1036 return StreamingDataFrame(lambda: iterate_fct(self, value, col), **self.get_kwargs()) 

1037 

1038 if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)): 

1039 raise NotImplementedError( 

1040 "Unable set a new column based on a datadframe.") 

1041 

1042 def iterate_cst(self, value, col): 

1043 "iterate on rows" 

1044 for df in self: 

1045 dfc = df.copy() 

1046 dfc[col] = value 

1047 yield dfc 

1048 

1049 return StreamingDataFrame( 

1050 lambda: iterate_cst(self, value, col), **self.get_kwargs()) 

1051 

1052 def fillna(self, **kwargs): 

1053 """ 

1054 Replaces the missing values, calls 

1055 :epkg:`pandas:DataFrame:fillna`. 

1056 

1057 @param kwargs see :epkg:`pandas:DataFrame:fillna` 

1058 @return @see cl StreamingDataFrame 

1059 

1060 .. warning:: 

1061 The function does not check what happens at the 

1062 limit of every chunk of data. Anything but a constant value 

1063 will probably have an inconsistent behaviour. 

1064 """ 

1065 

1066 def iterate_na(self, **kwargs): 

1067 "iterate on rows" 

1068 if kwargs.get('inplace', True): 

1069 kwargs['inplace'] = True 

1070 for df in self: 

1071 df.fillna(**kwargs) 

1072 yield df 

1073 else: 

1074 for df in self: 

1075 yield df.fillna(**kwargs) 

1076 

1077 return StreamingDataFrame( 

1078 lambda: iterate_na(self, **kwargs), **self.get_kwargs()) 

1079 

1080 def describe(self, percentiles=None, include=None, exclude=None, 

1081 datetime_is_numeric=False): 

1082 """ 

1083 Calls :epkg:`pandas:DataFrame:describe` on every piece 

1084 of the datasets. *percentiles* are not really accurate 

1085 but just an indication. 

1086 

1087 :param percentiles: see :epkg:`pandas:DataFrame:describe` 

1088 :param include: see :epkg:`pandas:DataFrame:describe` 

1089 :param exclude: see :epkg:`pandas:DataFrame:describe` 

1090 :param datetime_is_numeric: see :epkg:`pandas:DataFrame:describe` 

1091 :return: :epkg:`pandas:DataFrame:describe` 

1092 """ 

1093 merged = None 

1094 stack = [] 

1095 notper = ['count', 'mean', 'std'] 

1096 for df in self: 

1097 desc = df.describe( 

1098 percentiles=percentiles, include=include, exclude=exclude, 

1099 datetime_is_numeric=datetime_is_numeric) 

1100 count = desc.loc['count', :] 

1101 rows = [name for name in desc.index if name not in notper] 

1102 stack.append(desc.loc[rows, :]) 

1103 if merged is None: 

1104 merged = desc 

1105 merged.loc['std', :] = ( 

1106 merged.loc['std', :] ** 2 + merged.loc['mean', :] ** 2) * count 

1107 merged.loc['mean', :] *= count 

1108 else: 

1109 merged.loc['count', :] += desc.loc['count', :] 

1110 merged.loc['mean', :] += desc.loc['mean', :] * count 

1111 merged.loc['std', :] += ( 

1112 desc.loc['std', :] ** 2 + desc.loc['mean', :] ** 2) * count 

1113 merged.loc['max', :] = numpy.maximum( 

1114 merged.loc['max', :], desc.loc['max', :]) 

1115 merged.loc['min', :] = numpy.maximum( 

1116 merged.loc['min', :], desc.loc['min', :]) 

1117 merged.loc['mean', :] /= merged.loc['count', :] 

1118 merged.loc['std', :] = ( 

1119 merged.loc['std', :] / merged.loc['count', :] - 

1120 merged.loc['mean', :] ** 2) ** 0.5 

1121 values = pandas.concat(stack) 

1122 summary = values.describe(percentiles=percentiles, 

1123 datetime_is_numeric=datetime_is_numeric) 

1124 merged = merged.loc[notper, :] 

1125 rows = [name for name in summary.index if name not in notper] 

1126 summary = summary.loc[rows, :] 

1127 return pandas.concat([merged, summary]) 

1128 

1129 def sort_values(self, by, axis=0, ascending=True, kind='quicksort', 

1130 na_position='last', 

1131 temp_file='_pandas_streaming_sort_values_'): 

1132 """ 

1133 Sorts the streaming dataframe by values. 

1134 

1135 :param by: one column 

1136 :param ascending: order 

1137 :param kind: see :meth:`pandas.DataFrame.sort_values` 

1138 :param na_position: see :meth:`pandas.DataFrame.sort_values` 

1139 :param temp_file: sorting a whole database is impossible 

1140 without storing intermediate results on disk 

1141 unless it can fit into the memory, but in that case, 

1142 it is easier to convert the streaming database into 

1143 a dataframe and sort it 

1144 :return: streaming database 

1145 """ 

1146 if not isinstance(by, str): 

1147 raise NotImplementedError( 

1148 "Only one column can be used to sort not %r." % by) 

1149 keys = {} 

1150 nans = [] 

1151 indices = [] 

1152 with open(temp_file, 'wb') as f: 

1153 for df in self: 

1154 dfs = df.sort_values(by, ascending=ascending, kind=kind, 

1155 na_position=na_position) 

1156 for tu in dfs[by]: 

1157 if isinstance(tu, float) and numpy.isnan(tu): 

1158 nans.append(len(indices)) 

1159 else: 

1160 if tu not in keys: 

1161 keys[tu] = [] 

1162 keys[tu].append(len(indices)) 

1163 indices.append(f.tell()) 

1164 st = BytesIO() 

1165 pickle.dump(dfs, st) 

1166 f.write(st.getvalue()) 

1167 

1168 indices.append(f.tell()) 

1169 

1170 values = list(keys.items()) 

1171 values.sort(reverse=not ascending) 

1172 

1173 def iterate(): 

1174 

1175 with open(temp_file, 'rb') as f: 

1176 

1177 if na_position == 'first': 

1178 for p in nans: 

1179 f.seek(indices[p]) 

1180 length = indices[p + 1] - indices[p] 

1181 pkl = f.read(length) 

1182 dfs = pickle.load(BytesIO(pkl)) 

1183 sub = dfs[numpy.isnan(dfs[by])] 

1184 yield sub 

1185 

1186 for key, positions in values: 

1187 for p in positions: 

1188 f.seek(indices[p]) 

1189 length = indices[p + 1] - indices[p] 

1190 pkl = f.read(length) 

1191 dfs = pickle.load(BytesIO(pkl)) 

1192 sub = dfs[dfs[by] == key] 

1193 yield sub 

1194 

1195 if na_position == 'last': 

1196 for p in nans: 

1197 f.seek(indices[p]) 

1198 length = indices[p + 1] - indices[p] 

1199 pkl = f.read(length) 

1200 dfs = pickle.load(BytesIO(pkl)) 

1201 sub = dfs[numpy.isnan(dfs[by])] 

1202 yield sub 

1203 

1204 res = StreamingDataFrame( 

1205 lambda: iterate(), **self.get_kwargs()) 

1206 res._delete_.append(lambda: os.remove(temp_file)) 

1207 return res 

1208 

1209 def __del__(self): 

1210 """ 

1211 Calls every function in `_delete_`. 

1212 """ 

1213 for f in self._delete_: 

1214 f() 

1215 

1216 

1217class StreamingSeries(StreamingDataFrame): 

1218 """ 

1219 Seens as a @see cl StreamingDataFrame of one column. 

1220 """ 

1221 

1222 def __init__(self, iter_creation, check_schema=True, stable=True): 

1223 StreamingDataFrame.__init__( 

1224 self, iter_creation, check_schema=check_schema, stable=stable) 

1225 if len(self.columns) != 1: 

1226 raise RuntimeError( 

1227 "A series can contain only one column not %r." % len(self.columns)) 

1228 

1229 def apply(self, *args, **kwargs) -> 'StreamingDataFrame': 

1230 """ 

1231 Applies :epkg:`pandas:Series:apply`. 

1232 This function returns a @see cl StreamingSeries. 

1233 """ 

1234 return StreamingSeries( 

1235 lambda: map(lambda df: df.apply(*args, **kwargs), self), 

1236 **self.get_kwargs()) 

1237 

1238 def __add__(self, value): 

1239 """ 

1240 Does an addition on every value hoping that has a meaning. 

1241 

1242 :param value: any value which makes sense 

1243 :return: a new series 

1244 """ 

1245 def iterate(): 

1246 for df in self: 

1247 yield df + value 

1248 

1249 return StreamingSeries(iterate, **self.get_kwargs())