Source code for pyrsslocal.xmlhelper.xml_tree
"""
parsing XML
:githublink:`%|py|6`
"""
import xml.sax.handler
import io
import xml.sax.expatreader
import xml.sax.saxutils as xsaxutils
from xml.parsers import expat
from .xml_tree_node import XMLHandlerDictNode
class XMLHandlerDict(xml.sax.handler.ContentHandler):
    """
    SAX content handler building one object per child of the document root.

    The document is assumed to be a list of records wrapped in a single
    root element. The root element itself is skipped; every element found
    directly below it becomes a ``XMLHandlerDictNode`` tree which is
    appended to ``self._objs`` once its closing tag is reached.
    """

    def __init__(self, no_content=False):
        """
        :param no_content: if True, the XML text of each record is not
            regenerated (an empty string is stored instead)
        """
        xml.sax.handler.ContentHandler.__init__(self)
        self._no_content = no_content
        self._forget_root = True  # always True
        self._objs = []      # finished records
        self._tile = []      # names of the currently opened tags
        self._level = 0      # current depth in the document
        self._tag = None     # tag name of the record being built
        self._being = None   # root node of the record being built
        self._pointer = None  # node receiving attributes and characters
        self._prepare_stringio()

    def _prepare_stringio(self):
        """
        Creates a fresh ``StringIO`` and its ``XMLGenerator`` used to
        rebuild the XML text of the next record, or disables the
        generator when *no_content* was requested.
        """
        if self._no_content:
            self._xmlgen = None
            return
        self._xmlio = io.StringIO()
        self._xmlgen = xsaxutils.XMLGenerator(self._xmlio, "utf8")
        self._xmlgen.startDocument()

    def startElement(self, name, attrs):
        """
        Called when a tag is opened.

        :param name: tag name
        :param attrs: attributes of the tag
        """
        # The very first opening tag is the document root: skip it.
        if self._forget_root and self._level == 0:
            self._level = 1
            return

        generator = self._xmlgen
        if generator is not None:
            generator.startElement(name, attrs)
        self._tile.append(name)

        if self._being is not None:
            # Nested tag: attach a child node below the current one.
            child = XMLHandlerDictNode(
                self._pointer, name, self._level, root=False)
            self._pointer.set(name, child)
            self._pointer = child
        else:
            # First tag of a new record.
            self._tag = name
            self._being = XMLHandlerDictNode(
                None, name, self._level, root=True)
            self._pointer = self._being

        for key in attrs.getNames():
            self._pointer.set(key, attrs[key].strip())
        self._level += 1

    def endElement(self, name):
        """
        Called when a tag is closed.

        :param name: tag name
        """
        if not self._tile:
            # Closing tag of the skipped root element.
            return

        if self._xmlgen is not None:
            self._xmlgen.endElement(name)
        self._pointer.strip()
        self._tile.pop()
        self._level -= 1

        if self._tile:
            # Still inside a record: go back to the parent node.
            self._pointer = self._pointer.father
            return

        # The record is complete.
        record = self._being
        record.rearrange()

        if self._xmlgen is None:
            content = ""
        else:
            self._xmlgen.endDocument()
            self._xmlio.write("\n")
            content = self._xmlio.getvalue()
            if content.startswith("<?xml"):
                # Drop the XML declaration line, and one blank line
                # following it if there is one.
                cut = content.find("\n") + 1
                if len(content) > cut and content[cut] == "\n":
                    cut += 1
                content = content[cut:]

        if isinstance(content, bytes):
            raise AssertionError(  # pragma: no cover
                "this should not happen")

        record.add_xml_content(content)
        self._objs.append(record)
        self._being = None
        self._pointer = None
        self._prepare_stringio()

    def characters(self, content):
        """
        Called with the text found between tags.

        :param content: characters to add
        """
        generator = self._xmlgen
        if generator is not None:
            generator.characters(content)
        node = self._pointer
        if node is not None:
            node.buffer += content
