Coverage for src/pyrsslocal/xmlhelper/xml_tree.py: 95%
112 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-04-30 08:45 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2024-04-30 08:45 +0200
1"""
2@file
4@brief parsing XML
5"""
7import xml.sax.handler
8import io
9import xml.sax.expatreader
10import xml.sax.saxutils as xsaxutils
11from xml.parsers import expat
13from .xml_tree_node import XMLHandlerDictNode
class XMLHandlerDict(xml.sax.handler.ContentHandler):
    """
    SAX content handler which assumes the document is a root element
    holding a list of objects. The root element itself is skipped, every
    element found directly below it is converted into a tree of
    ``XMLHandlerDictNode`` and appended to ``self._objs`` once its
    closing tag has been read.
    """

    def __init__(self, no_content=False):
        """
        @param  no_content      if True, the XML text of each record is not
                                kept (an empty string is stored instead)
        """
        xml.sax.handler.ContentHandler.__init__(self)
        self._objs = []             # finished records, waiting to be consumed
        self._being = None          # root node of the record being built
        self._level = 0             # current depth, 0 before the root element
        self._tag = None            # tag name of the record being built
        self._tile = []             # stack of the tags currently open
        self._pointer = None        # node receiving attributes and characters
        self._forget_root = True    # always True
        self._no_content = no_content
        self._prepare_stringio()

    def _prepare_stringio(self):
        """
        Creates a fresh XML generator writing into a new ``StringIO``
        or disables the generator if *no_content* was requested.
        """
        if self._no_content:
            self._xmlgen = None
            return

        self._xmlio = io.StringIO()
        self._xmlgen = xsaxutils.XMLGenerator(self._xmlio, "utf8")
        self._xmlgen.startDocument()

    def _collect_xml_content(self):
        """
        Closes the current XML generator and returns the text it produced
        without the leading ``<?xml ...?>`` line,
        an empty string if no generator is active.
        """
        if self._xmlgen is None:
            return ""

        self._xmlgen.endDocument()
        self._xmlio.write("\n")
        text = self._xmlio.getvalue()
        if text.startswith("<?xml"):
            # drops the declaration line and one following empty line
            cut = text.find("\n") + 1
            if len(text) > cut and text[cut] == "\n":
                cut += 1
            text = text[cut:]
        return text

    def startElement(self, name, attrs):
        """
        Called when an opening tag is met.

        @param  name    tag name
        @param  attrs   attributes of the tag
        """
        entering_root = self._forget_root and self._level == 0
        if entering_root:
            # the root element only wraps the records, it is not stored
            self._level = 1
            return

        if self._xmlgen is not None:
            self._xmlgen.startElement(name, attrs)

        self._tile.append(name)
        if self._being is not None:
            # nested element: attached to the node currently pointed at
            child = XMLHandlerDictNode(
                self._pointer, name, self._level, root=False)
            self._pointer.set(name, child)
            self._pointer = child
        else:
            # first element of a new record
            self._tag = name
            self._being = XMLHandlerDictNode(
                None, name, self._level, root=True)
            self._pointer = self._being

        for key in attrs.getNames():
            self._pointer.set(key, attrs[key].strip())
        self._level += 1

    def endElement(self, name):
        """
        Called when a closing tag is met.

        @param  name    tag name
        """
        if not self._tile:
            # closing tag of the skipped root element
            return

        if self._xmlgen is not None:
            self._xmlgen.endElement(name)

        self._pointer.strip()
        self._tile.pop()
        self._level -= 1

        if self._tile:
            # still inside a record: moves back to the parent node
            self._pointer = self._pointer.father
            return

        # the record is complete
        self._being.rearrange()
        content = self._collect_xml_content()
        if isinstance(content, bytes):
            raise AssertionError(  # pragma: no cover
                "this should not happen")

        self._being.add_xml_content(content)
        self._objs.append(self._being)
        self._being = None
        self._pointer = None
        self._prepare_stringio()

    def characters(self, content):
        """
        Called for the text found between tags.

        @param  content     text
        """
        if self._xmlgen is not None:
            self._xmlgen.characters(content)

        if self._pointer is not None:
            self._pointer.buffer += content
