Source code for pandas_streaming.df.dataframe

# -*- coding: utf-8 -*-
"""
Defines a streaming dataframe.


:githublink:`%|py|6`
"""
from io import StringIO, BytesIO
from inspect import isfunction
import numpy.random as nrandom
import pandas
from pandas.testing import assert_frame_equal
from pandas.io.json import json_normalize
from ..exc import StreamingInefficientException
from .dataframe_split import sklearn_train_test_split, sklearn_train_test_split_streaming
from .dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream


[docs]class StreamingDataFrameSchemaError(Exception): """ Reveals an issue with inconsistent schemas. :githublink:`%|py|20` """ pass
[docs]class StreamingDataFrame: """ Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on :epkg:`dataframe`. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop: :: for df in self: # self is a StreamingDataFrame # ... The constructor cannot receive an iterator, otherwise this class would only be able to walk through the data once. The main reason is that it is impossible to :epkg:`*py:pickle` (or :epkg:`dill`) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on :epkg:`DataFrame`. Most of the methods return either a :epkg:`DataFrame` or a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. In the second case, methods can be chained. By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting *check_schema=False* in the constructor. The user should expect the data to remain stable. Every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraint. Drawing a random :meth:`sample <pandas_streaming.df.dataframe.StreamingDataFrame.sample>` is one of these cases. :githublink:`%|py|59` """
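A minimal sketch of the intended pattern (the chunk contents and column names below are invented): the constructor receives a function able to recreate the iterator, so the stream can be replayed, and methods returning a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` can be chained::

    import pandas
    from pandas_streaming.df import StreamingDataFrame

    def make_chunks():
        # any function which recreates the iterator on DataFrames
        for i in range(3):
            yield pandas.DataFrame(dict(X=[i, i + 1], Y=["a", "b"]))

    sdf = StreamingDataFrame(make_chunks)
    for df in sdf:          # the stream can be walked through several times
        print(df.shape)
    # methods returning a StreamingDataFrame can be chained
    subset = sdf[["X"]].to_dataframe()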
[docs] def __init__(self, iter_creation, check_schema=True, stable=True): """ :param iter_creation: function which creates an iterator or an instance of :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` :param check_schema: checks that the schema is the same for every :epkg:`dataframe` :param stable: indicates if the :epkg:`dataframe` remains the same whenever it is walked through :githublink:`%|py|68` """ if isinstance(iter_creation, StreamingDataFrame): self.iter_creation = iter_creation.iter_creation self.stable = iter_creation.stable else: self.iter_creation = iter_creation self.stable = stable self.check_schema = check_schema
[docs] def is_stable(self, do_check=False, n=10): """ Tells if the :epkg:`dataframe` is supposed to be stable. :param do_check: do not trust the value sent to the constructor :param n: number of rows used to check the stability, None for all rows :return: boolean *do_check=True* means the method checks that the first *n* rows remain the same across two iterations. :githublink:`%|py|88` """ if do_check: for i, (a, b) in enumerate(zip(self, self)): if n is not None and i >= n: break try: assert_frame_equal(a, b) except AssertionError: # pragma: no cover return False return True else: return self.stable
[docs] def get_kwargs(self): """ Returns the parameters used to call the constructor. :githublink:`%|py|104` """ return dict(check_schema=self.check_schema)
[docs] def train_test_split(self, path_or_buf=None, export_method="to_csv", names=None, streaming=True, partitions=None, **kwargs): """ Randomly splits a :epkg:`dataframe` into smaller pieces. The function returns streams of file names. It chooses one of the options from module :mod:`dataframe_split <pandas_streaming.df.dataframe_split>`. :param path_or_buf: a string, a list of strings or buffers, if it is a string, it must contain ``{}`` like ``partition{}.txt``, if None, the function returns strings. :param export_method: method used to store the partitions, by default :epkg:`pandas:DataFrame:to_csv`, additional parameters will be given to that function :param names: partitions names, by default ``('train', 'test')`` :param kwargs: parameters for the export function and :epkg:`sklearn:model_selection:train_test_split`. :param streaming: the function switches to a streaming version of the algorithm. :param partitions: splitting partitions :return: outputs of the exports functions or two :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` if path_or_buf is None. The streaming version of this algorithm is implemented by function :func:`sklearn_train_test_split_streaming <pandas_streaming.df.dataframe_split.sklearn_train_test_split_streaming>`. Its documentation indicates the limitation of the streaming version and gives some insights about the additional parameters. :githublink:`%|py|135` """ if streaming: if partitions is not None: if len(partitions) != 2: raise NotImplementedError( "Only train and test split is allowed, *partitions* must be of length 2.") kwargs = kwargs.copy() kwargs['train_size'] = partitions[0] kwargs['test_size'] = partitions[1] return sklearn_train_test_split_streaming(self, **kwargs) return sklearn_train_test_split(self, path_or_buf=path_or_buf, export_method=export_method, names=names, **kwargs)
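As a rough illustration of the streaming branch described above (the dataframe and the partition values are invented), the call should return two :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` instances when *path_or_buf* is None::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=range(10), Y=[i % 2 for i in range(10)]))
    sdf = StreamingDataFrame.read_df(df, chunksize=5)
    # streaming split: partitions gives the train and test proportions
    train_sdf, test_sdf = sdf.train_test_split(
        streaming=True, partitions=[0.8, 0.2])
    print(train_sdf.to_dataframe().shape, test_sdf.to_dataframe().shape)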
[docs] @staticmethod def _process_kwargs(kwargs): """ Filters out parameters for the constructor of this class. :githublink:`%|py|153` """ kw = {} for k in {'check_schema'}: if k in kwargs: kw[k] = kwargs[k] del kwargs[k] return kw
[docs] @staticmethod def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDataFrame': """ Reads a :epkg:`json` file or buffer as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_json`. The important parameter is *chunksize* which defines the number of rows to parse in a single bloc and it must be defined to return an iterator. If *lines* is True, the function falls back into :epkg:`pandas:read_json`, otherwise it used :func:`enumerate_json_items <pandas_streaming.df.dataframe_io_helpers.enumerate_json_items>`. If *lines is ``'stream'``, *enumerate_json_items* is called with parameter ``lines=True``. Parameter *flatten* uses the trick described at `Flattening JSON objects in Python <https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10>`_. Examples: .. runpython:: :showcode: from io import BytesIO from pandas_streaming.df import StreamingDataFrame data = b'''{"a": 1, "b": 2} {"a": 3, "b": 4}''' it = StreamingDataFrame.read_json(BytesIO(data), lines=True) dfs = list(it) print(dfs) .. runpython:: :showcode: from io import BytesIO from pandas_streaming.df import StreamingDataFrame data = b'''[{"a": 1, "b": 2}, {"a": 3, "b": 4}]''' it = StreamingDataFrame.read_json(BytesIO(data)) dfs = list(it) print(dfs) :githublink:`%|py|206` """ if not isinstance(chunksize, int) or chunksize <= 0: raise ValueError( # pragma: no cover 'chunksize must be a positive integer') kwargs_create = StreamingDataFrame._process_kwargs(kwargs) if isinstance(args[0], (list, dict)): if flatten: return StreamingDataFrame.read_df(json_normalize(args[0]), **kwargs_create) return StreamingDataFrame.read_df(args[0], **kwargs_create) if kwargs.get('lines', None) == 'stream': del kwargs['lines'] st = JsonIterator2Stream(enumerate_json_items( args[0], encoding=kwargs.get('encoding', None), lines=True, flatten=flatten)) args = args[1:] if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json( st, *args, chunksize=None, lines=True, **kwargs), **kwargs_create) def fct1(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): for r in pandas.read_json(st, *args, chunksize=chunksize, nrows=chunksize, lines=True, **kw): yield r return StreamingDataFrame(fct1, **kwargs_create) if kwargs.get('lines', False): if flatten: raise NotImplementedError( "flatten==True is implemented with option lines='stream'") if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json(*args, chunksize=None, **kwargs), **kwargs_create) def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()): for r in pandas.read_json(*args, chunksize=chunksize, nrows=chunksize, **kw): yield r return StreamingDataFrame(fct2, **kwargs_create) st = JsonIterator2Stream(enumerate_json_items( args[0], encoding=kwargs.get('encoding', None), flatten=flatten)) args = args[1:] if 'lines' in kwargs: del kwargs['lines'] if chunksize is None: return StreamingDataFrame( lambda: pandas.read_json( st, *args, chunksize=chunksize, lines=True, **kwargs), **kwargs_create) def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()): for r in pandas.read_json(st, *args, chunksize=chunksize, nrows=chunksize, lines=True, **kw): yield r return StreamingDataFrame(fct3, **kwargs_create)
[docs] @staticmethod def read_csv(*args, **kwargs) -> 'StreamingDataFrame': """ Reads a :epkg:`csv` file or buffer as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_csv`. The important parameter is *chunksize* which defines the number of rows to parse in a single block. If not specified, it will be equal to 100000. :githublink:`%|py|276` """ if not kwargs.get('iterator', True): raise ValueError("If specified, iterator must be True.") if not kwargs.get('chunksize', 100000): raise ValueError("If specified, chunksize must not be None.") kwargs_create = StreamingDataFrame._process_kwargs(kwargs) kwargs['iterator'] = True if 'chunksize' not in kwargs: kwargs['chunksize'] = 100000 return StreamingDataFrame(lambda: pandas.read_csv(*args, **kwargs), **kwargs_create)
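For instance (the file name is a placeholder), reading a large :epkg:`csv` file in chunks of 50000 rows could look like this::

    from pandas_streaming.df import StreamingDataFrame

    # "big_file.csv" stands for any existing csv file
    sdf = StreamingDataFrame.read_csv("big_file.csv", chunksize=50000, sep=",")
    for chunk in sdf:
        print(chunk.shape)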
[docs] @staticmethod def read_str(text, **kwargs) -> 'StreamingDataFrame': """ Reads a :epkg:`csv` string or bytes as an iterator on :epkg:`DataFrame`. The signature is the same as :epkg:`pandas:read_csv`. The important parameter is *chunksize* which defines the number of rows to parse in a single block. :githublink:`%|py|294` """ if not kwargs.get('iterator', True): raise ValueError("If specified, iterator must be True.") if not kwargs.get('chunksize', 100000): raise ValueError("If specified, chunksize must not be None.") kwargs_create = StreamingDataFrame._process_kwargs(kwargs) kwargs['iterator'] = True if 'chunksize' not in kwargs: kwargs['chunksize'] = 100000 if isinstance(text, str): buffer = StringIO(text) else: buffer = BytesIO(text) return StreamingDataFrame( lambda: pandas.read_csv(buffer, **kwargs), **kwargs_create)
[docs] @staticmethod def read_df(df, chunksize=None, check_schema=True) -> 'StreamingDataFrame': """ Splits a :epkg:`DataFrame` into small chunks mostly for unit testing purposes. :param df: :epkg:`DataFrame` :param chunksize: number of rows per chunk (the whole :epkg:`DataFrame` in a single chunk by default) :param check_schema: check schema between two iterations :return: iterator on :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` :githublink:`%|py|320` """ if chunksize is None: if hasattr(df, 'shape'): chunksize = df.shape[0] else: raise NotImplementedError( "Cannot retrieve size to infer chunksize for type={0}".format(type(df))) if hasattr(df, 'shape'): size = df.shape[0] else: raise NotImplementedError( "Cannot retrieve size for type={0}".format(type(df))) def local_iterator(): "local iterator" for i in range(0, size, chunksize): end = min(size, i + chunksize) yield df[i:end].copy() return StreamingDataFrame(local_iterator, check_schema=check_schema)
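A short sketch of the chunking behaviour (values are illustrative)::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=[0, 1, 2, 3, 4], Y=["a", "b", "c", "d", "e"]))
    sdf = StreamingDataFrame.read_df(df, chunksize=2)
    for chunk in sdf:
        print(chunk)   # chunks of 2, 2 and 1 rows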
[docs] def __iter__(self): """ Iterator on a large file with a sliding window. Each window is a :epkg:`DataFrame`. The method stores a copy of the initial iterator and restores it after the end of the iterations. If *check_schema* was enabled when calling the constructor, the method checks that every :epkg:`DataFrame` follows the same schema as the first chunk. Even with a big chunk size, consecutive chunks might detect different types for one particular column. An error message shows up saying ``Column types are different after row`` with more information about the column which failed. In that case, :epkg:`pandas:read_csv` can overwrite the type of one column by specifying ``dtype={column_name: new_type}``. It frequently happens when a string column has many missing values. :githublink:`%|py|360` """ iters = self.iter_creation() sch = None rows = 0 for it in iters: if sch is None: sch = (list(it.columns), list(it.dtypes)) elif self.check_schema: if list(it.columns) != sch[0]: # pylint: disable=E1136 raise StreamingDataFrameSchemaError( # pragma: no cover 'Column names are different after row {0}\nFirst chunk: {1}' '\nCurrent chunk: {2}'.format( rows, sch[0], list(it.columns))) # pylint: disable=E1136 if list(it.dtypes) != sch[1]: # pylint: disable=E1136 errdf = pandas.DataFrame( dict(names=sch[0], schema1=sch[1], schema2=list(it.dtypes))) # pylint: disable=E1136 tdf = StringIO() errdf['diff'] = errdf['schema2'] != errdf['schema1'] errdf = errdf[errdf['diff']] errdf.to_csv(tdf, sep=",") msg = 'Column types are different after row {0}\n{1}' raise StreamingDataFrameSchemaError( msg.format(rows, tdf.getvalue())) rows += it.shape[0] yield it
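To illustrate the dtype workaround mentioned above (file and column names are hypothetical), forcing a column to stay a string avoids the schema error when early chunks only contain missing values::

    from pandas_streaming.df import StreamingDataFrame

    # "data.csv" and the column "comment" are placeholders
    sdf = StreamingDataFrame.read_csv(
        "data.csv", chunksize=100000, dtype={"comment": str})
    for chunk in sdf:   # the schema now stays identical across chunks
        pass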
[docs] def sort_values(self, *args, **kwargs): """ Not implemented. :githublink:`%|py|389` """ raise StreamingInefficientException(StreamingDataFrame.sort_values)
@property def shape(self): """ This is the kind of operation you do not want to run on a large file because it goes through the whole stream just to get the number of rows. :githublink:`%|py|398` """ nl, nc = 0, 0 for it in self: nc = max(it.shape[1], nc) nl += it.shape[0] return nl, nc @property def columns(self): """ See :epkg:`pandas:DataFrame:columns`. :githublink:`%|py|409` """ for it in self: return it.columns @property def dtypes(self): """ See :epkg:`pandas:DataFrame:dtypes`. :githublink:`%|py|417` """ for it in self: return it.dtypes
[docs] def to_csv(self, path_or_buf=None, **kwargs) -> 'StreamingDataFrame': """ Saves the :epkg:`DataFrame` into string. See :epkg:`pandas:DataFrame.to_csv`. :githublink:`%|py|425` """ if path_or_buf is None: st = StringIO() close = False elif isinstance(path_or_buf, str): st = open(path_or_buf, "w", encoding=kwargs.get('encoding')) close = True else: st = path_or_buf close = False for df in self: df.to_csv(st, **kwargs) kwargs['header'] = False if close: st.close() if isinstance(st, StringIO): return st.getvalue() return path_or_buf
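The loop above writes the header only once because *header* is switched to False after the first chunk. A small sketch (the output file name is arbitrary)::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=[0, 1, 2, 3], Y=["a", "b", "c", "d"]))
    sdf = StreamingDataFrame.read_df(df, chunksize=2)
    # writes every chunk into the same file, the header is written once
    sdf.to_csv("streamed.csv", index=False)
    # with path_or_buf=None, the csv content is returned as a string
    text = sdf.to_csv(index=False)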
[docs] def to_dataframe(self) -> pandas.DataFrame: """ Converts everything into a single :epkg:`DataFrame`. :githublink:`%|py|449` """ return pandas.concat(self, axis=0)
[docs] def to_df(self) -> pandas.DataFrame: """ Converts everything into a single :epkg:`DataFrame`. :githublink:`%|py|455` """ return self.to_dataframe()
[docs] def iterrows(self): """ See :epkg:`pandas:DataFrame:iterrows`. :githublink:`%|py|461` """ for df in self: for it in df.iterrows(): yield it
[docs] def head(self, n=5) -> pandas.DataFrame: """ Returns the first rows as a :epkg:`DataFrame`. :githublink:`%|py|469` """ st = [] total = 0 for df in self: h = df.head(n=n) total += h.shape[0] st.append(h) if total >= n: break n -= h.shape[0] if len(st) == 1: return st[0] if len(st) == 0: return None return pandas.concat(st, axis=0)
[docs] def tail(self, n=5) -> pandas.DataFrame: """ Returns the last rows as a :epkg:`DataFrame`. The size of chunks must be greater than ``n`` to get ``n`` lines. This method is not efficient because the whole dataset must be walked through. :githublink:`%|py|491` """ for df in self: h = df.tail(n=n) return h
[docs] def where(self, *args, **kwargs) -> 'StreamingDataFrame': """ Applies :epkg:`pandas:DataFrame:where`. *inplace* must be False. This function returns a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. :githublink:`%|py|501` """ kwargs['inplace'] = False return StreamingDataFrame( lambda: map(lambda df: df.where(*args, **kwargs), self), **self.get_kwargs())
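A sketch of chunk-wise filtering with *where* (the column is invented); the condition is applied to every chunk independently::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=[-1, 2, -3, 4]))
    sdf = StreamingDataFrame.read_df(df, chunksize=2)
    # values not matching the condition are replaced by 0 in every chunk
    print(sdf.where(lambda d: d > 0, other=0).to_dataframe())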
[docs] def sample(self, reservoir=False, cache=False, **kwargs) -> 'StreamingDataFrame': """ See :epkg:`pandas:DataFrame:sample`. Only *frac* is available, otherwise choose :meth:`reservoir_sampling`. This function returns a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. :param reservoir: use `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_ :param cache: cache the sample :param kwargs: additional parameters for :epkg:`pandas:DataFrame:sample` If *cache* is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the data, the cached sample is returned, so every iteration produces the same rows. :githublink:`%|py|520` """ if reservoir or 'n' in kwargs: if 'frac' in kwargs: raise ValueError( 'frac cannot be specified for reservoir sampling.') return self._reservoir_sampling(cache=cache, n=kwargs['n'], random_state=kwargs.get('random_state')) if cache: sdf = self.sample(cache=False, **kwargs) df = sdf.to_df() return StreamingDataFrame.read_df(df, chunksize=df.shape[0]) return StreamingDataFrame(lambda: map(lambda df: df.sample(**kwargs), self), **self.get_kwargs(), stable=False)
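A sketch of the *cache* behaviour described above (numbers are arbitrary): with *cache=True* the sample is materialized once, so two iterations return the same rows::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=range(100)))
    sdf = StreamingDataFrame.read_df(df, chunksize=10)
    sample = sdf.sample(frac=0.1, cache=True)
    # both iterations see the same cached rows
    print(sample.to_dataframe())
    print(sample.to_dataframe())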
[docs] def _reservoir_sampling(self, cache=True, n=1000, random_state=None) -> 'StreamingDataFrame': """ Uses the `reservoir sampling <https://en.wikipedia.org/wiki/Reservoir_sampling>`_ algorithm to draw a random sample with exactly *n* samples. :param cache: cache the sample :param n: number of observations to keep :param random_state: sets the random_state :return: :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` .. warning:: The sample is split by chunks of size 1000. This parameter is not yet exposed. :githublink:`%|py|545` """ if not cache: raise ValueError( "cache=False is not available for reservoir sampling.") indices = [] seen = 0 for i, df in enumerate(self): for ir, _ in enumerate(df.iterrows()): seen += 1 if len(indices) < n: indices.append((i, ir)) else: x = nrandom.random() # pylint: disable=E1101 if x * n < (seen - n): k = nrandom.randint(0, len(indices) - 1) indices[k] = (i, ir) # pylint: disable=E1126 indices = set(indices) def reservoir_iterate(sdf, indices, chunksize): "iterator" buffer = [] for i, df in enumerate(self): for ir, row in enumerate(df.iterrows()): if (i, ir) in indices: buffer.append(row) if len(buffer) >= chunksize: yield pandas.DataFrame(buffer) buffer.clear() if len(buffer) > 0: yield pandas.DataFrame(buffer) return StreamingDataFrame( lambda: reservoir_iterate(sdf=self, indices=indices, chunksize=1000))
[docs] def apply(self, *args, **kwargs) -> 'StreamingDataFrame': """ Applies :epkg:`pandas:DataFrame:apply`. This function returns a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. :githublink:`%|py|583` """ return StreamingDataFrame( lambda: map(lambda df: df.apply(*args, **kwargs), self), **self.get_kwargs())
[docs] def applymap(self, *args, **kwargs) -> 'StreamingDataFrame': """ Applies :epkg:`pandas:DataFrame:applymap`. This function returns a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. :githublink:`%|py|592` """ return StreamingDataFrame( lambda: map(lambda df: df.applymap(*args, **kwargs), self), **self.get_kwargs())
[docs] def merge(self, right, **kwargs) -> 'StreamingDataFrame': """ Merges two :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` and returns :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. *right* can be either a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` or simply a :epkg:`pandas:DataFrame`. It calls :epkg:`pandas:DataFrame:merge` in a double loop, loop on *self*, loop on *right*. :githublink:`%|py|603` """ if isinstance(right, pandas.DataFrame): return self.merge(StreamingDataFrame.read_df(right, chunksize=right.shape[0]), **kwargs) def iterator_merge(sdf1, sdf2, **kw): "iterate on dataframes" for df1 in sdf1: for df2 in sdf2: df = df1.merge(df2, **kw) yield df return StreamingDataFrame( lambda: iterator_merge(self, right, **kwargs), **self.get_kwargs())
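For example (key and columns invented), merging a stream with a regular :epkg:`DataFrame` wraps the right side into a single-chunk :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    left = DataFrame(dict(key=[1, 2, 1, 3], X=[10, 20, 30, 40]))
    right = DataFrame(dict(key=[1, 2], label=["a", "b"]))
    sdf = StreamingDataFrame.read_df(left, chunksize=2)
    merged = sdf.merge(right, on="key", how="left")
    print(merged.to_dataframe())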
[docs] def concat(self, others, axis=0) -> 'StreamingDataFrame': """ Concatenates :epkg:`dataframes`. The function ensures all :epkg:`pandas:DataFrame` or :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all :epkg:`dataframes`. :param others: list, enumeration, :epkg:`pandas:DataFrame` :return: :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` :githublink:`%|py|626` """ if axis == 1: return self._concath(others) if axis == 0: return self._concatv(others) raise ValueError("axis must be 0 or 1") # pragma: no cover
[docs] def _concath(self, others): if not isinstance(others, list): others = [others] def iterateh(self, others): cols = tuple([self] + others) for dfs in zip(*cols): nrows = [_.shape[0] for _ in dfs] if min(nrows) != max(nrows): raise RuntimeError( "StreamingDataFrame cannot concatenate DataFrames with different sizes or chunksizes") yield pandas.concat(list(dfs), axis=1) return StreamingDataFrame(lambda: iterateh(self, others), **self.get_kwargs())
[docs] def _concatv(self, others): def iterator_concat(this, lothers): "iterator on dataframes" columns = None dtypes = None for df in this: if columns is None: columns = df.columns dtypes = df.dtypes yield df for obj in lothers: check = True for i, df in enumerate(obj): if check: if list(columns) != list(df.columns): raise ValueError( "Frame others[{0}] does not have the same column names or the same order.".format(i)) if list(dtypes) != list(df.dtypes): raise ValueError( "Frame others[{0}] does not have the same column types.".format(i)) check = False yield df if isinstance(others, pandas.DataFrame): others = [others] elif isinstance(others, StreamingDataFrame): others = [others] def change_type(obj): "wraps a DataFrame into a StreamingDataFrame" if isinstance(obj, pandas.DataFrame): return StreamingDataFrame.read_df(obj, obj.shape[0]) else: return obj others = list(map(change_type, others)) return StreamingDataFrame( lambda: iterator_concat(self, others), **self.get_kwargs())
[docs] def groupby(self, by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs) -> pandas.DataFrame: """ Implements the streaming :epkg:`pandas:DataFrame:groupby`. We assume the result holds in memory. The out-of-memory is not implemented yet. :param by: see :epkg:`pandas:DataFrame:groupby` :param in_memory: in-memory algorithm :param lambda_agg: aggregation function, *sum* by default :param lambda_agg_agg: to aggregate the aggregations, *sum* by default :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` :return: :epkg:`pandas:DataFrame` As the input :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. This second one is always a **sum**. As a consequence, this function should not compute any *mean* or *count*, only *sum* because we do not know the size of each iterated :epkg:`dataframe`. To compute an average, sum and weights must be aggregated. Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. It could also be ``lambda gr: gr.max()`` or ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` as it would lead to incoherent results. .. exref:: :title: StreamingDataFrame and groupby :tag: streaming Here is an example which shows how to write a simple *groupby* with :epkg:`pandas` and :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. .. runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame df = DataFrame(dict(A=[3, 4, 3], B=[5,6, 7])) sdf = StreamingDataFrame.read_df(df) # The following: print(sdf.groupby("A", lambda gr: gr.sum())) # Is equivalent to: print(df.groupby("A").sum()) :githublink:`%|py|738` """ if not in_memory: raise NotImplementedError( "Out-of-memory group by is not implemented.") if lambda_agg is None: def lambda_agg_(gr): "sum" return gr.sum() lambda_agg = lambda_agg_ if lambda_agg_agg is None: def lambda_agg_agg_(gr): "sum" return gr.sum() lambda_agg_agg = lambda_agg_agg_ ckw = kwargs.copy() ckw["as_index"] = False agg = [] for df in self: gr = df.groupby(by=by, **ckw) agg.append(lambda_agg(gr)) conc = pandas.concat(agg, sort=False) return lambda_agg_agg(conc.groupby(by=by, **kwargs))
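To compute an average as suggested above, one possible sketch (columns are invented) aggregates a sum and a count chunk by chunk and only divides at the end::

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(A=[3, 4, 3, 4], X=[5., 6., 7., 8.]))
    # add a weight column of ones: sums are safe to aggregate, means are not
    sdf = StreamingDataFrame.read_df(df, chunksize=2).add_column("w", 1)
    agg = sdf.groupby("A", lambda gr: gr.sum())
    agg["mean_X"] = agg["X"] / agg["w"]
    print(agg)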
[docs] def groupby_streaming(self, by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy='cum', **kwargs) -> pandas.DataFrame: """ Implements the streaming :epkg:`pandas:DataFrame:groupby`. We assume the result holds in memory. The out-of-memory is not implemented yet. :param by: see :epkg:`pandas:DataFrame:groupby` :param in_memory: in-memory algorithm :param lambda_agg: aggregation function, *sum* by default :param lambda_agg_agg: to aggregate the aggregations, *sum* by default :param kwargs: additional parameters for :epkg:`pandas:DataFrame:groupby` :param strategy: ``'cum'``, or ``'streaming'``, see below :return: :epkg:`pandas:DataFrame` As the input :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated :epkg:`dataframe`, another one to combine all the reduced :epkg:`dataframes`. This second one is always a **sum**. As a consequence, this function should not compute any *mean* or *count*, only *sum* because we do not know the size of each iterated :epkg:`dataframe`. To compute an average, sum and weights must be aggregated. Parameter *lambda_agg* is ``lambda gr: gr.sum()`` by default. It could also be ``lambda gr: gr.max()`` or ``lambda gr: gr.min()`` but not ``lambda gr: gr.mean()`` as it would lead to incoherent results. Parameter *strategy* allows three scenarios. First one if ``strategy is None`` goes through the whole datasets to produce a final :epkg:`DataFrame`. Second if ``strategy=='cum'`` returns a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`, each iteration produces the current status of the *group by*. Last case, ``strategy=='streaming'`` produces :epkg:`DataFrame` which must be concatenated into a single :epkg:`DataFrame` and grouped again to get the results. .. exref:: :title: StreamingDataFrame and groupby :tag: streaming Here is an example which shows how to write a simple *groupby* with :epkg:`pandas` and :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`. .. 
runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame from pandas_streaming.data import dummy_streaming_dataframe df20 = dummy_streaming_dataframe(20).to_dataframe() df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0) sdf20 = StreamingDataFrame.read_df(df20, chunksize=5) sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(), strategy='cum', as_index=False) for gr in sgr: print() print(gr) :githublink:`%|py|824` """ if not in_memory: raise NotImplementedError( "Out-of-memory group by is not implemented.") if lambda_agg is None: def lambda_agg_(gr): "sum" return gr.sum() lambda_agg = lambda_agg_ if lambda_agg_agg is None: def lambda_agg_agg_(gr): "sum" return gr.sum() lambda_agg_agg = lambda_agg_agg_ ckw = kwargs.copy() ckw["as_index"] = False if strategy == 'cum': def iterate_cum(): agg = None for df in self: gr = df.groupby(by=by, **ckw) gragg = lambda_agg(gr) if agg is None: yield lambda_agg_agg(gragg.groupby(by=by, **kwargs)) agg = gragg else: lagg = pandas.concat([agg, gragg], sort=False) yield lambda_agg_agg(lagg.groupby(by=by, **kwargs)) agg = lagg return StreamingDataFrame(lambda: iterate_cum(), **self.get_kwargs()) if strategy == 'streaming': def iterate_streaming(): for df in self: gr = df.groupby(by=by, **ckw) gragg = lambda_agg(gr) yield lambda_agg(gragg.groupby(by=by, **kwargs)) return StreamingDataFrame(lambda: iterate_streaming(), **self.get_kwargs()) raise ValueError( # pragma: no cover "Unknown strategy '{0}'".format(strategy))
[docs] def ensure_dtype(self, df, dtypes): """ Ensures the :epkg:`dataframe` *df* has types indicated in dtypes. Changes it if not. :param df: dataframe :param dtypes: list of types :return: True if *df* was modified, False otherwise :githublink:`%|py|875` """ ch = False cols = df.columns for i, (has, exp) in enumerate(zip(df.dtypes, dtypes)): if has != exp: name = cols[i] df[name] = df[name].astype(exp) ch = True return ch
[docs] def __getitem__(self, *args): """ Implements some of the functionalities :epkg:`pandas` offers for the operator ``[]``. :githublink:`%|py|889` """ if len(args) != 1: raise NotImplementedError("Only a list of columns is supported.") cols = args[0] if not isinstance(cols, list): raise NotImplementedError("Only a list of columns is supported.") def iterate_cols(sdf): "iterate on columns" for df in sdf: yield df[cols] return StreamingDataFrame(lambda: iterate_cols(self), **self.get_kwargs())
[docs] def add_column(self, col, value): """ Implements some of the functionalities :epkg:`pandas` offers for the operator ``[]``. :param col: new column :param value: a constant value or a lambda function applied to every row (a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` is not supported yet) :return: :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` .. note:: If *value* is a :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>`, a *Series* or a *DataFrame*, the method currently raises *NotImplementedError*. .. exref:: :title: Add a new column to a StreamingDataFrame :tag: streaming .. runpython:: :showcode: from pandas import DataFrame from pandas_streaming.df import StreamingDataFrame df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"])) sdf = StreamingDataFrame.read_df(df) sdf2 = sdf.add_column("d", lambda row: int(1)) print(sdf2.to_dataframe()) sdf2 = sdf.add_column("d", lambda row: int(1)) print(sdf2.to_dataframe()) :githublink:`%|py|935` """ if not isinstance(col, str): raise NotImplementedError( "Only a column as a string is supported.") if isfunction(value): def iterate_fct(self, value, col): "iterate on rows" for df in self: dfc = df.copy() dfc.insert(dfc.shape[1], col, dfc.apply(value, axis=1)) yield dfc return StreamingDataFrame(lambda: iterate_fct(self, value, col), **self.get_kwargs()) if isinstance(value, (pandas.Series, pandas.DataFrame, StreamingDataFrame)): raise NotImplementedError( "Unable to set a new column based on a dataframe.") def iterate_cst(self, value, col): "iterate on rows" for df in self: dfc = df.copy() dfc[col] = value yield dfc return StreamingDataFrame( lambda: iterate_cst(self, value, col), **self.get_kwargs())
[docs] def fillna(self, **kwargs): """ Replaces the missing values, calls :epkg:`pandas:DataFrame:fillna`. :param kwargs: see :epkg:`pandas:DataFrame:fillna` :return: :class:`StreamingDataFrame <pandas_streaming.df.dataframe.StreamingDataFrame>` .. warning:: The function does not check what happens at the limit of every chunk of data. Anything but a constant value will probably have an inconsistent behaviour. :githublink:`%|py|976` """ def iterate_na(self, **kwargs): "iterate on rows" if kwargs.get('inplace', True): kwargs['inplace'] = True for df in self: df.fillna(**kwargs) yield df else: for df in self: yield df.fillna(**kwargs) return StreamingDataFrame( lambda: iterate_na(self, **kwargs), **self.get_kwargs())
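A short sketch (values invented); as the warning says, only a constant replacement is really safe across chunk boundaries::

    import numpy
    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=[1.0, numpy.nan, 3.0, numpy.nan]))
    sdf = StreamingDataFrame.read_df(df, chunksize=2)
    # missing values are replaced by 0 in every chunk
    print(sdf.fillna(value=0).to_dataframe())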