# -*- coding: utf-8 -*-
"""
Saves and reads a :epkg:`dataframe` into a :epkg:`zip` file.
:githublink:`%|py|6`
"""
import os
from io import StringIO, BytesIO
try:
from ujson import dumps
except ImportError: # pragma: no cover
from json import dumps
import ijson
class JsonPerRowsStream:
    """
    Reads a :epkg:`json` streams and adds
    ``,``, ``[``, ``]`` to convert a stream containing
    one :epkg:`json` object per row into one single :epkg:`json` object.
    It implements methods *readline*, *read* and *getvalue*.
    The wrapped stream may return either ``str`` or ``bytes``,
    the added separators have the same type as the data read.
    """

    def __init__(self, st):
        """
        :param st: stream, it must implement *readline* and/or *read*
        """
        self.st = st
        # True until the opening ``[`` has been emitted.
        self.begin = True
        # True when the previous chunk ended with a newline, which means
        # a ``,`` is needed before the next record (if there is one).
        self.newline = False
        # True until the closing ``]`` has been emitted.
        self.end = True

    @staticmethod
    def _tokens(text):
        """
        Returns the tuple *(newline, newline + comma, comma, '[', ']')*
        with the same type (``str`` or ``bytes``) as *text*.
        """
        if isinstance(text, bytes):
            return b"\n", b"\n,", b",", b"[", b"]"
        return "\n", "\n,", ",", "[", "]"

    def _decorate(self, text, size):
        """
        Adds the leading ``[`` or ``,`` and the trailing ``]`` to a chunk
        which was just read from the underlying stream.

        :param text: chunk read from the stream (``str`` or ``bytes``)
        :param size: size requested by the caller
        :return: the chunk with the needed separators
        """
        newline, _, comma, opening, closing = self._tokens(text)
        if self.newline:
            # The separator is only added if another record follows.
            # An empty chunk means the end of the stream was reached:
            # adding ``,`` here would produce ``,]`` which is not valid json.
            if text:
                text = comma + text
            self.newline = False
        elif self.begin:
            text = opening + text
            self.begin = False

        if text.endswith(newline):
            self.newline = True
            return text
        # An empty chunk or a chunk shorter than requested means the
        # underlying stream is exhausted: the array is closed only once.
        if (len(text) == 0 or len(text) < size) and self.end:
            self.end = False
            return text + closing
        return text

    def readline(self, size=-1):
        """
        Reads a line, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.

        :param size: maximum number of characters to read from
            the underlying stream, -1 for a whole line
        :return: the line with the added separators
        """
        text = self.st.readline(size)
        if size == 0:
            return text
        return self._decorate(text, size)

    def read(self, size=-1):
        """
        Reads characters, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.

        :param size: maximum number of characters to read from
            the underlying stream, -1 for everything
        :return: the characters with the added separators
        """
        text = self.st.read(size)
        if size == 0:
            return text
        newline, newline_comma = self._tokens(text)[:2]
        if len(text) > 1:
            # Every newline but the last character gets its separator now,
            # a final newline is handled by the next call (see *_decorate*)
            # because only the next call knows if another record follows.
            text = text[:-1].replace(newline, newline_comma) + text[-1:]
        return self._decorate(text, size)

    def getvalue(self):
        """
        Returns the whole stream content.

        :return: the remaining content read line by line and concatenated,
            ``bytes`` if the underlying stream returns bytes,
            ``str`` otherwise
        """
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            line = self.readline()
        if lines and isinstance(lines[0], bytes):
            return b"".join(lines)
        return "".join(lines)
def flatten_dictionary(dico, sep="_"):
    """
    Flattens a dictionary with nested structure to a dictionary with no
    hierarchy.

    :param dico: dictionary to flatten, lists and sets are accepted as well,
        their elements are keyed by their position
    :param sep: string to separate dictionary keys by
    :return: flattened dictionary
    :raises TypeError: if a key of a nested dictionary is not a string

    Empty dictionaries, lists and sets do not produce any key in the result.
    The iteration order of a set is not defined, so is the position
    given to its elements.

    Inspired from `flatten_json <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
    """
    flattened_dict = {}

    def _flatten(obj, key):
        # *key* is the path built so far, None at the root.
        if isinstance(obj, dict):
            for k, v in obj.items():
                if not isinstance(k, str):
                    raise TypeError(
                        "All keys must a string.")  # pragma: no cover
                k2 = k if key is None else "{0}{1}{2}".format(key, sep, k)
                _flatten(v, k2)
        elif isinstance(obj, (list, set)):
            for index, item in enumerate(obj):
                # At the root there is no parent key: the position itself
                # (as a string, like every other key) becomes the key.
                k2 = (str(index) if key is None
                      else "{0}{1}{2}".format(key, sep, index))
                _flatten(item, k2)
        else:
            # Leaf value, None included.
            flattened_dict[key] = obj

    _flatten(dico, None)
    return flattened_dict
