module df.dataframe

Inheritance diagram of pandas_streaming.df.dataframe

Short summary

module pandas_streaming.df.dataframe

Defines a streaming dataframe.

source on GitHub

Classes

StreamingDataFrame – Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates …

StreamingDataFrameSchemaError – Reveals an issue with inconsistent schemas.

Properties

columns – See pandas.DataFrame.columns.

dtypes – See pandas.DataFrame.dtypes.

shape – This is the kind of operation you do not want to do when a file is large because it goes through the whole …

Static Methods

_process_kwargs – Filters out parameters for the constructor of this class.

read_csv – Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. …

read_df – Splits a DataFrame into small chunks mostly for unit testing purposes.

read_json – Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. …

read_str – Reads a string holding csv data as an iterator on DataFrame. The signature is the same as pandas.read_csv. …

Methods

__getitem__ – Implements some of the functionalities pandas offers for the operator [].

__init__

__iter__ – Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a …

_concath

_concatv

_reservoir_sampling – Uses the reservoir sampling algorithm to draw a random sample …

add_column – Implements some of the functionalities pandas offers for the operator [].

apply – Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.

applymap – Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.

concat – Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame …

ensure_dtype – Ensures the dataframe df has types indicated in dtypes. Changes it if not.

fillna – Replaces the missing values, calls pandas.DataFrame.fillna.

get_kwargs – Returns the parameters used to call the constructor.

groupby – Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory …

groupby_streaming – Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory …

head – Returns the first rows as a DataFrame.

is_stable – Tells if the dataframe is supposed to be stable.

iterrows – See pandas.DataFrame.iterrows.

merge – Merges two StreamingDataFrame and returns a StreamingDataFrame. right can be either a StreamingDataFrame …

sample – See pandas.DataFrame.sample. Only frac is available, otherwise choose reservoir_sampling(). …

sort_values – Not implemented.

tail – Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n …

to_csv – Saves the DataFrame into a string or a file. See pandas.DataFrame.to_csv.

to_dataframe – Converts everything into a single DataFrame.

to_df – Converts everything into a single DataFrame.

train_test_split – Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It …

where – Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame. …

Documentation

Defines a streaming dataframe.

source on GitHub

class pandas_streaming.df.dataframe.StreamingDataFrame(iter_creation, check_schema=True, stable=True)[source]

Bases: object

Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on dataframes. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop:

for df in self:  # self is a StreamingDataFrame
    # ...

The constructor cannot receive an iterator, otherwise this class would be able to walk through the data only once. The main reason is that it is impossible to pickle (or dill) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on DataFrame. Most of the methods return either a DataFrame or a StreamingDataFrame. In the second case, methods can be chained.

By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting check_schema=False in the constructor.

The user should expect the data to remain stable: every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraint. Drawing a random sample is one of those cases.

source on GitHub

Parameters
  • iter_creation – function which creates an iterator or an instance of StreamingDataFrame

  • check_schema – checks that the schema is the same for every dataframe

  • stable – indicates if the dataframe remains the same whenever it is walked through

source on GitHub
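A minimal construction sketch, assuming a local file data.csv exists (the path is a placeholder): the constructor receives a function, not an iterator, so the stream can be replayed.

    import pandas
    from pandas_streaming.df import StreamingDataFrame

    # "data.csv" is a placeholder path; pandas.read_csv with chunksize
    # yields an iterator on DataFrame, and wrapping the call in a lambda
    # lets the stream be recreated every time it is walked through.
    sdf = StreamingDataFrame(
        lambda: pandas.read_csv("data.csv", chunksize=10000))

    for chunk in sdf:
        # each chunk is a regular pandas.DataFrame
        print(chunk.shape)

For csv files, the static method read_csv below builds the same object directly.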

__getitem__(*args)[source]

Implements some of the functionalities pandas offers for the operator [].

source on GitHub

__init__(iter_creation, check_schema=True, stable=True)[source]
Parameters
  • iter_creation – function which creates an iterator or an instance of StreamingDataFrame

  • check_schema – checks that the schema is the same for every dataframe

  • stable – indicates if the dataframe remains the same whenever it is walked through

source on GitHub

__iter__()[source]

Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a copy of the initial iterator and restores it after the end of the iterations. If check_schema was enabled when calling the constructor, the method checks that every DataFrame follows the same schema as the first chunk.

Even with a big chunk size, consecutive chunks might detect a different type for one particular column. An error message shows up saying Column types are different after row with more information about the column which failed. In that case, pandas.read_csv can overwrite the type of one column by specifying dtype={column_name: new_type}. It frequently happens when a string column has many missing values.
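A sketch of that workaround, assuming a hypothetical file big.csv with a mostly empty string column named city:

    from pandas_streaming.df import StreamingDataFrame

    # "big.csv" and "city" are placeholders; forcing the column to str
    # keeps the schema identical even when a chunk only contains NaN.
    sdf = StreamingDataFrame.read_csv(
        "big.csv", chunksize=100000, dtype={"city": str})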

source on GitHub

_concath(others)[source]
_concatv(others)[source]
static _process_kwargs(kwargs)[source]

Filters out parameters for the constructor of this class.

source on GitHub

_reservoir_sampling(cache=True, n=1000, random_state=None) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Uses the reservoir sampling algorithm to draw a random sample of exactly n observations.

Parameters
  • cache – cache the sample

  • n – number of observations to keep

  • random_state – sets the random_state

Returns

StreamingDataFrame

Warning

The sample is split by chunks of size 1000. This parameter is not yet exposed.

source on GitHub

add_column(col, value)[source]

Implements some of the functionalities pandas offers for the operator [].

Parameters
  • col – name of the new column

  • value – the value to assign, either a function applied to each row (as in the example below) or another StreamingDataFrame (see the note below)

Returns

StreamingDataFrame

Note

If value is a StreamingDataFrame, chunksize must be the same for both.

Add a new column to a StreamingDataFrame

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

>>>

         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1
         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1

source on GitHub

apply(*args, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.
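A small sketch: the arguments are the ones pandas.DataFrame.apply accepts, applied chunk by chunk.

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    sdf = StreamingDataFrame.read_df(DataFrame(dict(X=[1, 2, 3])))

    # the callable is forwarded to pandas.DataFrame.apply on every chunk
    doubled = sdf.apply(lambda col: col * 2)
    print(doubled.to_dataframe())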

source on GitHub

applymap(*args, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.

source on GitHub

property columns

See pandas.DataFrame.columns.

source on GitHub

concat(others, axis=0) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all dataframes.

Parameters

others – list, enumeration, pandas.DataFrame

Returns

StreamingDataFrame
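A short sketch, assuming both inputs share the same schema:

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(A=[1, 2], B=["x", "y"]))
    sdf1 = StreamingDataFrame.read_df(df)
    sdf2 = StreamingDataFrame.read_df(df)

    # both streams expose the same columns and types,
    # so the concatenation can be done chunk by chunk
    both = sdf1.concat([sdf2], axis=0)
    print(both.to_dataframe())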

source on GitHub

property dtypes

See pandas.DataFrame.dtypes.

source on GitHub

ensure_dtype(df, dtypes)[source]

Ensures the dataframe df has types indicated in dtypes. Changes it if not.

Parameters
  • df – dataframe

  • dtypes – list of types

Returns

updated dataframe

source on GitHub

fillna(**kwargs)[source]

Replaces the missing values, calls pandas.DataFrame.fillna.

Parameters

kwargs – see pandas.DataFrame.fillna

Returns

StreamingDataFrame

Warning

The function does not check what happens at the limit of every chunk of data. Anything but a constant value will probably have an inconsistent behaviour.
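A minimal sketch with a constant value, which is the safe case mentioned above:

    from numpy import nan
    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    sdf = StreamingDataFrame.read_df(DataFrame(dict(X=[1.0, nan, 3.0])))

    # a constant value does not depend on neighbouring rows,
    # so chunk boundaries do not matter
    filled = sdf.fillna(value=0.0)
    print(filled.to_dataframe())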

source on GitHub

get_kwargs()[source]

Returns the parameters used to call the constructor.

source on GitHub

groupby(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs) → pandas.core.frame.DataFrame[source]

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

Returns

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sum because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
sdf = StreamingDataFrame.read_df(df)

# The following:
print(sdf.groupby("A", lambda gr: gr.sum()))

# Is equivalent to:
print(df.groupby("A").sum())

>>>

        B
    A    
    3  12
    4   6
        B
    A    
    3  12
    4   6

source on GitHub

groupby_streaming(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy='cum', **kwargs) → pandas.core.frame.DataFrame[source]

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

  • strategy – 'cum' or 'streaming', see below

Returns

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sum because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

Parameter strategy allows three scenarios. If strategy is None, the function goes through the whole dataset to produce a final DataFrame. If strategy=='cum', it returns a StreamingDataFrame: each iteration produces the current status of the group by. If strategy=='streaming', it produces DataFrames which must be concatenated into a single DataFrame and grouped again to get the final result.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame
from pandas_streaming.data import dummy_streaming_dataframe

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
sgr = sdf20.groupby_streaming(
    "key", lambda gr: gr.sum(), strategy='cum', as_index=False)
for gr in sgr:
    print()
    print(gr)

>>>

    
         key  cint
    0  False     7
    1   True     3
    
         key  cint
    0  False    27
    1   True    18
    
         key  cint
    0  False    75
    1   True    30
    
         key  cint
    0  False   127
    1   True    63

source on GitHub

head(n=5) → pandas.core.frame.DataFrame[source]

Returns the first rows as a DataFrame.

source on GitHub

is_stable(do_check=False, n=10)[source]

Tells if the dataframe is supposed to be stable.

Parameters
  • do_check – do not trust the value sent to the constructor

  • n – number of rows used to check the stability, None for all rows

Returns

boolean

do_check=True means the method checks that the first n rows remain the same over two iterations.

source on GitHub

iterrows()[source]

See pandas.DataFrame.iterrows.

source on GitHub

merge(right, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Merges two StreamingDataFrame and returns a StreamingDataFrame. right can be either a StreamingDataFrame or simply a pandas.DataFrame. It calls pandas.DataFrame.merge in a double loop: a loop on self, a loop on right.
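A small sketch merging a stream with an in-memory DataFrame on a key column:

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    left = StreamingDataFrame.read_df(
        DataFrame(dict(key=[1, 2, 3], X=[10.0, 20.0, 30.0])))
    right = DataFrame(dict(key=[1, 2], Y=["a", "b"]))

    # keyword arguments are forwarded to pandas.DataFrame.merge
    merged = left.merge(right, on="key", how="inner")
    print(merged.to_dataframe())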

source on GitHub

static read_csv(*args, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize which defines the number of rows to parse in a single block. If not specified, it will be equal to 100000.
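A hedged sketch (the file name is a placeholder); any pandas.read_csv argument can be passed along:

    from pandas_streaming.df import StreamingDataFrame

    # "data.csv" is a placeholder; chunksize controls the size of
    # every DataFrame produced by the iteration
    sdf = StreamingDataFrame.read_csv("data.csv", chunksize=50000, sep=",")
    for chunk in sdf:
        print(chunk.shape)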

source on GitHub

static read_df(df, chunksize=None, check_schema=True) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Splits a DataFrame into small chunks mostly for unit testing purposes.

Parameters
  • df – DataFrame

  • chunksize – number of rows per chunk (// 10 by default)

  • check_schema – check schema between two iterations

Returns

iterator on StreamingDataFrame

source on GitHub

static read_json(*args, chunksize=100000, flatten=False, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. The important parameter is chunksize which defines the number of rows to parse in a single block, and it must be defined to return an iterator. If lines is True, the function falls back into pandas.read_json, otherwise it uses enumerate_json_items. If lines is 'stream', enumerate_json_items is called with parameter lines=True. Parameter flatten uses the trick described at Flattening JSON objects in Python (https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10). Examples:

<<<

from io import StringIO
from pandas_streaming.df import StreamingDataFrame

data = '''{"a": 1, "b": 2}
{"a": 3, "b": 4}'''

it = StreamingDataFrame.read_json(StringIO(data), lines=True)
dfs = list(it)
print(dfs)

<<<

from io import StringIO
from pandas_streaming.df import StreamingDataFrame

data = '''[{"a": 1,
            "b": 2},
           {"a": 3,
            "b": 4}]'''

it = StreamingDataFrame.read_json(StringIO(data))
dfs = list(it)
print(dfs)

>>>

    [   a  b
    0  1  2
    1  3  4]

source on GitHub

static read_str(text, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Reads a string holding csv data as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize which defines the number of rows to parse in a single block.
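A small sketch reading an in-memory csv string:

    from pandas_streaming.df import StreamingDataFrame

    text = "A,B\n1,a\n2,b\n3,c"
    # chunksize splits the three rows into two chunks
    sdf = StreamingDataFrame.read_str(text, chunksize=2)
    print(sdf.to_dataframe())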

source on GitHub

sample(reservoir=False, cache=False, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

See pandas.DataFrame.sample. Only frac is available, otherwise choose reservoir_sampling(). This function returns a StreamingDataFrame.

If cache is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the data, the cached sample is returned instead of being drawn again, so every walk sees the same rows.
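A minimal sketch with frac, the only pandas-style parameter supported here:

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    sdf = StreamingDataFrame.read_df(
        DataFrame(dict(X=list(range(100)))), chunksize=10)

    # roughly 10% of every chunk is kept; cache=True freezes the drawn
    # sample so that every iteration returns the same rows
    sub = sdf.sample(frac=0.1, cache=True)
    print(sub.to_dataframe().shape)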

source on GitHub

property shape

This is the kind of operation you do not want to do when a file is large because it goes through the whole stream just to get the number of rows.

source on GitHub

sort_values(*args, **kwargs)[source]

Not implemented.

source on GitHub

tail(n=5) → pandas.core.frame.DataFrame[source]

Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n lines. This method is not efficient because the whole dataset must be walked through.

source on GitHub

to_csv(path_or_buf=None, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Saves the DataFrame into a string or a file. See pandas.DataFrame.to_csv.
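A hedged sketch (the output path is a placeholder); keyword arguments go to pandas.DataFrame.to_csv:

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    sdf = StreamingDataFrame.read_df(DataFrame(dict(X=[1, 2, 3])))

    # "dump.csv" is a placeholder path; the stream is written chunk by chunk
    sdf.to_csv(path_or_buf="dump.csv", index=False)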

source on GitHub

to_dataframe() → pandas.core.frame.DataFrame[source]

Converts everything into a single DataFrame.

source on GitHub

to_df() → pandas.core.frame.DataFrame[source]

Converts everything into a single DataFrame.

source on GitHub

train_test_split(path_or_buf=None, export_method='to_csv', names=None, streaming=True, partitions=None, **kwargs)[source]

Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It chooses one of the options from module dataframe_split.

Parameters
  • path_or_buf – a string, a list of strings or buffers; if it is a string, it must contain {} like partition{}.txt; if None, the function returns strings.

  • export_method – method used to store the partitions, by default pandas.DataFrame.to_csv, additional parameters will be given to that function

  • names – partitions names, by default ('train', 'test')

  • kwargs – parameters for the export function and sklearn.model_selection.train_test_split.

  • streaming – the function switches to a streaming version of the algorithm.

  • partitions – splitting partitions

Returns

outputs of the export functions, or two StreamingDataFrame if path_or_buf is None.

The streaming version of this algorithm is implemented by function sklearn_train_test_split_streaming. Its documentation indicates the limitations of the streaming version and gives some insights about the additional parameters.
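A minimal sketch of the in-memory case, where path_or_buf=None returns two StreamingDataFrame (the default partitions are 'train' and 'test'):

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    df = DataFrame(dict(X=list(range(10)), Y=[i % 2 for i in range(10)]))
    sdf = StreamingDataFrame.read_df(df, chunksize=2)

    # with path_or_buf=None the two partitions come back as
    # StreamingDataFrame instead of being written to files
    train, test = sdf.train_test_split(path_or_buf=None, streaming=True)
    print(train.to_dataframe().shape, test.to_dataframe().shape)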

source on GitHub

where(*args, **kwargs) → pandas_streaming.df.dataframe.StreamingDataFrame[source]

Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame.
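A small sketch; the condition and replacement follow pandas.DataFrame.where:

    from pandas import DataFrame
    from pandas_streaming.df import StreamingDataFrame

    sdf = StreamingDataFrame.read_df(DataFrame(dict(X=[1, -2, 3])))

    # values failing the condition are replaced by `other`,
    # exactly as pandas.DataFrame.where does on every chunk
    positive = sdf.where(lambda d: d >= 0, other=0)
    print(positive.to_dataframe())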

source on GitHub

exception pandas_streaming.df.dataframe.StreamingDataFrameSchemaError[source]

Bases: Exception

Reveals an issue with inconsistent schemas.

source on GitHub