# Source code for ensae_projects.hackathon.json_helper
# -*- coding: utf-8 -*-
"""
Helpers for the hackathon 2017 (Label Emmaüs).
:githublink:`%|py|6`
"""
import os
from io import BytesIO
import ijson
from pyquickhelper.loghelper import noLOG
def enumerate_json_items(filename, encoding=None, fLOG=noLOG):
    """
    Enumerates items from a JSON file or string.

    :param filename: filename or string or stream to parse
    :param encoding: encoding of the file when *filename* is a path,
        ``None`` or UTF-8 leaves the bytes untouched, any other encoding
        is recoded to UTF-8 on the fly before parsing
    :param fLOG: logging function
    :return: iterator on records at first level.

    It assumes the syntax follows the format: ``[ {"id":1, ...}, {"id": 2, ...}, ...]``.

    .. exref::
        :title: Processes a json file by streaming.

        The module :epkg:`ijson` can read a JSON file by streaming.
        This module is needed because a record can be written on multiple lines.
        This function leverages it and produces the following results.

        .. runpython::
            :showcode:

            from ensae_projects.hackathon import enumerate_json_items

            text_json = '''
                [
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": [{
                                "GlossEntry": {
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }
                            }]
                        }
                    }
                },
                {
                    "glossary": {
                        "title": "example glossary",
                        "GlossDiv": {
                            "title": "S",
                            "GlossList": {
                                "GlossEntry": [{
                                    "ID": "SGML",
                                    "SortAs": "SGML",
                                    "GlossTerm": "Standard Generalized Markup Language",
                                    "Acronym": "SGML",
                                    "Abbrev": "ISO 8879:1986",
                                    "GlossDef": {
                                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                                        "GlossSeeAlso": ["GML", "XML"]
                                    },
                                    "GlossSee": "markup"
                                }]
                            }
                        }
                    }
                }
                ]
            '''

            for item in enumerate_json_items(text_json):
                print('------------')
                print(item)
    """
    if isinstance(filename, str):
        if "{" not in filename and os.path.exists(filename):
            # The parser is fed bytes, so the file is opened in binary mode.
            # Binary mode rejects an ``encoding`` argument, hence the
            # explicit recoding below for anything which is not UTF-8.
            with open(filename, "rb") as f:
                stream = f
                if encoding is not None:
                    import codecs
                    if codecs.lookup(encoding).name != "utf-8":
                        stream = codecs.EncodedFile(f, "utf-8", encoding)
                yield from enumerate_json_items(
                    stream, encoding=encoding, fLOG=fLOG)
        else:
            # The string is not an existing path: it is parsed as JSON text.
            st = BytesIO(filename.encode('utf-8'))
            yield from enumerate_json_items(st, encoding=encoding, fLOG=fLOG)
        return

    parser = ijson.parse(filename)
    # Container currently being filled (a list or a dict).
    current = None
    # Key waiting for its value when *current* is a dict.
    curkey = None
    # Open containers, the outermost one comes first.
    stack = []
    nbyield = 0
    for i, (_, event, value) in enumerate(parser):
        if i % 1000000 == 0:
            fLOG("[enumerate_json_items] i={0} yielded={1}".format(
                i, nbyield))

        if event == "start_array":
            c = []
            if curkey is not None:
                if not isinstance(current, dict):
                    raise RuntimeError(
                        "Type issue {0}".format(type(current)))
                current[curkey] = c
                curkey = None
            elif stack:
                # An array directly nested in another array must be
                # attached to its parent, otherwise its content is lost.
                current.append(c)
            current = c
            stack.append(c)
        elif event == "end_array":
            stack.pop()
            # An empty stack means the top-level array is closed.
            current = stack[-1] if stack else None
        elif event == "start_map":
            c = {}
            if curkey is None:
                current.append(c)
            else:
                current[curkey] = c  # pylint: disable=E1137
            stack.append(c)
            current = c
            curkey = None
        elif event == "end_map":
            stack.pop()
            current = stack[-1]
            if len(stack) == 1:
                # A first level record is complete.
                nbyield += 1
                yield current[-1]
                # We clear the memory.
                current.clear()
        elif event == "map_key":
            curkey = value
        elif event in {"string", "number", "boolean", "null"}:
            # *value* is None for the event ``null``.
            if curkey is None:
                current.append(value)
            else:
                current[curkey] = value  # pylint: disable=E1137
                curkey = None
        else:
            raise ValueError("Unknown event '{0}'".format(event))
