Coverage for pandas_streaming/df/dataframe_io_helpers.py: 92%
173 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 14:15 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 14:15 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Saves and reads a :epkg:`dataframe` into a :epkg:`zip` file.
5"""
6import os
7from io import StringIO, BytesIO
8try:
9 from ujson import dumps
10except ImportError: # pragma: no cover
11 from json import dumps
12import ijson
class JsonPerRowsStream:
    """
    Reads a :epkg:`json` stream and adds
    ``,``, ``[``, ``]`` to convert a stream containing
    one :epkg:`json` object per row into one single :epkg:`json` object.
    It implements methods *seek*, *readline*, *read*, *getvalue*.
    Both text and binary underlying streams are supported.

    :param st: stream
    """

    def __init__(self, st):
        self.st = st          # underlying stream
        self.begin = True     # '[' still has to be emitted
        self.newline = False  # ',' must be inserted before the next chunk
        self.end = True       # ']' still has to be emitted

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented

        NOTE(review): the begin/newline/end flags are not reset here,
        matching the historical behavior — confirm callers only seek
        before the first read.
        """
        self.st.seek(offset)

    def readline(self, size=-1):
        """
        Reads a line, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        text = self.st.readline(size)
        # Pick str or bytes constants depending on the underlying stream,
        # mirroring *read*; the original failed with TypeError on bytes.
        if isinstance(text, bytes):
            cst = b"\n", b",", b"[", b"]"
        else:
            cst = "\n", ",", "[", "]"
        if size == 0:
            return text
        if self.newline:
            text = cst[1] + text
            self.newline = False
        elif self.begin:
            text = cst[2] + text
            self.begin = False

        if text.endswith(cst[0]):
            # A complete row was read: the next chunk needs a leading ','.
            self.newline = True
            return text
        if len(text) == 0 or len(text) < size:
            # End of stream: emit the closing ']' exactly once.
            if self.end:
                self.end = False
                return text + cst[3]
            return text
        return text

    def read(self, size=-1):
        """
        Reads characters, adds ``,``, ``[``, ``]`` if needed.
        So the number of read characters is not necessarily
        the requested one but could be greater.
        """
        text = self.st.read(size)
        if isinstance(text, bytes):
            cst = b"\n", b"\n,", b",", b"[", b"]"
        else:
            cst = "\n", "\n,", ",", "[", "]"
        if size == 0:
            return text
        if len(text) > 1:
            # Insert a ',' after every newline except a trailing one,
            # which is handled through the *newline* flag instead.
            t1, t2 = text[:len(text) - 1], text[len(text) - 1:]
            t1 = t1.replace(cst[0], cst[1])
            text = t1 + t2

        if self.newline:
            text = cst[2] + text
            self.newline = False
        elif self.begin:
            text = cst[3] + text
            self.begin = False

        if text.endswith(cst[0]):
            self.newline = True
            return text
        if len(text) == 0 or len(text) < size:
            # End of stream: emit the closing ']' exactly once.
            if self.end:
                self.end = False
                return text + cst[4]
            return text
        return text

    def getvalue(self):
        """
        Returns the whole stream content.
        """
        def byline():
            line = self.readline()
            while line:
                yield line
                line = self.readline()
        chunks = list(byline())
        # Join with the matching empty separator so binary streams work too;
        # the original always joined with "" and failed on bytes.
        sep = b"" if chunks and isinstance(chunks[0], bytes) else ""
        return sep.join(chunks)
def flatten_dictionary(dico, sep="_"):
    """
    Flattens a dictionary with nested structure to a dictionary with no
    hierarchy.

    :param dico: dictionary to flatten
    :param sep: string to separate dictionary keys by
    :return: flattened dictionary
    :raises TypeError: if a dictionary key is not a string

    Inspired from `flatten_json
    <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
    """
    flattened_dict = {}

    def _flatten(obj, key):
        # Recursively walks *obj*, storing leaves in *flattened_dict*
        # under keys joined with *sep*.
        if obj is None:
            flattened_dict[key] = obj
        elif isinstance(obj, dict):
            for k, v in obj.items():
                if not isinstance(k, str):
                    raise TypeError(
                        "All keys must be a string.")  # pragma: no cover
                k2 = k if key is None else f"{key}{sep}{k}"
                _flatten(v, k2)
        elif isinstance(obj, (list, set)):
            for index, item in enumerate(obj):
                # Bug fix: the original referenced the undefined name *k*
                # when *key* was None (top-level list), raising NameError;
                # the element index is the intended key.
                k2 = str(index) if key is None else f"{key}{sep}{index}"
                _flatten(item, k2)
        else:
            flattened_dict[key] = obj

    _flatten(dico, None)
    return flattened_dict
def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fLOG=None):
    """
    Enumerates items from a :epkg:`JSON` file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding
    :param lines: one record per row
    :param flatten: call @see fn flatten_dictionary on every record
    :param fLOG: logging function
    :return: iterator on records at first level.

    It assumes the syntax follows the format:
    ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.
    However, if option *lines* is true, the function considers that the
    stream or file does have one record per row as follows::

        {"id":1, ...}
        {"id": 2, ...}

    The module :epkg:`ijson` reads the :epkg:`JSON` by streaming, which is
    needed because a single record can be written over multiple lines.

    Example::

        from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

        text_json = b'[{"id": 1, "data": {"x": 1}}, {"id": 2, "data": {"x": 2}}]'
        for item in enumerate_json_items(text_json):
            print(item)

    The parsed json must have an empty line at the end otherwise
    the following exception is raised:
    `ijson.common.IncompleteJSONError: `
    `parse error: unallowed token at this point in JSON text`.
    """
    if isinstance(filename, str):
        if "{" not in filename and os.path.exists(filename):
            # A path on disk: open it and recurse on the file object.
            with open(filename, "r", encoding=encoding) as f:
                for el in enumerate_json_items(
                        f, encoding=encoding, lines=lines,
                        flatten=flatten, fLOG=fLOG):
                    yield el
        else:
            # A raw JSON string: wrap it into a text stream.
            st = StringIO(filename)
            for el in enumerate_json_items(
                    st, encoding=encoding, lines=lines,
                    flatten=flatten, fLOG=fLOG):
                yield el
    elif isinstance(filename, bytes):
        # Raw JSON bytes: wrap them into a binary stream.
        st = BytesIO(filename)
        for el in enumerate_json_items(
                st, encoding=encoding, lines=lines, flatten=flatten,
                fLOG=fLOG):
            yield el
    elif lines:
        # One record per row: JsonPerRowsStream inserts '[', ',', ']'
        # on the fly so the stream parses as one JSON array.
        for el in enumerate_json_items(
                JsonPerRowsStream(filename),
                encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG):
            yield el
    else:
        if hasattr(filename, 'seek'):
            filename.seek(0)
        # Event-driven parsing: rebuild each first-level record from the
        # (prefix, event, value) triplets emitted by ijson.
        parser = ijson.parse(filename)
        current = None  # container currently being filled
        curkey = None   # pending dict key waiting for its value
        stack = []      # enclosing containers; stack[0] is the top-level array
        nbyield = 0     # number of records yielded so far (for logging)
        for i, (_, event, value) in enumerate(parser):
            if i % 1000000 == 0 and fLOG is not None:
                # Progress logging every million parser events.
                fLOG(  # pragma: no cover
                    f"[enumerate_json_items] i={i} yielded={nbyield}")
            if event == "start_array":
                if curkey is None:
                    # Array at the top level (or nested in another array).
                    current = []
                else:
                    # Array value of the pending dict key.
                    if not isinstance(current, dict):
                        raise RuntimeError(  # pragma: no cover
                            f"Type issue {type(current)}")
                    c = []
                    current[curkey] = c  # pylint: disable=E1137
                    current = c
                    curkey = None
                stack.append(current)
            elif event == "end_array":
                stack.pop()
                if len(stack) == 0:
                    # We should be done.
                    current = None
                else:
                    current = stack[-1]
            elif event == "start_map":
                c = {}
                if curkey is None:
                    # Map directly inside an array (or at the top level).
                    if current is None:
                        current = []
                    current.append(c)
                else:
                    # Map value of the pending dict key.
                    current[curkey] = c  # pylint: disable=E1137
                stack.append(c)
                current = c
                curkey = None
            elif event == "end_map":
                stack.pop()
                current = stack[-1]
                if len(stack) == 1:
                    # Back at the top-level array: a full record is complete.
                    nbyield += 1
                    if flatten:
                        yield flatten_dictionary(current[-1])
                    else:
                        yield current[-1]
                    # We clear the memory.
                    current.clear()
            elif event == "map_key":
                curkey = value
            elif event in {"string", "number", "boolean"}:
                # NOTE(review): some ijson backends report numeric scalars as
                # "integer"/"double" rather than "number"; those would hit the
                # ValueError below — confirm the backend in use.
                if curkey is None:
                    current.append(value)
                else:
                    current[curkey] = value  # pylint: disable=E1137
                    curkey = None
            elif event == "null":
                if curkey is None:
                    current.append(None)
                else:
                    current[curkey] = None  # pylint: disable=E1137
                    curkey = None
            else:
                raise ValueError(
                    f"Unknown event '{event}'")  # pragma: no cover
class JsonIterator2Stream:
    """
    Exposes an iterator on :epkg:`JSON` items as a read-only stream:
    every call to method *read* serializes the next item with
    :epkg:`*py:json:dumps` and returns it as a string.
    The iterator could be one returned by @see fn enumerate_json_items.

    :param it: function returning an iterator on JSON items
    :param kwargs: arguments to :epkg:`*py:json:dumps`

    Example::

        from pandas_streaming.df.dataframe_io_helpers import (
            enumerate_json_items, JsonIterator2Stream)

        for row in JsonIterator2Stream(
                lambda: enumerate_json_items(text_json)):
            print(row)

    .. versionchanged:: 0.3
        The class takes a function which outputs an iterator and not an iterator.
        `JsonIterator2Stream(enumerate_json_items(text_json))` needs to be rewritten
        into JsonIterator2Stream(lambda: enumerate_json_items(text_json)).
    """

    def __init__(self, it, **kwargs):
        self.it = it          # factory producing a fresh iterator
        self.kwargs = kwargs  # serialization options forwarded to dumps
        self.it0 = it()       # iterator currently being consumed

    def seek(self, offset):
        """
        Change the stream position to the given byte offset.

        :param offset: offset, only 0 is implemented
        :raises NotImplementedError: for any non-zero offset
        """
        if offset == 0:
            # Rewinding means asking the factory for a brand new iterator.
            self.it0 = self.it()
            return
        raise NotImplementedError(
            "The iterator can only return at the beginning.")

    def write(self):
        """
        The stream is read-only, writing is not supported.
        """
        raise NotImplementedError()

    def read(self):
        """
        Reads the next item and returns it as a string,
        or ``None`` once the iterator is exhausted.
        """
        # A for-loop over the iterator returns on the first item and
        # falls through to None when nothing is left.
        for value in self.it0:
            return dumps(value, **self.kwargs)
        return None

    def __iter__(self):
        """
        Iterates on each row. The behaviour is a bit tricky.
        It is implemented to be swallowed by :func:`pandas.read_json` which
        uses :func:`itertools.islice` to go through the items.
        It calls multiple times `__iter__` but does expect the
        iterator to continue from where it stopped last time.
        """
        # A generator expression over self.it0 naturally resumes from
        # wherever the previous consumer stopped.
        return (dumps(value, **self.kwargs) for value in self.it0)