Source code for pyrsslocal.xmlhelper.xml_tree

"""
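Parses XML with SAX: a content handler which builds one object per record
found under the document root, and an expat-based parser usable as an iterator.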


:githublink:`%|py|6`
"""

import xml.sax.handler
import io
import xml.sax.expatreader
import xml.sax.saxutils as xsaxutils
from xml.parsers import expat

from .xml_tree_node import XMLHandlerDictNode


class XMLHandlerDict(xml.sax.handler.ContentHandler):
    """
    SAX content handler which turns every element found directly under
    the document root into one :class:`XMLHandlerDictNode`.
    The root element itself is skipped; finished objects are appended
    to ``self._objs``.
    """

    def __init__(self, no_content=False):
        """
        :param no_content: if True, the raw XML text of each record is
            not captured (no XML generator is created)
        """
        xml.sax.handler.ContentHandler.__init__(self)
        self._no_content = no_content
        self._forget_root = True  # always True
        self._objs = []       # completed records
        self._tile = []       # stack of the tag names currently open
        self._level = 0
        self._tag = None
        self._being = None    # record under construction
        self._pointer = None  # node receiving attributes and characters
        self._prepare_stringio()

    def _prepare_stringio(self):
        """
        Creates a fresh stream and XML generator used to capture the
        XML text of the next record, or disables the capture when
        *no_content* was requested.
        """
        if self._no_content:
            self._xmlgen = None
            return
        self._xmlio = io.StringIO()
        self._xmlgen = xsaxutils.XMLGenerator(self._xmlio, "utf8")
        self._xmlgen.startDocument()

    def startElement(self, name, attrs):
        """
        Called when a tag opens.

        :param name: tag name
        :param attrs: attributes of the tag
        """
        # The very first tag is the root: remember we are past it, nothing else.
        if self._forget_root and self._level == 0:
            self._level = 1
            return

        generator = self._xmlgen
        if generator is not None:
            generator.startElement(name, attrs)
        self._tile.append(name)

        if self._being is not None:
            # nested tag: attach a child node to the current node
            child = XMLHandlerDictNode(
                self._pointer, name, self._level, root=False)
            self._pointer.set(name, child)
            self._pointer = child
        else:
            # first tag of a new record
            self._tag = name
            self._being = XMLHandlerDictNode(
                None, name, self._level, root=True)
            self._pointer = self._being

        current = self._pointer
        for key in attrs.getNames():
            current.set(key, attrs[key].strip())
        self._level += 1

    def endElement(self, name):
        """
        Called when a tag closes. When the closing tag ends a record,
        the record is finalized and stored in ``self._objs``.

        :param name: tag name
        """
        if not self._tile:
            # closing tag of the skipped root
            return

        if self._xmlgen is not None:
            self._xmlgen.endElement(name)
        self._pointer.strip()
        self._tile.pop()
        self._level -= 1

        if self._tile:
            # still inside a record: move back to the parent node
            self._pointer = self._pointer.father
            return

        self._being.rearrange()
        if self._xmlgen is None:
            content = ""
        else:
            self._xmlgen.endDocument()
            self._xmlio.write("\n")
            content = self._xmlio.getvalue()
            if content.startswith("<?xml"):
                # drop the XML declaration line and one following empty line
                end = content.find("\n") + 1
                if len(content) > end and content[end] == "\n":
                    end += 1
                content = content[end:]

        if isinstance(content, bytes):
            raise AssertionError("this should not happen")
        self._being.add_xml_content(content)
        self._objs.append(self._being)
        self._being = None
        self._pointer = None
        self._prepare_stringio()

    def characters(self, content):
        """
        Called for text found between tags; the text is forwarded to the
        XML generator (if any) and appended to the current node buffer.

        :param content: text
        """
        generator, node = self._xmlgen, self._pointer
        if generator is not None:
            generator.characters(content)
        if node is not None:
            node.buffer += content
# iteration version
class XMLIterParser(xml.sax.expatreader.ExpatParser):
    """
    To use a parser like an iterator. The content handler must expose a
    list ``_objs`` which it fills with finished objects; method
    :meth:`parse` yields these objects as soon as they are available.
    Example:

    ::

        import io

        zxml = \"\"\"
        <mixed engine___="conf1" fid="3" grade___="Fair" query___="queryA" rank="3">
            <urls>
                <url___>http://www.shop.com/Soloxine_1_0mg_Tab-181378988-214010464-p!.shtml</url___>
                <url___>http://fake</url___>
            </urls>
        </mixed>
        <mixed engine___="conf1" fid="4" grade___="Good" query___="queryA" rank="4"
               url___="http%3A//www.lamars.com/products/nutrition.html" />
        \"\"\"

        zxml = "<root>%s</root>" % zxml
        f = io.StringIO(zxml)
        assert len(f.getvalue()) > 0

        parser = XMLIterParser()
        handler = XMLHandlerDict(no_content=False)
        parser.setContentHandler(handler)
        nb = 0
        for o in parser.parse(f):
            assert o["query___"] == "queryA"
            nb += 1
        assert nb > 0
    """

    def __init__(self, namespaceHandling=0, bufsize=2 ** 17):
        """
        :param namespaceHandling: forwarded to ``ExpatParser``
        :param bufsize: number of characters or bytes read from the
            source at each step, *None* means ``2 ** 17``
        """
        if bufsize is None:
            bufsize = 2 ** 17
        xml.sax.expatreader.ExpatParser.__init__(
            self, namespaceHandling=namespaceHandling, bufsize=bufsize)

    def parse(self, source):
        """
        Parses an :epkg:`XML` document from a URL or an *InputSource*
        and iterates on the objects produced by the content handler.

        :param source: a file or a stream (text or binary)
        :return: generator over the objects the content handler
            stores in its ``_objs`` list
        :raises FileNotFoundError: if no stream can be obtained from *source*
        """
        source0 = source
        source = xsaxutils.prepare_input_source(source)

        self._source = source
        self.reset()
        self._cont_handler.setDocumentLocator(
            xml.sax.expatreader.ExpatLocator(self))
        self.prepareParser(source)

        # Prefer the character stream, fall back on the byte stream.
        stream = source.getCharacterStream()
        if stream is None:
            stream = source.getByteStream()
        if stream is None:
            # source0 is not necessarily a file object: do not assume it has a name
            raise FileNotFoundError(
                "file is None, it should not, source={0}\n{1}".format(
                    source0, getattr(source0, "name", None)))

        buffer = stream.read(self._bufsize)
        is_final = False
        # The emptiness test must hold for str and bytes alike: comparing
        # with "" is always true for bytes and the parser would be fed
        # again after the final chunk.
        while buffer or not is_final:
            # An empty read marks the end of the stream: expat then
            # checks the validity of the entire document.
            is_final = not buffer

            if not self._parsing:
                self.reset()
                self._parsing = 1
                self._cont_handler.startDocument()

            try:
                self._parser.Parse(buffer, is_final)
            except expat.error as e:
                exc = xml.sax.SAXParseException(
                    expat.ErrorString(e.code), e, self)
                self._err_handler.fatalError(exc)
            else:
                # hand over the objects completed by this chunk
                yield from self._cont_handler._objs
                del self._cont_handler._objs[:]

            buffer = stream.read(self._bufsize)

        self._cont_handler.endDocument()
        self._parsing = 0
        # break cycle created by expat handlers pointing to our methods
        self._parser = None

        yield from self._cont_handler._objs
        del self._cont_handler._objs[:]