def extract_images_from_json_2017(filename, encoding=None, fLOG=noLOG):
    """
    Extracts fields from a JSON files such as images.

    :param filename: filename
    :param encoding: encoding
    :param fLOG: logging function
    :return: iterator on images, one dictionary per distinct image path
        found in a record

    Records without a non-empty ``best_offer`` or whose ``best_offer``
    has no ``product`` are skipped.

    .. note::

        Every yielded dictionary is a new object, it can be stored
        without being copied.

    One example on dummy data implementing a subset of the fields
    the JSON contains. This can be easily converted into a dataframe.

    .. runpython::
        :showcode:

        from ensae_projects.hackathon import extract_images_from_json_2017

        text_json = '''
            [
            {"assigned_images": [],
            "best_offer": {"created_on": "2016-11-04T23:20:53+01:00", "images": [], "offer_longitude": null, "availability": "in_stock",
            "start_selling_date": null, "delay_before_shipping": 0.00, "free_return": null, "free_shipping": null,
            "assigned_images": [{"image_path": "https://coucou.JPEG"}],
            "id": 1306501, "eco_tax": 0.000000, "keywords": ["boutique", "test"],
            "sku": "AAAA27160018",
            "product": {"pk": 2550, "external_id": null, "id": 2580},
            "description": "livre l", "last_modified": "2016-11-04T23:27:01+01:00",
            "name": "les names", "language": "fr"}, "id": 25540,
            "description": "livre 2", "slug": "les-l",
            "application_categories": [280, 283], "product_type": "physical",
            "name": "les l n", "language": "fr", "popularity": 99, "gender": null
            }
            ]
        '''

        items = []
        for item in extract_images_from_json_2017(text_json):
            print(item)
            items.append(item)

        from pandas import DataFrame
        df = DataFrame(items)
        print(df)
    """
    for record in enumerate_json_items(filename, encoding=encoding, fLOG=fLOG):
        best = record.get("best_offer")
        if not best:
            continue
        product = best.get("product")
        if product is None:
            continue

        # Images attached to the offer come first, then those of the record.
        images = []
        images.extend(best.get("assigned_images") or [])
        images.extend(record.get("assigned_images") or [])

        # NOTE(review): the sample in the docstring stores "created_on" and
        # "keywords" under "best_offer" and has "id" (not "id2") at record
        # level, the lookups below read them from the record: confirm
        # against the real data.
        keywords = record.get("keywords")
        if isinstance(keywords, list):
            keywords = ";".join(map(str, keywords))

        description = record.get("description")
        if isinstance(description, str):
            # Keeps the description on a single line.
            description = description.replace(
                "\n", "\\n").replace("\t", "\\t").replace("\r", "")

        categories = record.get("application_categories")
        if isinstance(categories, list):
            categories = ",".join(map(str, categories))

        # The insertion order defines the column order of a dataframe
        # built from the results.
        res = {
            "product_pk": product.get("pk"),
            "product_id": product.get("id"),
            "id2": record.get("id2"),
            "sku": best.get("sku"),
            "created_on": record.get("created_on"),
            "keywords": keywords,
            "availability": best.get("availability"),
            "eco_tax": best.get("eco_tax"),
            "restock_date": best.get("restock_date"),
            "status": best.get("status"),
            "number_of_items": best.get("number_of_items"),
            "price_with_vat": best.get("price_with_vat"),
            "price_without_vat": best.get("price_without_vat"),
            "previous_price_without_vat": best.get(
                "previous_price_without_vat"),
            "max_order_quantity": best.get("max_order_quantity"),
            "stock": best.get("stock"),
            "start_selling_date": best.get("start_selling_date"),
            "description": description,
            "last_modified": best.get("last_modified"),
            "name": record.get("name"),
            "product_type": record.get("product_type"),
            "gender": record.get("gender"),
            "popularity": record.get("popularity"),
            "application_categories": categories,
            "language": record.get("language"),
        }

        # One result per distinct, non-empty image path.
        done = set()
        for im in images:
            p = im.get("image_path")
            if p and p not in done:
                done.add(p)
                # A new dictionary is yielded each time so that results
                # kept by the caller are not overwritten by the next image.
                yield dict(res, image_path=p)