Source code for pyrsslocal.xmlhelper.xmlfilewalk

"""
functions related to XML files representing objects


:githublink:`%|py|6`
"""

from pyquickhelper.loghelper.flog import GetSepLine
from .xml_tree import XMLHandlerDict, XMLIterParser


[docs]def _iteration_values(values): """ Iterators on all possible tuple of values taken into a list. Let's assume you have two rows: :: a1 a2 a3 b1 b2 The function will produce: :: a1 b1 a1 b2 a2 b1 a2 b2 a3 b1 a3 b2 The function is used by :func:`table_extraction_from_xml_files_iterator <pyrsslocal.xmlhelper.xmlfilewalk.table_extraction_from_xml_files_iterator>`. :param values: list of rows :return: iterator on rows :githublink:`%|py|37` """ co = [] for v in values: if isinstance(v, list): co.append(v) else: co.append([v]) ind = [0 for _ in co] while ind[0] < len(co[0]): line = [c[i] for c, i in zip(co, ind)] yield line ind[-1] += 1 i = len(ind) - 1 while i > 0: if ind[i] >= len(co[i]): ind[i] = 0 ind[i - 1] += 1 i -= 1
[docs]def table_extraction_from_xml_files_iterator(file, fields, log=False, fLOG=None, encoding="utf-8", errors=None): """ Goes through a XML file, extracts values and put them into an iterator. :param file: a file :param fields: list of fields to get from the XML files (see below) :param log: do logs if True :param fLOG: logging function :param errors: sent to function :epkg:`*py:library:function` :param encoding: encoding :return: iterator on lines One example for fields: :: [ ("tag1/tag2", "all"), ("tag1/tag2/tag3/_", "one"), ... ] :githublink:`%|py|80` """ fileh = open(file, "r", encoding=encoding, errors=errors) if isinstance( file, str) else file parser = XMLIterParser() handler = XMLHandlerDict(no_content=True) parser.setContentHandler(handler) fields = [(a.split("/"), b) for a, b in fields] if log: fLOG("table_extraction_from_xml_files: begin") for i_, o in enumerate(parser.parse(fileh)): values = [] nb = 0 for look, typ in fields: path = o.find_node_value(look) if typ == "one": if len(path) == 0: if log: fLOG(o.get_xml_content()) raise Exception( "unable to find a value for path %s" % "/".join(look)) val = path[0] if val is None: val = "" elif typ == "all": if len(path) == 1: val = path[0] elif len(path) == 0: val = "" else: val = path nb += 1 else: raise Exception( "the type must in (one, all) %s,%s" % (look, typ)) values.append(val) if nb == 0: line = "\t".join(values) yield line else: for v in _iteration_values(values): line = "\t".join(v) yield line if log and (i_ + 1) % 1000 == 0: fLOG("table_extraction_from_xml_files reading ", i_) if isinstance(file, str): fileh.close() if log: fLOG("table_extraction_from_xml_files: end")
[docs]def table_extraction_from_xml_files(file, output, fields, log=False, encoding="utf-8", errors=None): """ Goes through a :epkg:`XML` file, extracts values and put them into a flat file. :param file: a file :param output: output file, string or file object, :param fields: list of fields to get from the XML files :param log: do logs if True :param errors: sent to function :epkg:`*py:library:function` :param encoding: encoding One example for fields: :: [ ("tag1/tag2", "all"), ("tag1/tag2/tag3/_", "one"), ... ] :githublink:`%|py|161` """ outputh = open(output, "w", encoding=encoding, errors=errors) if isinstance(output, str) else output for line in table_extraction_from_xml_files_iterator(file, fields, log): outputh.write(line) outputh.write(GetSepLine()) if isinstance(output, str): outputh.close()
[docs]def xml_filter_iterator(file, filter_=None, log=False, xmlformat=True, fLOG=None, encoding="utf-8", errors=None): """ Goes through a :epkg:`XML` file, returns :epkg:`XML` content if a condition is verified, the result is an iterator. :param file: a file :param filter_: a function which takes a node and returns a boolean, if None, accepts everything :param log: do logs if True :param xmlformat: if True, return the xml, otherwise return the node :param fLOG: logging function :param encoding: encoding :param errors: sent to function :epkg:`*py:library:function` :return: the xml format or a node depending on thevalue of xmlformat :githublink:`%|py|186` """ if filter_ is None: def filter__(node): return True filter_ = filter__ fileh = open(file, "r", encoding=encoding, errors=errors) if isinstance( file, str) else file parser = XMLIterParser() handler = XMLHandlerDict() parser.setContentHandler(handler) for i_, o in enumerate(parser.parse(fileh)): res = filter_(o) if res: if xmlformat: yield o.get_xml_content() else: yield o if fLOG and log and (i_ + 1) % 1000 == 0: fLOG("table_extraction_from_xml_files reading ", i_) if isinstance(file, str): fileh.close() if log and fLOG: fLOG("xml_filter_iterator: end")
[docs]def xml_filter(file, output, filter_, log=False, xmlformat=True, encoding="utf-8", errors=None): """ Goes through a :epkg:`XML` file, returns :epkg:`XML` content if a condition is verified, the result is put into a stream. :param file: a file :param output: output file, string or file object :param filter_: a function which takes a node and returns a boolean :param xmlformat: if True, return the xml, otherwise return the node :param encoding: encoding :param errors: sent to function :epkg:`*py:library:function` :param log: do logs if True :githublink:`%|py|230` """ outputh = open(output, "r", encoding=encoding, errors=errors) if isinstance(output, str) else output for line in xml_filter_iterator(file, filter_, log, xmlformat): outputh.write(line) outputh.write(GetSepLine()) if isinstance(output, str): outputh.close()