Source code for pandas_streaming.df.dataframe_io_helpers

# -*- coding: utf-8 -*-
"""
Helpers to save and read a :epkg:`dataframe` through :epkg:`json` streams.
"""
import os
from io import StringIO, BytesIO
try:
    # ujson serializes dictionaries faster, falls back on json otherwise.
    from ujson import dumps
except ImportError:  # pragma: no cover
    from json import dumps
import ijson


class JsonPerRowsStream:
    """
    Reads a :epkg:`json` stream and adds ``,``, ``[``, ``]`` to convert
    a stream containing one :epkg:`json` object per row into one single
    :epkg:`json` object. It implements methods *readline*, *read* and
    *getvalue*.
    """
    def __init__(self, st):
        """
        :param st: stream
        """
        self.st = st
        self.begin = True
        self.newline = False
        self.end = True

    def readline(self, size=-1):
        """
        Reads a line and adds ``,``, ``[``, ``]`` if needed,
        so the number of read characters is not necessarily
        the requested one but can be greater.
        """
        text = self.st.readline(size)
        if size == 0:
            return text
        if self.newline:
            text = ',' + text
            self.newline = False
        elif self.begin:
            text = '[' + text
            self.begin = False

        if text.endswith("\n"):
            self.newline = True
            return text
        if len(text) == 0 or len(text) < size:
            if self.end:
                self.end = False
                return text + ']'
            return text
        return text

    def read(self, size=-1):
        """
        Reads characters and adds ``,``, ``[``, ``]`` if needed,
        so the number of read characters is not necessarily
        the requested one but can be greater.
        """
        text = self.st.read(size)
        if isinstance(text, bytes):
            cst = b"\n", b"\n,", b",", b"[", b"]"
        else:
            cst = "\n", "\n,", ",", "[", "]"
        if size == 0:
            return text
        if len(text) > 1:
            # Inserts a comma after every newline; the last character is
            # kept apart so that a chunk ending with a newline defers its
            # comma to the next call.
            t1, t2 = text[:len(text) - 1], text[len(text) - 1:]
            t1 = t1.replace(cst[0], cst[1])
            text = t1 + t2

        if self.newline:
            text = cst[2] + text
            self.newline = False
        elif self.begin:
            text = cst[3] + text
            self.begin = False

        if text.endswith(cst[0]):
            self.newline = True
            return text
        if len(text) == 0 or len(text) < size:
            if self.end:
                self.end = False
                return text + cst[4]
            return text
        return text

    def getvalue(self):
        """
        Returns the whole stream content.
        """
        def byline():
            line = self.readline()
            while line:
                yield line
                line = self.readline()

        return "".join(byline())
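
# --- Usage sketch (not part of the module) ---
# A minimal example with invented data: wrapping a JSON-lines stream with
# JsonPerRowsStream makes its content read back as one JSON array. Note
# the last record is not followed by a newline, otherwise the stream
# would end with a trailing comma and the result would not parse.
from io import StringIO
import json

raw = StringIO('{"id": 1}\n{"id": 2}')
wrapped = JsonPerRowsStream(raw)
print(json.loads(wrapped.getvalue()))  # [{'id': 1}, {'id': 2}]
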
def flatten_dictionary(dico, sep="_"):
    """
    Flattens a dictionary with nested structure to a dictionary with no
    hierarchy.

    :param dico: dictionary to flatten
    :param sep: string to separate dictionary keys by
    :return: flattened dictionary

    Inspired from `flatten_json
    <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
    """
    flattened_dict = dict()

    def _flatten(obj, key):
        if obj is None:
            flattened_dict[key] = obj
        elif isinstance(obj, dict):
            for k, v in obj.items():
                if not isinstance(k, str):
                    raise TypeError(
                        "All keys must be strings.")  # pragma: no cover
                k2 = k if key is None else "{0}{1}{2}".format(key, sep, k)
                _flatten(v, k2)
        elif isinstance(obj, (list, set)):
            for index, item in enumerate(obj):
                # The index is used as the key, a top-level list would
                # otherwise refer to an undefined name.
                k2 = str(index) if key is None else "{0}{1}{2}".format(
                    key, sep, index)
                _flatten(item, k2)
        else:
            flattened_dict[key] = obj

    _flatten(dico, None)
    return flattened_dict
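
# --- Usage sketch (not part of the module) ---
# A short example with made-up data: nested keys are joined with the
# separator and list items are indexed by position.
example = {"a": {"b": 1, "c": [10, 20]}, "d": 2}
print(flatten_dictionary(example))
# {'a_b': 1, 'a_c_0': 10, 'a_c_1': 20, 'd': 2}
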
def enumerate_json_items(filename, encoding=None, lines=False,
                         flatten=False, fLOG=None):
    """
    Enumerates items from a :epkg:`JSON` file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding
    :param lines: one record per row
    :param flatten: call :func:`flatten_dictionary
        <pandas_streaming.df.dataframe_io_helpers.flatten_dictionary>`
    :param fLOG: logging function
    :return: iterator on records at first level

    It assumes the syntax follows the format:
    ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
    However, if option *lines* is true, the function considers that the
    stream or file has one record per row as follows::

        {"id":1, ...}
        {"id": 2, ...}

    .. exref::
        :title: Processes a json file by streaming.

        The module :epkg:`ijson` can read a :epkg:`JSON` file by
        streaming. This module is needed because a record can be written
        on multiple lines. This function leverages it and produces the
        following results.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in enumerate_json_items(text_json):
                print(item)
    """
    if isinstance(filename, str):
        if "{" not in filename and os.path.exists(filename):
            with open(filename, "r", encoding=encoding) as f:
                for el in enumerate_json_items(
                        f, encoding=encoding, lines=lines,
                        flatten=flatten, fLOG=fLOG):
                    yield el
        else:
            st = StringIO(filename)
            for el in enumerate_json_items(
                    st, encoding=encoding, lines=lines,
                    flatten=flatten, fLOG=fLOG):
                yield el
    elif isinstance(filename, bytes):
        st = BytesIO(filename)
        for el in enumerate_json_items(
                st, encoding=encoding, lines=lines,
                flatten=flatten, fLOG=fLOG):
            yield el
    elif lines:
        for el in enumerate_json_items(
                JsonPerRowsStream(filename), encoding=encoding,
                lines=False, flatten=flatten, fLOG=fLOG):
            yield el
    else:
        parser = ijson.parse(filename)
        current = None
        curkey = None
        stack = []
        nbyield = 0
        for i, (_, event, value) in enumerate(parser):
            if i % 1000000 == 0 and fLOG is not None:
                fLOG(  # pragma: no cover
                    "[enumerate_json_items] i={0} yielded={1}".format(
                        i, nbyield))
            if event == "start_array":
                if curkey is None:
                    current = []
                else:
                    if not isinstance(current, dict):
                        raise RuntimeError(  # pragma: no cover
                            "Type issue {0}".format(type(current)))
                    c = []
                    current[curkey] = c
                    current = c
                curkey = None
                stack.append(current)
            elif event == "end_array":
                stack.pop()
                if len(stack) == 0:
                    # We should be done.
                    current = None
                else:
                    current = stack[-1]
            elif event == "start_map":
                c = {}
                if curkey is None:
                    if current is None:
                        current = []
                    current.append(c)
                else:
                    current[curkey] = c  # pylint: disable=E1137
                stack.append(c)
                current = c
                curkey = None
            elif event == "end_map":
                stack.pop()
                current = stack[-1]
                if len(stack) == 1:
                    nbyield += 1
                    if flatten:
                        yield flatten_dictionary(current[-1])
                    else:
                        yield current[-1]
                    # We clear the memory.
                    current.clear()
            elif event == "map_key":
                curkey = value
            elif event in {"string", "number", "boolean"}:
                if curkey is None:
                    current.append(value)
                else:
                    current[curkey] = value  # pylint: disable=E1137
                curkey = None
            elif event == "null":
                if curkey is None:
                    current.append(None)
                else:
                    current[curkey] = None  # pylint: disable=E1137
                curkey = None
            else:
                raise ValueError(  # pragma: no cover
                    "Unknown event '{0}'".format(event))
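
# --- Usage sketch (not part of the module) ---
# An example of the *lines* option with invented records: with lines=True
# the input is expected to hold one JSON record per row. The last record
# should not be followed by a newline (see JsonPerRowsStream above).
text_lines = b'{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}'
for item in enumerate_json_items(text_lines, lines=True):
    print(item)
# {'id': 1, 'name': 'a'}
# {'id': 2, 'name': 'b'}
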
class JsonIterator2Stream:
    """
    Transforms an iterator on :epkg:`JSON` items
    into a stream which returns one item as a string
    every time method *read* is called.
    The iterator could be one returned by :func:`enumerate_json_items
    <pandas_streaming.df.dataframe_io_helpers.enumerate_json_items>`.

    .. exref::
        :title: Reshape a json file

        The function :func:`enumerate_json_items
        <pandas_streaming.df.dataframe_io_helpers.enumerate_json_items>`
        reads any :epkg:`json` even if every record is split over
        multiple lines. Class :class:`JsonIterator2Stream
        <pandas_streaming.df.dataframe_io_helpers.JsonIterator2Stream>`
        mocks this iterator as a stream. Each row is a single item.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in JsonIterator2Stream(enumerate_json_items(text_json)):
                print(item)
    """

    def __init__(self, it, **kwargs):
        """
        :param it: iterator
        :param kwargs: arguments to :epkg:`*py:json:dumps`
        """
        self.it = it
        self.kwargs = kwargs

    def write(self):
        """
        The class does not write.
        """
        raise NotImplementedError()

    def read(self):
        """
        Reads the next item and returns it as a string.
        """
        try:
            value = next(self.it)
            return dumps(value, **self.kwargs)
        except StopIteration:
            return None

    def __iter__(self):
        """
        Iterates on each row.
        """
        for value in self.it:
            yield dumps(value, **self.kwargs)
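
# --- Usage sketch (not part of the module) ---
# A small example of the stream behaviour with invented records: *read*
# returns the next item serialized by dumps, then None once the iterator
# is exhausted. Exact spacing depends on whether ujson or json is used.
stream = JsonIterator2Stream(iter([{"id": 1}, {"id": 2}]))
print(stream.read())  # {"id":1}
print(stream.read())  # {"id":2}
print(stream.read())  # None
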