module df.dataframe

Inheritance diagram of pandas_streaming.df.dataframe

Short summary

module pandas_streaming.df.dataframe

Defines a streaming dataframe.

source on GitHub

Classes

StreamingDataFrame

Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates …

StreamingDataFrameSchemaError

Reveals an issue with inconsistent schemas.

StreamingSeries

Seen as a StreamingDataFrame of one column.

Properties

columns

See pandas.DataFrame.columns.

dtypes

See pandas.DataFrame.dtypes.

shape

This is the kind of operation you do not want to do when a file is large because it goes through the whole …

Static Methods

_process_kwargs

Filters out parameters for the constructor of this class.

read_csv

Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. …

read_df

Splits a DataFrame into small chunks mostly for unit testing purposes.

read_json

Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. …

read_str

Reads a DataFrame as an iterator on DataFrame. The signature is the same as pandas.read_csv. …

Methods

__add__

Adds a value to every element, assuming the addition has a meaning.

__del__

Calls every function in _delete_.

__getitem__

Implements some of the functionalities pandas offers for the operator [].

__init__

__iter__

Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a …

__setitem__

A limited set of operators is supported.

_concath

_concatv

_reservoir_sampling

Uses the reservoir sampling algorithm to draw a random sample …

add_column

Implements some of the functionalities pandas offers for the operator [].

apply

Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.

apply

Applies pandas.Series.apply. This function returns a StreamingSeries.

applymap

Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.

concat

Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame

describe

Calls pandas.DataFrame.describe on every chunk of the dataset. Percentiles are not really accurate …

drop

Applies pandas.DataFrame.drop. This function returns a StreamingDataFrame.

ensure_dtype

Ensures the dataframe df has types indicated in dtypes. Changes it if not.

fillna

Replaces the missing values, calls pandas.DataFrame.fillna.

get_kwargs

Returns the parameters used to call the constructor.

groupby

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory …

groupby_streaming

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory …

head

Returns the first rows as a DataFrame.

is_stable

Tells if the dataframe is supposed to be stable.

iterrows

See pandas.DataFrame.iterrows.

merge

Merges two StreamingDataFrame and returns StreamingDataFrame. right can be either a StreamingDataFrame

sample

See pandas.DataFrame.sample. Only frac is available, otherwise choose reservoir_sampling(). …

sort_values

Sorts the streaming dataframe by values.

tail

Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n lines. …

to_csv

Saves the DataFrame into a string. See pandas.DataFrame.to_csv.

to_dataframe

Converts everything into a single DataFrame.

to_df

Converts everything into a single DataFrame.

train_test_split

Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It …

where

Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame. …

Documentation

Defines a streaming dataframe.

source on GitHub

class pandas_streaming.df.dataframe.StreamingDataFrame(iter_creation, check_schema=True, stable=True)

Bases: object

Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator on dataframes. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop:

for df in self:  # self is a StreamingDataFrame
    # ...

The constructor cannot receive an iterator, otherwise this class would be able to walk through the data only once. The main reason is that it is impossible to pickle (or dill) an iterator: it cannot be replicated. Instead, the class takes a function which generates an iterator on DataFrame. Most of the methods return either a DataFrame or a StreamingDataFrame. In the latter case, methods can be chained.

By default, the object checks that the schema remains the same between two chunks. This can be disabled by setting check_schema=False in the constructor.

The user should expect the data to remain stable: every loop should produce the same data. However, in some situations, it is more efficient not to keep that constraint. Drawing a random sample() is one of these cases.

Parameters:
  • iter_creation – function which creates an iterator or an instance of StreamingDataFrame

  • check_schema – checks that the schema is the same for every dataframe

  • stable – indicates if the dataframe remains the same whenever it is walked through
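
The snippet below is a minimal sketch of that design, with a generator function standing in for iter_creation: calling the function again restarts the stream, which is what makes several walks possible.

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

def create_iterator():
    # a generator function: every call returns a fresh iterator,
    # so the stream can be walked through more than once
    for i in range(3):
        yield DataFrame(dict(X=[1.0 * i, 1.0 * i + 1], Y=["a", "b"]))

sdf = StreamingDataFrame(create_iterator)
for df in sdf:
    print(df)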

source on GitHub

__del__()

Calls every function in _delete_.

source on GitHub

__getitem__(*args)

Implements some of the functionalities pandas offers for the operator [].

source on GitHub

__init__(iter_creation, check_schema=True, stable=True)
__iter__()

Iterator on a large file with a sliding window. Each window is a DataFrame. The method stores a copy of the initial iterator and restores it after the end of the iterations. If check_schema was enabled when calling the constructor, the method checks that every DataFrame follows the same schema as the first chunk.

Even with a big chunk size, it might happen that consecutive chunks detect different types for one particular column. An error message shows up saying Column types are different after row with more information about the column which failed. In that case, pandas.read_csv can overwrite the type of one column by specifying dtype={column_name: new_type}. It frequently happens when a string column has many missing values.
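
A hedged illustration of that fix, with a hypothetical file data.csv and a hypothetical sparse string column comment:

from pandas_streaming.df import StreamingDataFrame

# forcing the dtype keeps every chunk consistent even when the
# column is mostly empty in some chunks
sdf = StreamingDataFrame.read_csv("data.csv", dtype={"comment": str})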

source on GitHub

__setitem__(index, value)

A limited set of operators is supported.

source on GitHub

_concath(others)
_concatv(others)
static _process_kwargs(kwargs)

Filters out parameters for the constructor of this class.

source on GitHub

_reservoir_sampling(cache=True, n=1000, random_state=None) → StreamingDataFrame

Uses the reservoir sampling algorithm to draw a random sample with exactly n samples.

Parameters:
  • cache – cache the sample

  • n – number of observations to keep

  • random_state – sets the random_state

Returns:

StreamingDataFrame

Warning

The sample is split by chunks of size 1000. This parameter is not yet exposed.
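
The sketch below shows the idea of reservoir sampling, not this method's actual implementation: a single pass keeps exactly n items, each with equal probability.

import random

def reservoir_sample(stream, n, seed=None):
    # Algorithm R: once i items have been seen (i >= n), every item
    # remains in the reservoir with probability n / i
    rnd = random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < n:
            reservoir.append(item)
        else:
            j = rnd.randint(0, i)
            if j < n:
                reservoir[j] = item
    return reservoir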

source on GitHub

add_column(col, value)

Implements some of the functionalities pandas offers for the operator [].

Parameters:
  • col – name of the new column

  • value – the new column: a constant, a function applied to each row, or a StreamingDataFrame

Returns:

StreamingDataFrame

Note

If value is a StreamingDataFrame, chunksize must be the same for both.

Add a new column to a StreamingDataFrame

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(data=dict(X=[4.5, 6, 7], Y=["a", "b", "c"]))
sdf = StreamingDataFrame.read_df(df)
sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

sdf2 = sdf.add_column("d", lambda row: int(1))
print(sdf2.to_dataframe())

>>>

         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1
         X  Y  d
    0  4.5  a  1
    1  6.0  b  1
    2  7.0  c  1

source on GitHub

apply(*args, **kwargs) → StreamingDataFrame

Applies pandas.DataFrame.apply. This function returns a StreamingDataFrame.
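
A small sketch of a still-streaming apply (the numeric column is an arbitrary choice):

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=[4.5, 6.0, 7.0]))
sdf = StreamingDataFrame.read_df(df)
# the function is applied chunk by chunk; the result is still a stream
print(sdf.apply(lambda col: col * 2).to_dataframe())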

source on GitHub

applymap(*args, **kwargs) → StreamingDataFrame

Applies pandas.DataFrame.applymap. This function returns a StreamingDataFrame.

source on GitHub

property columns

See pandas.DataFrame.columns.

source on GitHub

concat(others, axis=0) → StreamingDataFrame

Concatenates dataframes. The function ensures all pandas.DataFrame or StreamingDataFrame share the same columns (name and type). Otherwise, the function fails as it cannot guess the schema without walking through all dataframes.

Parameters:
  • others – list, enumeration, pandas.DataFrame

  • axis – concatenate by rows (0) or by columns (1)

Returns:

StreamingDataFrame
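
A minimal sketch: concatenating a StreamingDataFrame with a regular DataFrame sharing the same schema.

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[0, 1], B=["a", "b"]))
sdf = StreamingDataFrame.read_df(df)
# both sides must share the same column names and types
sdf2 = sdf.concat([df], axis=0)
print(sdf2.to_dataframe())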

source on GitHub

describe(percentiles=None, include=None, exclude=None)

Calls pandas.DataFrame.describe on every chunk of the dataset. Percentiles are not really accurate, just an indication.

Parameters:
  • percentiles, include, exclude – see pandas.DataFrame.describe

Returns:

a pandas.DataFrame, see pandas.DataFrame.describe

Changed in version 0.3.219: Parameter datetime_is_numeric was removed (see pandas.DataFrame.describe).

source on GitHub

drop(labels=None, *, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise') → StreamingDataFrame

Applies pandas.DataFrame.drop. This function returns a StreamingDataFrame.

source on GitHub

property dtypes

See pandas.DataFrame.dtypes.

source on GitHub

ensure_dtype(df, dtypes)

Ensures the dataframe df has types indicated in dtypes. Changes it if not.

Parameters:
  • df – dataframe

  • dtypes – list of types

Returns:

the updated dataframe

source on GitHub

fillna(**kwargs)

Replaces the missing values, calls pandas.DataFrame.fillna.

Parameters:

kwargs – see pandas.DataFrame.fillna

Returns:

StreamingDataFrame

Warning

The function does not check what happens at the limit of every chunk of data. Anything but a constant value will probably have an inconsistent behaviour.
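
A sketch of the safe case, a constant replacement value:

from numpy import nan
from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=[1.0, nan, 3.0, nan]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
# a constant does not look at neighbouring rows, which may fall
# into different chunks (unlike method="ffill")
print(sdf.fillna(value=0.0).to_dataframe())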

source on GitHub

get_kwargs()

Returns the parameters used to call the constructor.

source on GitHub

groupby(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, **kwargs) → DataFrame

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters:
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

Returns:

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sum because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5, 6, 7]))
sdf = StreamingDataFrame.read_df(df)

# The following:
print(sdf.groupby("A", lambda gr: gr.sum()))

# Is equivalent to:
print(df.groupby("A").sum())

>>>

        B
    A    
    3  12
    4   6
        B
    A    
    3  12
    4   6
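
Following the rule above, a mean can still be obtained by streaming a sum and a count, both of which combine correctly across chunks. The count column added below is a manual workaround, not part of the API.

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 4, 3], B=[5.0, 6.0, 7.0], count=[1, 1, 1]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
agg = sdf.groupby("A", lambda gr: gr.sum())
# divide the aggregated sums by the aggregated counts at the end
print(agg["B"] / agg["count"])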

source on GitHub

groupby_streaming(by=None, lambda_agg=None, lambda_agg_agg=None, in_memory=True, strategy='cum', **kwargs) → DataFrame

Implements the streaming pandas.DataFrame.groupby. We assume the result holds in memory. The out-of-memory version is not implemented yet.

Parameters:
  • by – see pandas.DataFrame.groupby

  • in_memory – in-memory algorithm

  • lambda_agg – aggregation function, sum by default

  • lambda_agg_agg – to aggregate the aggregations, sum by default

  • kwargs – additional parameters for pandas.DataFrame.groupby

  • strategy – 'cum' or 'streaming', see below

Returns:

pandas.DataFrame

As the input StreamingDataFrame does not necessarily hold in memory, the aggregation must be done at every iteration. There are two levels of aggregation: one to reduce every iterated dataframe, another one to combine all the reduced dataframes. This second one is always a sum. As a consequence, this function should not compute any mean or count, only sum because we do not know the size of each iterated dataframe. To compute an average, sum and weights must be aggregated.

Parameter lambda_agg is lambda gr: gr.sum() by default. It could also be lambda gr: gr.max() or lambda gr: gr.min() but not lambda gr: gr.mean() as it would lead to incoherent results.

Parameter strategy allows three scenarios. If strategy is None, the function goes through the whole dataset to produce a final DataFrame. If strategy == 'cum', it returns a StreamingDataFrame where each iteration produces the current state of the group by. If strategy == 'streaming', it produces DataFrames which must be concatenated into a single DataFrame and grouped again to get the final result.

StreamingDataFrame and groupby

Here is an example which shows how to write a simple groupby with pandas and StreamingDataFrame.

<<<

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame
from pandas_streaming.data import dummy_streaming_dataframe

df20 = dummy_streaming_dataframe(20).to_dataframe()
df20["key"] = df20["cint"].apply(lambda i: i % 3 == 0)
sdf20 = StreamingDataFrame.read_df(df20, chunksize=5)
sgr = sdf20.groupby_streaming("key", lambda gr: gr.sum(),
                              strategy='cum', as_index=False)
for gr in sgr:
    print()
    print(gr)

>>>

    
         key  cint    cstr
    0  False     7  s1s2s4
    1   True     3    s0s3
    
         key  cint          cstr
    0  False    27  s1s2s4s5s7s8
    1   True    18      s0s3s6s9
    
         key  cint                      cstr
    0  False    75  s1s2s4s5s7s8s10s11s13s14
    1   True    30               s0s3s6s9s12
    
         key  cint                               cstr
    0  False   127  s1s2s4s5s7s8s10s11s13s14s16s17s19
    1   True    63                  s0s3s6s9s12s15s18

source on GitHub

head(n=5) → DataFrame

Returns the first rows as a DataFrame.

source on GitHub

is_stable(do_check=False, n=10)

Tells if the dataframe is supposed to be stable.

Parameters:
  • do_check – do not trust the value sent to the constructor

  • n – number of rows used to check the stability, None for all rows

Returns:

boolean

do_check=True means the method checks that the first n rows remain the same over two consecutive iterations.

source on GitHub

iterrows()

See pandas.DataFrame.iterrows.

source on GitHub

merge(right, **kwargs) → StreamingDataFrame

Merges two StreamingDataFrame and returns a StreamingDataFrame. right can be either a StreamingDataFrame or simply a pandas.DataFrame. It calls pandas.DataFrame.merge in a double loop: a loop on self, a loop on right.
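
A minimal sketch, merging a stream with an in-memory DataFrame:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

left = StreamingDataFrame.read_df(DataFrame(dict(key=[1, 2], A=["a", "b"])))
right = DataFrame(dict(key=[1, 2, 3], B=["c", "d", "e"]))
# keyword arguments are forwarded to pandas.DataFrame.merge
print(left.merge(right, on="key", how="inner").to_dataframe())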

source on GitHub

static read_csv(*args, **kwargs) → StreamingDataFrame

Reads a csv file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block. If not specified, it is equal to 100000.
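
A hedged sketch with a hypothetical file name:

from pandas_streaming.df import StreamingDataFrame

# every iteration parses at most chunksize rows
sdf = StreamingDataFrame.read_csv("big_file.csv", chunksize=50000)
for chunk in sdf:
    print(chunk.shape)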

source on GitHub

static read_df(df, chunksize=None, check_schema=True) → StreamingDataFrame

Splits a DataFrame into small chunks mostly for unit testing purposes.

Parameters:
  • df – DataFrame

  • chunksize – number of rows per chunk (the dataframe size divided by 10 by default)

  • check_schema – check schema between two iterations

Returns:

iterator on StreamingDataFrame

source on GitHub

static read_json(*args, chunksize=100000, flatten=False, **kwargs) → StreamingDataFrame

Reads a json file or buffer as an iterator on DataFrame. The signature is the same as pandas.read_json. The important parameter is chunksize, which defines the number of rows to parse in a single block, and it must be defined to return an iterator. If lines is True, the function falls back on pandas.read_json, otherwise it uses enumerate_json_items. If lines is 'stream', enumerate_json_items is called with parameter lines=True. Parameter flatten uses the trick described at Flattening JSON objects in Python. Examples:

<<<

from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b'''{"a": 1, "b": 2}
           {"a": 3, "b": 4}'''
it = StreamingDataFrame.read_json(BytesIO(data), lines=True)
dfs = list(it)
print(dfs)

>>>

    [   a  b
    0  1  2
    1  3  4]

<<<

from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

data = b'''[{"a": 1,
             "b": 2},
            {"a": 3,
             "b": 4}]'''

it = StreamingDataFrame.read_json(BytesIO(data))
dfs = list(it)
print(dfs)

>>>

    [   a  b
    0  1  2
    1  3  4]

The parsed json must have an empty line at the end, otherwise the following exception is raised: ijson.common.IncompleteJSONError: parse error: unallowed token at this point in JSON text.

source on GitHub

static read_str(text, **kwargs) → StreamingDataFrame

Reads a DataFrame as an iterator on DataFrame. The signature is the same as pandas.read_csv. The important parameter is chunksize, which defines the number of rows to parse in a single block.
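
A minimal sketch:

from pandas_streaming.df import StreamingDataFrame

text = "X,Y\n4.5,a\n6.0,b\n7.0,c"
sdf = StreamingDataFrame.read_str(text, chunksize=2)
print(sdf.to_dataframe())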

source on GitHub

sample(reservoir=False, cache=False, **kwargs) → StreamingDataFrame

See pandas.DataFrame.sample. Only frac is available, otherwise choose reservoir_sampling(). This function returns a StreamingDataFrame.

Parameters:
  • reservoir – use the reservoir sampling algorithm (see _reservoir_sampling())

  • cache – cache the sample

  • kwargs – additional parameters for pandas.DataFrame.sample (only frac is available)

If cache is True, the sample is cached (assuming it holds in memory). The second time an iterator walks through the data, the same sample is returned.
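
A small sketch of the cached case:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=list(range(10))))
sdf = StreamingDataFrame.read_df(df, chunksize=5)
# cache=True keeps the drawn rows, so walking the sample twice
# yields the same data
sample = sdf.sample(frac=0.5, cache=True)
print(sample.to_dataframe())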

source on GitHub

property shape

This is the kind of operation you do not want to do when a file is large because it goes through the whole stream just to get the number of rows.

source on GitHub

sort_values(by, axis=0, ascending=True, kind='quicksort', na_position='last', temp_file='_pandas_streaming_sort_values_')

Sorts the streaming dataframe by values.

Parameters:
  • by – one column

  • ascending – order

  • kind – see pandas.DataFrame.sort_values()

  • na_position – see pandas.DataFrame.sort_values()

  • temp_file – sorting a whole dataframe is impossible without storing intermediate results on disk, unless it fits into memory; but in that case, it is easier to convert the streaming dataframe into a DataFrame and sort it

Returns:

streaming dataframe
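
A minimal sketch:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(A=[3, 1, 2], B=["x", "y", "z"]))
sdf = StreamingDataFrame.read_df(df, chunksize=2)
# intermediate results go to files whose names start with temp_file
print(sdf.sort_values("A").to_dataframe())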

source on GitHub

tail(n=5) → DataFrame

Returns the last rows as a DataFrame. The size of chunks must be greater than n to get n lines. This method is not efficient because the whole dataset must be walked through.

source on GitHub

to_csv(path_or_buf=None, **kwargs) → StreamingDataFrame

Saves the DataFrame into a string. See pandas.DataFrame.to_csv.

source on GitHub

to_dataframe() → DataFrame

Converts everything into a single DataFrame.

source on GitHub

to_df() → DataFrame

Converts everything into a single DataFrame.

source on GitHub

train_test_split(path_or_buf=None, export_method='to_csv', names=None, streaming=True, partitions=None, **kwargs)

Randomly splits a dataframe into smaller pieces. The function returns streams of file names. It chooses one of the options from module dataframe_split.

Parameters:
  • path_or_buf – a string, a list of strings or buffers; if it is a string, it must contain {} like partition{}.txt; if None, the function returns strings.

  • export_method – method used to store the partitions, by default pandas.DataFrame.to_csv, additional parameters will be given to that function

  • names – partitions names, by default ('train', 'test')

  • kwargs – parameters for the export function and sklearn.model_selection.train_test_split.

  • streaming – the function switches to a streaming version of the algorithm.

  • partitions – splitting partitions

Returns:

outputs of the export functions, or two StreamingDataFrame if path_or_buf is None.

The streaming version of this algorithm is implemented by function sklearn_train_test_split_streaming. Its documentation indicates the limitation of the streaming version and gives some insights about the additional parameters.
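
A sketch of the in-memory case with all defaults, assuming the two returned streams follow the default ('train', 'test') order:

from pandas import DataFrame
from pandas_streaming.df import StreamingDataFrame

df = DataFrame(dict(X=list(range(10))))
sdf = StreamingDataFrame.read_df(df, chunksize=4)
# path_or_buf is None, so two StreamingDataFrame come back
train, test = sdf.train_test_split()
print(train.to_dataframe().shape, test.to_dataframe().shape)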

source on GitHub

where(*args, **kwargs) → StreamingDataFrame

Applies pandas.DataFrame.where. inplace must be False. This function returns a StreamingDataFrame.

source on GitHub

exception pandas_streaming.df.dataframe.StreamingDataFrameSchemaError

Bases: Exception

Reveals an issue with inconsistent schemas.

source on GitHub

class pandas_streaming.df.dataframe.StreamingSeries(iter_creation, check_schema=True, stable=True)

Bases: StreamingDataFrame

Seen as a StreamingDataFrame of one column.

source on GitHub

__add__(value)

Adds a value to every element, assuming the addition has a meaning.

Parameters:

value – any value which makes sense

Returns:

a new series

source on GitHub

__init__(iter_creation, check_schema=True, stable=True)
apply(*args, **kwargs) → StreamingDataFrame

Applies pandas.Series.apply. This function returns a StreamingSeries.

source on GitHub