124# iteration version
class XMLIterParser(xml.sax.expatreader.ExpatParser):
    """
    To use a parser like an iterator. Method *parse* is a generator
    which yields the objects the content handler stores in its
    ``_objs`` list while the document is being read chunk by chunk.
    Example:

    ::

        import io

        zxml = \"\"\"
        <mixed engine___="conf1" fid="3" grade___="Fair" query___="queryA" rank="3">
            <urls>
                <url___>http://www.shop.com/Soloxine_1_0mg_Tab-181378988-214010464-p!.shtml</url___>
                <url___>http://fake</url___>
            </urls>
        </mixed>
        <mixed engine___="conf1" fid="4" grade___="Good" query___="queryA" rank="4"
               url___="http%3A//www.lamars.com/products/nutrition.html" />
        \"\"\"

        zxml = "<root>%s</root>" % zxml
        f = io.StringIO(zxml)
        assert len(f.getvalue()) > 0

        parser = XMLIterParser()
        handler = XMLHandlerDict(no_content=False)
        parser.setContentHandler(handler)
        nb = 0
        for o in parser.parse(f):
            assert o["query___"] == "queryA"
            nb += 1
        assert nb > 0
    """

    def __init__(self, namespaceHandling=0, bufsize=2 ** 17):
        """
        @param  namespaceHandling   forwarded to ``ExpatParser``
        @param  bufsize             number of characters or bytes read at
                                    once, None means ``2 ** 17``
        """
        if bufsize is None:
            bufsize = 2 ** 17
        xml.sax.expatreader.ExpatParser.__init__(
            self,
            namespaceHandling=namespaceHandling,
            bufsize=bufsize)

    def _drain_objects(self):
        """
        Yields the objects collected so far by the content handler
        and empties its list.
        """
        pending = self._cont_handler._objs
        yield from pending
        del pending[:]

    def parse(self, source):
        """
        Parses an :epkg:`XML` document from a URL or an *InputSource*
        and iterates on the objects built by the content handler.

        @param  source  a filename, a URL, a stream (text or binary)
                        or an *InputSource*
        @return         iterator on the objects found in
                        ``self._cont_handler._objs``

        The content handler must expose a list ``_objs``.
        Parsing errors are sent to the error handler (``fatalError``).
        """
        # prepare_input_source opens a stream on its own only when it
        # receives a location (filename, URL, InputSource without stream),
        # that stream must be closed here as the caller cannot reach it.
        if isinstance(source, xml.sax.xmlreader.InputSource):
            owns_stream = (source.getCharacterStream() is None and
                           source.getByteStream() is None)
        else:
            owns_stream = not hasattr(source, "read")

        source0 = source
        source = xsaxutils.prepare_input_source(source)

        try:
            self._source = source
            self.reset()
            self._cont_handler.setDocumentLocator(
                xml.sax.expatreader.ExpatLocator(self))
            self.prepareParser(source)

            # a character stream has priority over a byte stream
            stream = source.getCharacterStream()
            if stream is None:
                stream = source.getByteStream()

            if stream is None:
                raise FileNotFoundError(  # pragma: no cover
                    "File is None, it should not, source='{0}'\n{1}".format(
                        source0, getattr(source0, "name", None)))

            is_final = False
            chunk = stream.read(self._bufsize)
            # Truthiness works for str and bytes: comparing to "" never
            # stops on a byte stream because b"" != "" is always True.
            while chunk or not is_final:
                # an empty chunk means the end of the stream was reached,
                # expat is then asked to validate the whole document
                is_final = len(chunk) == 0

                if not self._parsing:
                    self.reset()
                    self._parsing = 1
                    self._cont_handler.startDocument()

                try:
                    self._parser.Parse(chunk, is_final)
                except expat.error as e:  # pragma: no cover
                    exc = xml.sax.SAXParseException(
                        expat.ErrorString(e.code), e, self)
                    self._err_handler.fatalError(exc)
                else:
                    yield from self._drain_objects()

                chunk = stream.read(self._bufsize)

            self._cont_handler.endDocument()
            self._parsing = 0
            # break cycle created by expat handlers pointing to our methods
            self._parser = None

            yield from self._drain_objects()
        finally:
            if owns_stream:
                for opened in (source.getCharacterStream(),
                               source.getByteStream()):
                    if opened is not None:
                        opened.close()