# iteration version
class XMLIterParser(xml.sax.expatreader.ExpatParser):
    """
    Expat based SAX parser whose :meth:`parse` method is a generator.

    The content handler must expose a list attribute ``_objs``; after each
    chunk is parsed, the objects found in that list are yielded and the
    list is emptied.

    Example:

    ::

        import io

        zxml = \"\"\"
        <mixed engine___="conf1" fid="3" grade___="Fair" query___="queryA" rank="3">
            <urls>
                <url___>http://www.shop.com/Soloxine_1_0mg_Tab-181378988-214010464-p!.shtml</url___>
                <url___>http://fake</url___>
            </urls>
        </mixed>
        <mixed engine___="conf1" fid="4" grade___="Good" query___="queryA" rank="4"
               url___="http%3A//www.lamars.com/products/nutrition.html" />
        \"\"\"
        zxml = "<root>%s</root>" % zxml

        f = io.StringIO(zxml)
        assert len(f.getvalue()) > 0

        parser = XMLIterParser()
        handler = XMLHandlerDict(no_content=False)
        parser.setContentHandler(handler)
        nb = 0
        for o in parser.parse(f):
            assert o["query___"] == "queryA"
            nb += 1
        assert nb > 0
    """

    def __init__(self, namespaceHandling=0, bufsize=2 ** 17):
        """
        :param namespaceHandling: forwarded to ``ExpatParser``
        :param bufsize: number of characters or bytes read at once,
            ``None`` means ``2 ** 17``
        """
        if bufsize is None:
            bufsize = 2 ** 17
        xml.sax.expatreader.ExpatParser.__init__(
            self,
            namespaceHandling=namespaceHandling,
            bufsize=bufsize)

    def parse(self, source):
        """
        Parses an :epkg:`XML` document from a URL or an *InputSource*
        and yields the objects collected by the content handler.

        :param source: a file name, a text stream or a byte stream
        :return: iterator on the objects the content handler stores
            in its ``_objs`` list
        :raises FileNotFoundError: if no stream can be obtained
            from *source*

        Parsing errors are wrapped into ``SAXParseException`` and handed
        to the error handler's ``fatalError``.
        """
        source0 = source
        source = xsaxutils.prepare_input_source(source)
        self._source = source
        self.reset()
        self._cont_handler.setDocumentLocator(
            xml.sax.expatreader.ExpatLocator(self))
        self.prepareParser(source)

        # Prefer the character stream, fall back on the byte stream.
        stream = source.getCharacterStream()
        if stream is None:
            stream = source.getByteStream()
        if stream is None:
            raise FileNotFoundError(  # pragma: no cover
                "File is None, it should not, source='{0}'\n{1}".format(
                    source0, getattr(source0, "name", None)))

        buffer = stream.read(self._bufsize)
        is_final = 0
        # Truthiness is used on purpose: the end of the stream is ''
        # for a text stream and b'' for a byte stream, and b'' != ''
        # is always true. One extra iteration with an empty buffer
        # tells expat the document is complete.
        while buffer or not is_final:
            is_final = 0 if buffer else 1
            if not self._parsing:
                self.reset()
                self._parsing = 1
                self._cont_handler.startDocument()
            try:
                # is_final=1 makes expat check the validity of the
                # whole document, it is only set on the last call.
                self._parser.Parse(buffer, is_final)
            except expat.error as e:  # pragma: no cover
                exc = xml.sax.SAXParseException(
                    expat.ErrorString(e.code), e, self)
                self._err_handler.fatalError(exc)
            else:
                # Hands over the records completed by this chunk.
                yield from self._cont_handler._objs
                del self._cont_handler._objs[:]
            buffer = stream.read(self._bufsize)

        self._cont_handler.endDocument()
        self._parsing = 0
        # break cycle created by expat handlers pointing to our methods
        self._parser = None

        yield from self._cont_handler._objs
        del self._cont_handler._objs[:]