Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Saves and reads a :epkg:`dataframe` into a :epkg:`zip` file. 

5""" 

6import os 

7from io import StringIO, BytesIO 

8try: 

9 from ujson import dumps 

10except ImportError: # pragma: no cover 

11 from json import dumps 

12import ijson 

13 

14 

15class JsonPerRowsStream: 

16 """ 

17 Reads a :epkg:`json` streams and adds 

18 ``,``, ``[``, ``]`` to convert a stream containing 

19 one :epkg:`json` object per row into one single :epkg:`json` object. 

20 It only implements method *readline*. 

21 

22 :param st: stream 

23 """ 

24 

25 def __init__(self, st): 

26 self.st = st 

27 self.begin = True 

28 self.newline = False 

29 self.end = True 

30 

31 def seek(self, offset): 

32 """ 

33 Change the stream position to the given byte offset. 

34 

35 :param offset: offset, only 0 is implemented 

36 """ 

37 self.st.seek(offset) 

38 

39 def readline(self, size=-1): 

40 """ 

41 Reads a line, adds ``,``, ``[``, ``]`` if needed. 

42 So the number of read characters is not recessarily 

43 the requested one but could be greater. 

44 """ 

45 text = self.st.readline(size) 

46 if size == 0: 

47 return text 

48 if self.newline: 

49 text = ',' + text 

50 self.newline = False 

51 elif self.begin: 

52 text = '[' + text 

53 self.begin = False 

54 

55 if text.endswith("\n"): 

56 self.newline = True 

57 return text 

58 if len(text) == 0 or len(text) < size: 

59 if self.end: 

60 self.end = False 

61 return text + ']' 

62 return text 

63 return text 

64 

65 def read(self, size=-1): 

66 """ 

67 Reads characters, adds ``,``, ``[``, ``]`` if needed. 

68 So the number of read characters is not recessarily 

69 the requested one but could be greater. 

70 """ 

71 text = self.st.read(size) 

72 if isinstance(text, bytes): 

73 cst = b"\n", b"\n,", b",", b"[", b"]" 

74 else: 

75 cst = "\n", "\n,", ",", "[", "]" 

76 if size == 0: 

77 return text 

78 if len(text) > 1: 

79 t1, t2 = text[:len(text) - 1], text[len(text) - 1:] 

80 t1 = t1.replace(cst[0], cst[1]) 

81 text = t1 + t2 

82 

83 if self.newline: 

84 text = cst[2] + text 

85 self.newline = False 

86 elif self.begin: 

87 text = cst[3] + text 

88 self.begin = False 

89 

90 if text.endswith(cst[0]): 

91 self.newline = True 

92 return text 

93 if len(text) == 0 or len(text) < size: 

94 if self.end: 

95 self.end = False 

96 return text + cst[4] 

97 return text 

98 return text 

99 

100 def getvalue(self): 

101 """ 

102 Returns the whole stream content. 

103 """ 

104 def byline(): 

105 line = self.readline() 

106 while line: 

107 yield line 

108 line = self.readline() 

109 return "".join(byline()) 

110 

111 

112def flatten_dictionary(dico, sep="_"): 

113 """ 

114 Flattens a dictionary with nested structure to a dictionary with no 

115 hierarchy. 

116 

117 :param dico: dictionary to flatten 

118 :param sep: string to separate dictionary keys by 

119 :return: flattened dictionary 

120 

