Tutorial

The main class StreamingDataFrame is basically an iterator over dataframes. Taken together, the chunks form a single dataframe which does not have to fit in memory. The class implements a subset of the functionality pandas provides for map/reduce-style processing, such as concat and join. Both return a StreamingDataFrame, as opposed to groupby, which does not.

The beginning is always the same: we create such an object with one of the methods read_csv, read_df, read_str. The module was initially created to easily split a dataset into train/test when it does not fit into memory.

from pandas_streaming.df import StreamingDataFrame
sdf = StreamingDataFrame.read_csv("<filename>", sep="\t")
sdf.train_test_split("dataset_split_{}.txt", sep="\t")

>>> ['dataset_split_train.txt', 'dataset_split_test.txt']
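The idea behind an out-of-memory split can be sketched with the standard library alone. This is only an illustration, not how train_test_split is implemented: the function streaming_split, its parameters test_size and seed, and the in-memory buffers standing in for files are all hypothetical.

```python
import io
import random


def streaming_split(lines, train_out, test_out, test_size=0.25, seed=0):
    """Send each row to the train or test output without storing the dataset.

    Hypothetical helper, written for illustration only.
    """
    rng = random.Random(seed)
    counts = {"train": 0, "test": 0}
    # The header is copied to both outputs.
    header = next(lines)
    train_out.write(header)
    test_out.write(header)
    for line in lines:
        # Only one row is held in memory at any time.
        if rng.random() < test_size:
            test_out.write(line)
            counts["test"] += 1
        else:
            train_out.write(line)
            counts["train"] += 1
    return counts


# In-memory buffers stand in for the source file and the two output files.
source = io.StringIO("x\ty\n" + "".join(f"{i}\t{i % 2}\n" for i in range(1000)))
train, test = io.StringIO(), io.StringIO()
counts = streaming_split(iter(source), train, test)
print(counts)
```

Each row is assigned independently, so the test share is only approximately test_size.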

Objectives and Competitors

The first objective is speed. StreamingDataFrame is useful when the user needs to process a large dataset which does not fit in memory (out-of-memory dataset) or when the user needs to quickly check an algorithm on the beginning of a big dataset without paying the cost of loading all the data.

The second objective is simplicity. The proposed interface tries to follow the same syntax as pandas. That is also one of the directions followed by dask.

dask tries to address these two objectives and also offers parallelization. Based on my experience, dask is efficient but tends to be slow for simple things on medium datasets (a couple of gigabytes). The API is not exactly the same either, and the parser does not behave exactly the same way. pyspark seems to carry a bit of overhead: it is more difficult to install and still slow when used locally. pyarrow is supposed to be the next pandas but its scope is larger (it handles streaming datasets from Hadoop) and it does not work yet with scikit-learn. I expect this module to live until scikit-learn updates its code to handle a streaming container. This one will probably be the winner.

One element of design to remember

The class StreamingDataFrame does not hold an iterator but a function which creates an iterator. Every time the user writes the following loop, the function is called to create a new iterator, which is then used to walk through the data.

<<<

import pandas
df = pandas.DataFrame([dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1"),
                       dict(cf=3, cint=3, cstr="3")])

from pandas_streaming.df import StreamingDataFrame
sdf = StreamingDataFrame.read_df(df, chunksize=2)

print("First time:")

for df in sdf:
    # process this chunk of data
    print(df)

print("\nSecond time:\n")

for df in sdf:
    # process this chunk of data a second time
    print(df)

>>>

    First time:
       cf  cint cstr
    0   0     0    0
    1   1     1    1
       cf  cint cstr
    2   3     3    3
    
    Second time:
    
       cf  cint cstr
    0   0     0    0
    1   1     1    1
       cf  cint cstr
    2   3     3    3

The class cannot directly hold an iterator because it is not possible to pickle an iterator. Moreover, an iterator is meant to be used only once: a second loop would not be possible, which would be quite surprising to most users.
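The difference between holding an iterator and holding a function which creates one can be sketched in plain Python. The names make_chunks and Replayable are hypothetical and only mimic the design described above; lists stand in for dataframes.

```python
import pickle


def make_chunks():
    """Hypothetical factory: returns a fresh iterator over chunks on each call."""
    data = [0, 1, 3]
    chunksize = 2
    return (data[i:i + chunksize] for i in range(0, len(data), chunksize))


class Replayable:
    """Holds the factory, not the iterator, so every loop restarts from the top."""

    def __init__(self, iter_creation):
        self.iter_creation = iter_creation

    def __iter__(self):
        # A new iterator is created each time a loop starts.
        return self.iter_creation()


# A bare iterator is exhausted after the first pass.
one_shot = make_chunks()
first_pass = list(one_shot)
second_pass = list(one_shot)

# The wrapper can be walked through as many times as needed.
replayable = Replayable(make_chunks)
replay_first = list(replayable)
replay_second = list(replayable)

# A generator cannot be pickled.
try:
    pickle.dumps(make_chunks())
    generator_pickles = True
except (TypeError, pickle.PicklingError):
    generator_pickles = False

print(first_pass, second_pass)
print(replay_first, replay_second)
print("generator can be pickled:", generator_pickles)
```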

A StreamingDataFrame is also supposed to be stable: the two loops in the previous example should produce the exact same chunks. However, in some cases, the user can choose not to abide by this constraint. Drawing a sample is one of the reasons. A user can choose to draw the same sample every time they go through the data, or choose that a different sample should be drawn each time. The parameter stable of the following constructor indicates which kind of sample the StreamingDataFrame is producing.

pandas_streaming.df.StreamingDataFrame (self, iter_creation, check_schema = True, stable = True)

Defines a streaming dataframe. The goal is to reduce the memory footprint. The class takes a function which creates an iterator over dataframes. We assume this function can be called multiple times. As a matter of fact, the function is called every time the class needs to walk through the stream with the following loop…
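To come back to the stability constraint, the difference between a sample which is the same on every pass and one which changes can be sketched with two iterator-creating functions. This is a plain-Python illustration under assumptions, not the library's sampling code: stable_sample, unstable_sample and the seed value are hypothetical.

```python
import random

rows = list(range(20))


def stable_sample():
    # A new random generator with a fixed seed is built on every call,
    # so every pass over the data draws the same sample.
    rng = random.Random(42)
    return (r for r in rows if rng.random() < 0.5)


shared_rng = random.Random(42)


def unstable_sample():
    # The random generator is shared between calls and keeps its state,
    # so each pass will generally draw a different sample.
    return (r for r in rows if shared_rng.random() < 0.5)


stable_first = list(stable_sample())
stable_second = list(stable_sample())
unstable_first = list(unstable_sample())
unstable_second = list(unstable_sample())

print("stable passes identical:", stable_first == stable_second)
print("unstable passes identical:", unstable_first == unstable_second)
```

A function like the first one matches what stable = True promises, while one like the second would call for stable = False.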

Check the schema consistency of a large file

Large files usually come from a database export, and for some reason this export may have failed for a couple of lines. The cause can be an end-of-line character not removed from a comment, or a separator also present in the data. When that happens, pandas takes the least strict type as the column type. Sometimes, we prefer to get an idea of where the error could be found.

<<<

import pandas
df = pandas.DataFrame([dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1"),
                       dict(cf=2, cint="s2", cstr="2"), dict(cf=3, cint=3, cstr="3")])
name = "temp_df.csv"
df.to_csv(name, index=False)

from pandas_streaming.df import StreamingDataFrame
try:
    sdf = StreamingDataFrame.read_csv(name, chunksize=2)
    for df in sdf:
        print(df.dtypes)
except Exception as e:
    print("ERROR:", e)

>>>

    cf      int64
    cint    int64
    cstr    int64
    dtype: object
    ERROR: Column types are different after row 2
    ,names,schema1,schema2,diff
    1,cint,int64,object,True

The method __iter__ checks that the schema does not change between two iterations. It can be disabled by adding check_schema=False when the constructor is called.
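The principle of such a check can be sketched without pandas: remember the schema of the first chunk and compare every later chunk against it. This is a simplified analogue under assumptions, not the actual __iter__ code. The helpers chunk_schema and iter_checked are hypothetical, chunks are lists of dictionaries, and Python type names stand in for pandas dtypes.

```python
def chunk_schema(chunk):
    """Map each column name to the sorted type names found in the chunk."""
    schema = {}
    for row in chunk:
        for name, value in row.items():
            schema.setdefault(name, set()).add(type(value).__name__)
    return {name: sorted(types) for name, types in schema.items()}


def iter_checked(chunks, check_schema=True):
    """Yield chunks, raising if a chunk's schema differs from the first one."""
    reference = None
    rows_seen = 0
    for chunk in chunks:
        if check_schema:
            schema = chunk_schema(chunk)
            if reference is None:
                reference = schema
            elif schema != reference:
                # Report only the columns whose types changed.
                diff = {
                    name: (reference.get(name), schema.get(name))
                    for name in sorted(set(reference) | set(schema))
                    if reference.get(name) != schema.get(name)
                }
                raise ValueError(
                    f"Column types are different after row {rows_seen}: {diff}")
        rows_seen += len(chunk)
        yield chunk


chunks = [
    [dict(cf=0, cint=0, cstr="0"), dict(cf=1, cint=1, cstr="1")],
    [dict(cf=2, cint="s2", cstr="2"), dict(cf=3, cint=3, cstr="3")],
]

try:
    for chunk in iter_checked(chunks):
        pass
    error = None
except ValueError as exc:
    error = str(exc)
print("ERROR:", error)

# With the check disabled, both chunks go through.
unchecked = list(iter_checked(chunks, check_schema=False))
print(len(unchecked))
```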