def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fLOG=None):
    """
    Enumerates items from a :epkg:`JSON` file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding, only used to open the file
        when *filename* is an existing path
    :param lines: one record per row
    :param flatten: call :func:`flatten_dictionary <pandas_streaming.df.dataframe_io_helpers.flatten_dictionary>`
    :param fLOG: logging function
    :return: iterator on records at first level.

    It assumes the syntax follows the format: ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
    A stream containing one single object ``{"id":1, ...}`` yields
    that object as the only record.
    However, if option *lines* is true, the function considers that the
    stream or file does have one record per row as follows:

    ::

        {"id":1, ...}
        {"id": 2, ...}

    A string is considered as a filename if it does not contain ``{``
    and if it is the path of an existing file, otherwise it is parsed
    as :epkg:`JSON` content.

    .. exref::
        :title: Processes a json file by streaming.

        The module :epkg:`ijson` can read a :epkg:`JSON` file by streaming.
        This module is needed because a record can be written on multiple lines.
        This function leverages it and produces the following results.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in enumerate_json_items(text_json):
                print(item)
    """
    if isinstance(filename, str):
        if "{" not in filename and os.path.exists(filename):
            with open(filename, "r", encoding=encoding) as f:
                yield from enumerate_json_items(
                    f, encoding=encoding, lines=lines, flatten=flatten, fLOG=fLOG)
        else:
            yield from enumerate_json_items(
                StringIO(filename), encoding=encoding, lines=lines,
                flatten=flatten, fLOG=fLOG)
    elif isinstance(filename, bytes):
        yield from enumerate_json_items(
            BytesIO(filename), encoding=encoding, lines=lines,
            flatten=flatten, fLOG=fLOG)
    elif lines:
        # One record per row: the stream is wrapped to look like a json array.
        yield from enumerate_json_items(
            JsonPerRowsStream(filename), encoding=encoding, lines=False,
            flatten=flatten, fLOG=fLOG)
    else:
        parser = ijson.parse(filename)
        # Container (list or dict) currently being filled,
        # None when no container is open (top level).
        current = None
        # Key waiting for its value when *current* is a dictionary.
        curkey = None
        # Open containers, the outermost first.
        stack = []
        nbyield = 0
        for i, (_, event, value) in enumerate(parser):
            if i % 1000000 == 0 and fLOG is not None:
                fLOG(  # pragma: no cover
                    "[enumerate_json_items] i={0} yielded={1}".format(i, nbyield))
            if event == "start_array":
                c = []
                if curkey is not None:
                    if not isinstance(current, dict):
                        raise RuntimeError(  # pragma: no cover
                            "Type issue {0}".format(type(current)))
                    current[curkey] = c
                elif isinstance(current, list):
                    # Array directly nested into another array: it must be
                    # attached to its parent or its content is lost.
                    current.append(c)
                current = c
                curkey = None
                stack.append(current)
            elif event == "end_array":
                stack.pop()
                if len(stack) == 0:
                    # We should be done.
                    current = None
                else:
                    current = stack[-1]
            elif event == "start_map":
                c = {}
                if curkey is None:
                    if current is None:
                        # Single object at top level: it is stored in an
                        # implicit list kept on the stack so that *end_map*
                        # handles it like any record of a top level array.
                        current = []
                        stack.append(current)
                    current.append(c)
                else:
                    current[curkey] = c  # pylint: disable=E1137
                stack.append(c)
                current = c
                curkey = None
            elif event == "end_map":
                stack.pop()
                current = stack[-1]
                if len(stack) == 1:
                    # Only the top level list remains: a record is complete.
                    nbyield += 1
                    if flatten:
                        yield flatten_dictionary(current[-1])
                    else:
                        yield current[-1]
                    # We clear the memory.
                    current.clear()
            elif event == "map_key":
                curkey = value
            elif event in {"string", "number", "boolean"}:
                if curkey is None:
                    current.append(value)
                else:
                    current[curkey] = value  # pylint: disable=E1137
                curkey = None
            elif event == "null":
                if curkey is None:
                    current.append(None)
                else:
                    current[curkey] = None  # pylint: disable=E1137
                curkey = None
            else:
                raise ValueError("Unknown event '{0}'".format(
                    event))  # pragma: no cover
class JsonIterator2Stream:
    """
    Wraps an iterator on :epkg:`JSON` items into a stream:
    every call to method *read* returns the next item
    serialized as a string.
    The iterator could be one returned by :func:`enumerate_json_items <pandas_streaming.df.dataframe_io_helpers.enumerate_json_items>`.

    .. exref::
        :title: Reshape a json file

        The function :func:`enumerate_json_items <pandas_streaming.df.dataframe_io_helpers.enumerate_json_items>` reads any
        :epkg:`json` even if every record is split over
        multiple lines. Class :class:`JsonIterator2Stream <pandas_streaming.df.dataframe_io_helpers.JsonIterator2Stream>`
        mocks this iterator as a stream. Each row is a single item.

        .. runpython::
            :showcode:

            from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream

            text_json = b'''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in JsonIterator2Stream(enumerate_json_items(text_json)):
                print(item)
    """

    def __init__(self, it, **kwargs):
        """
        :param it: iterator
        :param kwargs: arguments to :epkg:`*py:json:dumps`
        """
        self.kwargs = kwargs
        self.it = it

    def write(self):
        """
        The class does not write, the method always raises
        *NotImplementedError*.
        """
        raise NotImplementedError()

    def read(self):
        """
        Reads the next item and returns it as a string,
        returns None once the iterator is exhausted.
        """
        try:
            return dumps(next(self.it), **self.kwargs)
        except StopIteration:
            return None

    def __iter__(self):
        """
        Iterate on each row, every item is serialized
        the same way method *read* does.
        """
        yield from (dumps(item, **self.kwargs) for item in self.it)