121 Inspired from `flatten_json 

122 <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_. 

123 """ 

124 flattened_dict = {} 

125 

126 def _flatten(obj, key): 

127 if obj is None: 

128 flattened_dict[key] = obj 

129 elif isinstance(obj, dict): 

130 for k, v in obj.items(): 

131 if not isinstance(k, str): 

132 raise TypeError( 

133 "All keys must a string.") # pragma: no cover 

134 k2 = k if key is None else "{0}{1}{2}".format(key, sep, k) 

135 _flatten(v, k2) 

136 elif isinstance(obj, (list, set)): 

137 for index, item in enumerate(obj): 

138 k2 = k if key is None else "{0}{1}{2}".format(key, sep, index) 

139 _flatten(item, k2) 

140 else: 

141 flattened_dict[key] = obj 

142 

143 _flatten(dico, None) 

144 return flattened_dict 

145 

146 

147def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fLOG=None): 

148 """ 

149 Enumerates items from a :epkg:`JSON` file or string. 

150 

151 :param filename: filename or string or stream to parse 

152 :param encoding: encoding 

153 :param lines: one record per row 

154 :param flatten: call @see fn flatten_dictionary 

155 :param fLOG: logging function 

156 :return: iterator on records at first level. 

157 

158 It assumes the syntax follows the format: ``[ {"id":1, ...}, {"id": 2, ...}, ...]``. 

159 However, if option *lines* if true, the function considers that the 

160 stream or file does have one record per row as follows: 

161 

162 {"id":1, ...} 

163 {"id": 2, ...} 

164 

165 .. exref:: 

166 :title: Processes a json file by streaming. 

167 

168 The module :epkg:`ijson` can read a :epkg:`JSON` file by streaming. 

169 This module is needed because a record can be written on multiple lines. 

170 This function leverages it produces the following results. 

171 

172 .. runpython:: 

173 :showcode: 

174 

175 from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items 

176 

177 text_json = b''' 

178 [ 

179 { 

180 "glossary": { 

181 "title": "example glossary", 

182 "GlossDiv": { 

183 "title": "S", 

184 "GlossList": [{ 

185 "GlossEntry": { 

186 "ID": "SGML", 

187 "SortAs": "SGML", 

188 "GlossTerm": "Standard Generalized Markup Language", 

189 "Acronym": "SGML", 

190 "Abbrev": "ISO 8879:1986", 

191 "GlossDef": { 

192 "para": "A meta-markup language, used to create markup languages such as DocBook.", 

193 "GlossSeeAlso": ["GML", "XML"] 

194 }, 

195 "GlossSee": "markup" 

196 } 

197 }] 

198 } 

199 } 

200 }, 

201 { 

202 "glossary": { 

203 "title": "example glossary", 

204 "GlossDiv": { 

205 "title": "S", 

206 "GlossList": { 

207 "GlossEntry": [{ 

208 "ID": "SGML", 

209 "SortAs": "SGML", 

210 "GlossTerm": "Standard Generalized Markup Language", 

211 "Acronym": "SGML", 

212 "Abbrev": "ISO 8879:1986", 

213 "GlossDef": { 

214 "para": "A meta-markup language, used to create markup languages such as DocBook.", 

215 "GlossSeeAlso": ["GML", "XML"] 

216 }, 

217 "GlossSee": "markup" 

218 }] 

219 } 

220 } 

221 } 

222 } 

223 ] 

224 ''' 

225 

226 for item in enumerate_json_items(text_json): 

227 print(item) 

228 

229 The parsed json must have an empty line at the end otherwise 

230 the following exception is raised: 

231 `ijson.common.IncompleteJSONError: ` 

232 `parse error: unallowed token at this point in JSON text`. 

233 """ 

234 if isinstance(filename, str): 

235 if "{" not in filename and os.path.exists(filename): 

236 with open(filename, "r", encoding=encoding) as f: 

237 for el in enumerate_json_items( 

238 f, encoding=encoding, lines=lines, 

239 flatten=flatten, fLOG=fLOG): 

240 yield el 

241 else: 

242 st = StringIO(filename) 

243 for el in enumerate_json_items( 

244 st, encoding=encoding, lines=lines, 

245 flatten=flatten, fLOG=fLOG): 

246 yield el 

247 elif isinstance(filename, bytes): 

248 st = BytesIO(filename) 

249 for el in enumerate_json_items( 

250 st, encoding=encoding, lines=lines, flatten=flatten, 

251 fLOG=fLOG): 

252 yield el 

253 elif lines: 

254 for el in enumerate_json_items( 

255 JsonPerRowsStream(filename), 

256 encoding=encoding, lines=False, flatten=flatten, fLOG=fLOG): 

257 yield el 

258 else: 

259 if hasattr(filename, 'seek'): 

260 filename.seek(0) 

261 parser = ijson.parse(filename) 

262 current = None 

263 curkey = None 

264 stack = [] 

265 nbyield = 0 

266 for i, (_, event, value) in enumerate(parser): 

267 if i % 1000000 == 0 and fLOG is not None: 

268 fLOG( # pragma: no cover 

269 "[enumerate_json_items] i={0} yielded={1}" 

270 "".format(i, nbyield)) 

271 if event == "start_array": 

272 if curkey is None: 

273 current = [] 

274 else: 

275 if not isinstance(current, dict): 

276 raise RuntimeError( # pragma: no cover 

277 "Type issue {0}".format(type(current))) 

278 c = [] 

279 current[curkey] = c # pylint: disable=E1137 

280 current = c 

281 curkey = None 

282 stack.append(current) 

283 elif event == "end_array": 

284 stack.pop() 

285 if len(stack) == 0: 

286 # We should be done. 

287 current = None 

288 else: 

289 current = stack[-1] 

290 elif event == "start_map": 

291 c = {} 

292 if curkey is None: 

293 if current is None: 

294 current = [] 

295 current.append(c) 

296 else: 

297 current[curkey] = c # pylint: disable=E1137 

298 stack.append(c) 

299 current = c 

300 curkey = None 

301 elif event == "end_map": 

302 stack.pop() 

303 current = stack[-1] 

304 if len(stack) == 1: 

305 nbyield += 1 

306 if flatten: 

307 yield flatten_dictionary(current[-1]) 

308 else: 

309 yield current[-1] 

310 # We clear the memory. 

311 current.clear() 

312 elif event == "map_key": 

313 curkey = value 

314 elif event in {"string", "number", "boolean"}: 

315 if curkey is None: 

316 current.append(value) 

317 else: 

318 current[curkey] = value # pylint: disable=E1137 

319 curkey = None 

320 elif event == "null": 

321 if curkey is None: 

322 current.append(None) 

323 else: 

324 current[curkey] = None # pylint: disable=E1137 

325 curkey = None 

326 else: 

327 raise ValueError("Unknown event '{0}'".format( 

328 event)) # pragma: no cover 

329 

330 

331class JsonIterator2Stream: 

332 """ 

333 Transforms an iterator on :epkg:`JSON` items 

334 into a stream which returns an items as a string every time 

335 method *read* is called. 

336 The iterator could be one returned by @see fn enumerate_json_items. 

337 

338 :param it: iterator 

339 :param kwargs: arguments to :epkg:`*py:json:dumps` 

340 

341 .. exref:: 

342 :title: Reshape a json file 

343 

344 The function @see fn enumerate_json_items reads any 

345 :epkg:`json` even if every record is split over 

346 multiple lines. Class @see cl JsonIterator2Stream 

347 mocks this iterator as a stream. Each row is a single item. 

348 

349 .. runpython:: 

350 :showcode: 

351 

352 from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items, JsonIterator2Stream 

353 

354 text_json = b''' 

355 [ 

356 { 

357 "glossary": { 

358 "title": "example glossary", 

359 "GlossDiv": { 

360 "title": "S", 

361 "GlossList": [{ 

362 "GlossEntry": { 

363 "ID": "SGML", 

364 "SortAs": "SGML", 

365 "GlossTerm": "Standard Generalized Markup Language", 

366 "Acronym": "SGML", 

367 "Abbrev": "ISO 8879:1986", 

368 "GlossDef": { 

369 "para": "A meta-markup language, used to create markup languages such as DocBook.", 

370 "GlossSeeAlso": ["GML", "XML"] 

371 }, 

372 "GlossSee": "markup" 

373 } 

374 }] 

375 } 

376 } 

377 }, 

378 { 

379 "glossary": { 

380 "title": "example glossary", 

381 "GlossDiv": { 

382 "title": "S", 

383 "GlossList": { 

384 "GlossEntry": [{ 

385 "ID": "SGML", 

386 "SortAs": "SGML", 

387 "GlossTerm": "Standard Generalized Markup Language", 

388 "Acronym": "SGML", 

389 "Abbrev": "ISO 8879:1986", 

390 "GlossDef": { 

391 "para": "A meta-markup language, used to create markup languages such as DocBook.", 

392 "GlossSeeAlso": ["GML", "XML"] 

393 }, 

394 "GlossSee": "markup" 

395 }] 

396 } 

397 } 

398 } 

399 } 

400 ] 

401 ''' 

402 

403 for item in JsonIterator2Stream(lambda: enumerate_json_items(text_json)): 

404 print(item) 

405 

406 .. versionchanged:: 0.3 

407 The class takes a function which outputs an iterator and not an iterator. 

408 `JsonIterator2Stream(enumerate_json_items(text_json))` needs to be rewritten 

409 into JsonIterator2Stream(lambda: enumerate_json_items(text_json)). 

410 """ 

411 

412 def __init__(self, it, **kwargs): 

413 self.it = it 

414 self.kwargs = kwargs 

415 self.it0 = it() 

416 

417 def seek(self, offset): 

418 """ 

419 Change the stream position to the given byte offset. 

420 

421 :param offset: offset, only 0 is implemented 

422 """ 

423 if offset != 0: 

424 raise NotImplementedError( 

425 "The iterator can only return at the beginning.") 

426 self.it0 = self.it() 

427 

428 def write(self): 

429 """ 

430 The class does not write. 

431 """ 

432 raise NotImplementedError() 

433 

434 def read(self): 

435 """ 

436 Reads the next item and returns it as a string. 

437 """ 

438 try: 

439 value = next(self.it0) 

440 return dumps(value, **self.kwargs) 

441 except StopIteration: 

442 return None 

443 

444 def __iter__(self): 

445 """ 

446 Iterates on each row. The behaviour is a bit tricky. 

447 It is implemented to be swalled by :func:`pandas.read_json` which 

448 uses :func:`itertools.islice` to go through the items. 

449 It calls multiple times `__iter__` but does expect the 

450 iterator to continue from where it stopped last time. 

451 """ 

452 for value in self.it0: 

453 yield dumps(value, **self.kwargs)