"""
parsing XML
:githublink:`%|py|5`
"""
import copy
from pyquickhelper.loghelper.flog import guess_type_list, guess_type_value_type
from .xml_utils import escape
from .xml_exceptions import XmlException
[docs]class XMLHandlerDictNode(dict):
"""
Defines a node containing a dictionary.
.. list-table::
:widths: auto
:header-rows: 1
* - attribute
- meaning
* - father
- ancestor
* - name
- name of the section (tag)
* - buffer
- value or content of the section
* - level
- level in the hierarchy
* - other
- included sections
:githublink:`%|py|22`
"""
[docs] def __init__(self, father, name, level, root=False):
"""
:param father: father
:param name: node name
:param level: could be infered but still
:param root: is it the root
:githublink:`%|py|30`
"""
dict.__init__(self)
self.father = father
self.name = name
self.buffer = ""
self.level = level
if father is None and not root:
raise XmlException(
"father is None and root is False, name = %s level = %d" %
(name, level))
self.other = []
[docs] def __cmp__(self, other):
a, b = id(self), id(other)
if a < b:
return -1
elif a == b:
return 0
else:
return 1
[docs] def __lt__(self, other):
return self.__cmp__(other) == -1
[docs] def enumerate_on_tag(self, tag, recursive=False):
"""
Enumerates all nodes sharing the same name: tag.
:param tag: node name to enumerate on
:param recursive: if True, looks into node (name == tag) if there are
sub-node with the same name
:return: enumeration on node
:githublink:`%|py|62`
"""
if self.name == tag:
yield self
for o in self.other:
if isinstance(o, tuple):
if recursive:
for _ in o[1].enumerate_on_tag(tag):
yield _
else:
yield o[1]
else:
for _ in o.enumerate_on_tag(tag):
yield _
[docs] def add_xml_content(self, content):
"""
Adds the content of the node itself (and all included nodes).
:githublink:`%|py|79`
"""
self.xmlcontent = content
[docs] def get_xml_content(self):
"""
:return: self.xmlcontent
:githublink:`%|py|85`
"""
return self.xmlcontent if "xmlcontent" in self.__dict__ else None
[docs] def __str__(self):
"""
usual
:githublink:`%|py|91`
"""
mx = 0
for k in self:
mx = max(len(k), mx)
head = self.level * " "
pile = [head + "*" + self.name]
try:
buf = str(self.buffer) \
if self.buffer[0] in guess_type_value_type() \
else self.buffer
except IndexError:
buf = str(self.buffer)
if len(buf) > 0:
t = " " * (mx - len("lst") + 1)
ty = self.__dict__.get("conversion_table", {}).get(self.name, "")
if ty != "":
ty = " \t(%s)" % str(ty)
if isinstance(buf, (list, tuple)):
pile.append(head + " lst" + t + ": " + str(repr(buf) + ty))
else:
pile.append(head + " val" + t + ": " + buf + ty)
for k in sorted(self):
v = self[k]
vs = str(v) if v in guess_type_value_type() or isinstance(
v,
tuple) else v
ty = self.__dict__.get("conversion_table", {}).get(k, "")
if ty != "":
ty = " \t(%s)" % str(ty)
if isinstance(vs, str):
t = " " * (mx - len(k) + 1)
pile.append(head + " " + k + t + ": " + vs + ty)
elif isinstance(vs, list):
t = " " * (mx - len(k) + 1)
pile.append(head + " " + k + t + ": " + str(repr(vs)) + ty)
elif isinstance(vs, tuple):
pile.append("-" + str(vs) + ty)
else:
pile.append(str(vs) + ty)
if len(self.other) > 0:
pile.append(head + " ----")
soro = sorted(copy.copy(self.other))
for k, v in soro:
temp = str(v)
star = temp.find("*")
if star != -1 and "_othercount" in self.__dict__:
temp = "%s*(%d) %s" % (temp[:star],
self._othercount.get(k, -1), temp[star + 1:])
pile.append(temp)
return "\n".join(pile)
[docs] def strip(self):
"""
Strips the buffer.
:githublink:`%|py|150`
"""
self.buffer = self.buffer.strip()
[docs] def copy(self):
"""
Gets a copy.
:githublink:`%|py|156`
"""
u = XMLHandlerDictNode(self, self.father, self.name, self.level)
u.buffer = self.buffer
u.level = self.level
return u
[docs] def set(self, i, v):
"""
Changes the value of a field.
:param i: field
:param v: new value
:githublink:`%|py|167`
"""
if i in self:
if isinstance(v, XMLHandlerDictNode):
self.other.append((i, v))
return v
else:
raise XmlException(
"unable to append a new string value for an existing field %s:%s" %
(i, v))
else:
self[i] = v
return self
[docs] def is_text_only(self):
"""
Returns True if it only contains text.
:githublink:`%|py|183`
"""
if len(self.other) > 0:
return False
if len(self) > 1:
return False
for k, v in self.items():
if k != self.name:
return False
if not isinstance(v, str):
return False
return True
[docs] def rearrange(self, debug=False):
"""
Moves all objects to other.
:githublink:`%|py|198`
"""
# check level
if self.father is not None:
self.level = self.father.level + 1
# is is_text_only --> fill buffer, clean the rest
if self.is_text_only() and len(self) == 1:
k = self.keys()[0]
self.buffer = self[k]
self.clear()
return
# values in self.keys also in other --> all in other
# unique values in other and if text --> self
count = {}
for k, v in self.other:
count[k] = 0
v.rearrange()
for k, v in self.other:
count[k] += 1
move = [k for k, v in count.items() if v == 1]
keys = {}
for m in move:
keys[m] = None
mult = []
rem = []
i = 0
for k, v in self.other:
if k in keys and v.is_text_only():
if k in self:
if k not in mult:
tempv = self[k]
if isinstance(tempv, str):
tempv = XMLHandlerDictNode(self, k, self.level + 1)
tempv.buffer = self[k]
mult.append((k, tempv))
else:
self[k] = v.buffer
rem.append(i)
i += 1
mult.reverse()
for m in mult:
self.other.insert(0, m)
del self[m[0]]
rem.reverse()
for e in rem:
del self.other[e]
# in case of self contains object --> other
rem = []
for k, v in self.items():
if not isinstance(v, str) and not isinstance(v, list):
v.rearrange(debug=True)
if not v.is_text_only():
self.other.append((k, v))
rem.append(k)
else:
self[k] = v.buffer
for k in rem:
del self[k]
# in case other already contains some objects of the same kind
rem = []
count = {}
for k, v in self.other:
count[k] = 1
for k, v in self.items():
if k in count:
if isinstance(v, str):
node = XMLHandlerDictNode(self, k, self.level + 1, False)
node.buffer = v
self.other.append((k, node))
else:
self.other.append((k, v))
rem.append(k)
for k in rem:
del self[k]
# last check
if len(self) == 1:
# self.popitem(), strange it works in version 2
k, _ = list(self.items())[0]
if k == self.name:
self.buffer = self[k]
del self[k]
[docs] def get_xml_output(self):
"""
:return: an XML output (all lines terminated by end_of_line
:githublink:`%|py|292`
"""
att = [""] + ["%s=\"%s\"" %
(k, escape(self[k])) for k in sorted(self) if len(self[k]) <= 20]
att = " ".join(att)
lev = max(self.level - 1, 0)
lev = " " * lev
if len(self.other) == 0:
if len(self.buffer) == 0:
return "%s<%s%s />\n" % (lev, self.name, att)
else:
return "%s<%s%s>%s</%s>\n" % (lev,
self.name, att, self.buffer, self.name)
else:
res = ["%s<%s%s>\n" % (lev, self.name, att)]
if len(self.buffer) > 0:
res.append("%s%s\n" % (lev, escape(self.buffer)))
for k in sorted(self):
v = self[k]
if len(v) <= 20:
continue
res.append("%s<%s>\n" % (lev, k))
res.append("%s%s\n" % (lev, escape(v)))
res.append("%s</%s>\n" % (lev, k))
other = sorted(copy.copy(self.other))
for k, v in other:
res.append(v.get_xml_output())
res.append("%s</%s>\n" % (lev, self.name))
return "".join(res)
[docs] def get_values(self, field):
"""
Gets all values associated to a given field name.
:param field: field name
:return: list of [ key, value ]
:githublink:`%|py|329`
"""
res = []
if self.name == field:
res.append((("", -1), self.buffer))
for k, v in self.items():
if k == field:
res.append(((k, -1), v))
i = 0
for k, v in self.other:
temp = v.get_values(field)
for a, b in temp:
res.append(((k, i) + a, b))
i += 1
return res
[docs] def get_values_group(self, fields, nb=1):
"""
Gets all values associated to a list of fields
(must come together in a single node, not in *self.other*).
:param fields: fields name (list or dictionary)
:param nb: at least nb fields must be filled
:return: list of dictionaries
:githublink:`%|py|354`
"""
res = []
if self.name in fields:
res.append((self.name, self.buffer))
for k, v in self.items():
if k in fields:
res.append((k, v))
if len(res) >= nb:
temp = {}
for k, v in res:
if k in temp:
raise XmlException("field %s already present in '%s' (full name '%s')" % (
k, ", ".join(temp.keys()), "/".join(self.get_full_name())))
temp[k] = v
for f in fields:
if f not in temp:
temp[f] = None
res = [((self.name, -1), temp)]
else:
res = []
i = 0
for k, v in self.other:
temp = v.get_values_group(fields, nb)
for a, b in temp:
res.append(((k, i) + a, b))
i += 1
return res
[docs] def _convert_into_list(self):
"""
Converts all types into lists.
:githublink:`%|py|389`
"""
if isinstance(self.buffer, str):
self.buffer = [self.buffer]
for k in self:
v = self[k]
if isinstance(v, str):
self[k] = [v]
for k, v in self.other:
v._convert_into_list()
[docs] def __iadd__(self, other):
"""
Concatenates every information.
:param other: other value to concatenate
:return: self
:githublink:`%|py|406`
"""
self.iadd(other, False, False)
return self
[docs] def iadd(self, other, use_list, collapse):
"""
Concatenates every information.
:param other: other value to concatenate
:param use_list: use a list or not
:param collapse: collapse all information
:return: self
:githublink:`%|py|417`
"""
if self.name != other.name:
raise XmlException("the two names should be equal %s != %s full names (%s != %s)" % (
self.name, other.name, "/".join(self.get_full_name()), "/".join(other.get_full_name())))
# _othercount
if "_othercount" not in self.__dict__:
self._othercount = {}
# next
if use_list:
self._convert_into_list()
if use_list:
if isinstance(other.buffer, list):
self.buffer.extend(other.buffer)
else:
self.buffer.append(other.buffer)
else:
self.buffer += other.buffer
for k, v in other.items():
if k not in self:
if use_list:
if isinstance(v, list):
self[k] = v
else:
self[k] = [v]
else:
self[k] = v
else:
if use_list:
if isinstance(v, list):
self[k].extend(v)
else:
self[k].append(v)
else:
self[k] += v
# count the number
selfcount = {}
othcount = {}
for k, v in self.other:
if k in selfcount:
selfcount[k] += 1
else:
selfcount[k] = 1
self._othercount[k] = max(self._othercount.get(k, 0), selfcount[k])
for k, v in other.other:
if k in othcount:
othcount[k] += 1
else:
othcount[k] = 1
self._othercount[k] = max(self._othercount.get(k, 0), othcount[k])
if "_othercount" in other.__dict__:
for k, v in other._othercount.items():
self._othercount[k] = max(self._othercount.get(k, 0), v)
# iadd single elements + append other from others
for node in other.other:
ok = False
for n in self.other:
if node[0] != n[0]:
continue
key = node[0]
if selfcount.get(key, 0) == othcount.get(key, 0) == 1:
n[1].iadd(node[1], use_list=use_list, collapse=collapse)
ok = True
break
if not ok:
nt = copy.deepcopy(node)
nt[1].parent = self
nt[1]._build_othercount()
if use_list:
nt[1]._convert_into_list()
if collapse:
nt[1]._collapse(use_list)
self.other.append(nt)
k = node[0]
# count
count = {}
for k, v in self.other:
count[k] = count.get(k, 0) + 1
# transfert from dict self if a key is present in self.other
rem = []
for k, v in self.items():
if k in count:
tn = XMLHandlerDictNode(self, k, self.level + 1, False)
tn._build_othercount()
if use_list:
tn._convert_into_list()
tn.buffer = [v] if use_list and not isinstance(v, list) else v
self.other.append((k, tn))
rem.append(k)
self._othercount[k] = self._othercount.get(k, 0) + 1
for k in rem:
del self[k]
# count again
count = {}
for k, v in self.other:
if isinstance(v, str):
count[k, 0] = count.get((k, 0), 0) + 1
else:
count[k, 1] = count.get((k, 1), 0) + 1
# string to object
for i, tu in enumerate(self.other):
k, v = tu
if isinstance(v, str) and count.get((k, 1), 0) > 0:
tn = XMLHandlerDictNode(self, k, self.level + 1, False)
tn._build_othercount()
tn.buffer = [v] if use_list else v
self.other[i] = (k, tn)
# collapsing
if collapse:
self._collapse(use_list)
[docs] def _build_othercount(self):
"""
Builds *_othercount* when not present.
:githublink:`%|py|544`
"""
if "_othercount" not in self.__dict__:
self._othercount = {}
for k, v in self.other:
self._othercount[k] = self._othercount.get(k, 0) + 1
v._build_othercount()
[docs] def _collapse(self, use_list):
"""
Collapses together all fields having the same name
in the member other.
.. warning:: it should be called after iadd
:githublink:`%|py|556`
"""
names = {}
for k, v in self.other:
if k in names:
names[k].append(v)
else:
names[k] = [v]
del self.other[:]
for k, lv in names.items():
if len(lv) > 1:
self._othercount[k] = max(self._othercount.get(k, 0), len(lv))
for i in range(1, len(lv)):
lv[0].iadd(lv[i], use_list=use_list, collapse=True)
self.other.append((k, lv[0]))
else:
lv[0]._collapse(use_list)
self.other.append((k, lv[0]))
#self._check_ (False)
[docs] def _check_(self, add_root_id):
"""
some checking
:githublink:`%|py|578`
"""
count = {}
# if add_root_id and "add_root_id" not in self.__dict__ :
# fLOG (self)
# raise Exception ("unable to find add_root_id in '%s'" % self.get_full_name ())
if "_othercount" not in self.__dict__:
raise XmlException("unable to find _othercount in '%s'" %
"/".join(self.get_full_name()))
for k, v in self.other:
count[k] = count.get(k, 0) + 1
if len(count) > 0:
if max(count.values()) > 1:
raise XmlException("max (count.values ()) > 1 in '%s' \nexp: %s" % (
"/".join(self.get_full_name()), str(count)))
for k, v in self.other:
if isinstance(v, list):
for _ in v:
_._check_(add_root_id)
else:
v._check_(add_root_id)
[docs] def _guess_type(self, tolerance=0.01, utf8=False):
"""
Replaces all values in the object.
:param tolerance: :func:`guess_type_list`
:param utf8: if True, all types are str
.. warning:: it should be called after _collapse
:githublink:`%|py|605`
"""
self.buffer = (str, 10) if utf8 else guess_type_list(self.buffer)
for k in self:
self[k] = (str, 10) if utf8 else guess_type_list(self[k])
for k, v in self.other:
v._guess_type(utf8)
[docs] def find_node(self, li):
"""
:param li: list of names
:return: a list of nodes which correspond to the list of names
:githublink:`%|py|616`
"""
node = [self]
for el in li:
temp = []
for n in node:
for k, v in n.other:
if k == el:
temp.append(v)
node = temp
return node
[docs] def find_node_value(self, li):
"""
:param li: list of names
:return: a list of values
:githublink:`%|py|632`
"""
path = li if isinstance(li, list) else li.split("/")
way, last = path[:-1], path[-1]
if len(way) > 0 and way[0] == self.name:
del way[0]
res = []
node = self.find_node(way)
for n in node:
if last == "_":
res.append(n.buffer)
else:
res.append(n.get(last, None))
return res
[docs] def get_full_name(self):
"""
:return: the list of self.name from all parents
:githublink:`%|py|651`
"""
li = [self.name]
node = self
while node.father is not None:
node = node.father
li.append(node.name)
li.reverse()
return li
[docs] def _log_error(self):
"""
logs an object from the root if not already done
:githublink:`%|py|663`
"""
root = self.get_root()
if "_logged" in root.__dict__:
return
root._logged = True
[docs] def _adopt_table(self, tbl, exception):
"""
Adopts a table built on anoher object.
:param tbl: same kind of node but including members:
- table
- conversion_table
:param exception: if True, raises an exception, log otherwise
.. warning:: The method could change the object itself if it does not fit.
.. warning:: The method adds members 'conversion_table', 'add_root_id'
:githublink:`%|py|680`
"""
self.conversion_table = tbl.conversion_table # field conversion
self.add_root_id = tbl.add_root_id
memo = {}
for k, v in tbl.other:
memo[k] = v
rem = []
for k in self:
if k not in tbl.conversion_table:
if len(self[k]) == 0:
continue
if k not in memo:
self._log_error()
if exception:
raise XmlException(
"a field '%s' is not provided by the reference (path: %s)\nmemo.keys(): %s" %
(k, "/".join(
self.get_full_name()), str(
memo.keys())))
tn = XMLHandlerDictNode(self, k, self.level + 1, False)
v = self[k]
tn.buffer = v
self.other.append((k, tn))
rem.append(k)
for k in rem:
del self[k]
count = {}
for k, v in self.other:
if k in count:
count[k] += 1
else:
count[k] = 1
# checking if relation 11 are ok with this object
if "_othercount" not in tbl.__dict__:
raise XmlException("we expect _othercount to be here")
for k, v in count.items():
if k not in tbl._othercount:
self._log_error()
if exception:
raise XmlException("unable to find field '%s' (1:n) in path '%s'" % (
k, "/".join(self.get_full_name())))
elif v > 1 and tbl._othercount[k] <= 1: # pylint: disable=R1716
self._log_error()
if exception:
raise XmlException("we expect a relation 1:1 for field '%s' in path '%s'" % (
k, "/".join(self.get_full_name())))
# next
for k, v in self.other:
if k not in memo:
# fLOG("ERROR: unable to find field '%s' (1:n) in path '%s'" %
# (k, "/".join(self.get_full_name())))
self._log_error()
else:
v._adopt_table(memo[k], exception=exception)
[docs] def _transfer_to_object(self, root=True, exception=True):
"""
Transfers values to the object *self.table*.
:param root: if True, it is the root
:param exception: if True, raise Exception
:return: the value, dictionary of dictionary of list sometimes...
.. warning:: We assume fid is the key.
.. warning:: If root.add_root_id is True, is assumes column root_id is root.add_root_id
:githublink:`%|py|751`
"""
attr = {}
try:
v = self.conversion_table[self.name](self.buffer)
except Exception as ex:
if "conversion_table" not in self.__dict__:
# fLOG("ERROR: unable to find conversion_table for field ",
# self.name,
# " in node " + "/".join(self.get_full_name()))
self._log_error()
#if exception : raise Exception ("fail to convert value for field " + k)
elif len(self.buffer) > 0:
# fLOG("ERROR: fail to convert value '",
# self.buffer,
# "' into ",
# self.conversion_table.get(self.name,
# "not found"),
# " for field ",
# self.name,
# " --- ",
# repr(self.buffer),
# " path: ",
# "/".join(self.get_full_name()))
self._log_error()
if exception:
raise XmlException(
"Fail to convert value for field '{}'".format(self.name)) from ex
v = ""
if not isinstance(v, str) or len(v) > 0:
attr[self.name] = v
for k, v in self.items():
try:
v = self.conversion_table[k](v)
except Exception as ex:
if "conversion_table" not in self.__dict__:
# fLOG("ERROR: unable to find conversion_table field " +
# k +
# " in node " +
# "/".join(self.get_full_name()))
self._log_error()
#if exception : raise Exception ("fail to convert value for field " + k)
continue
if len(v) > 0:
# fLOG("ERROR: fail to convert value ",
# v,
# " field ",
# k,
# " into ",
# self.conversion_table.get(k,
# "not found"),
# " for field ",
# "/".join(self.get_full_name()))
self._log_error()
if exception: # pylint: disable=R1720
raise XmlException("fail to convert value for field '%s' in node '%s'" % (
k, "/".join(self.get_full_name()))) from ex
else:
continue
continue
if not isinstance(v, str) or len(v) > 0:
attr[k] = v
if "add_root_id" not in self.__dict__:
raise XmlException("unable to find add_root_id in '%s' (name '%s')" % (
"/".join(self.get_full_name()), self.name))
if self.add_root_id is not None:
attr[self.add_root_id] = ("mapto", self.get_root().name, "fid")
# other attributes
for k, v in self.other:
kn = "$" + k
if kn not in attr:
attr[kn] = []
r = v._transfer_to_object(root=False, exception=exception)
attr[kn].append(r)
return attr
[docs] def apply_change_names(self, change_names):
"""
private: change names attributes.
:param change_names: { oldname : newname }
:githublink:`%|py|836`
"""
if self.name in change_names:
self.name = change_names[self.name]
if "_othercount" in self.__dict__:
rem = []
upd = {}
for k, v in self._othercount.items():
if k in change_names:
rem.append(k)
upd[change_names[k]] = v
for r in rem:
del self._othercount[r]
self._othercount.update(upd)
rem = []
upd = {}
for k, v in self.items():
if k in change_names:
rem.append(k)
upd[change_names[k]] = v
for r in rem:
del self[r]
self.update(upd)
old = self.other
self.other = []
for k, v in old:
if k in change_names:
self.other.append((change_names[k], v))
else:
self.other.append((k, v))
v.apply_change_names(change_names)
[docs] def get_root(self):
"""
:return: the root of the node
:githublink:`%|py|873`
"""
node = self
while node.father is not None:
node = node.father
return node
[docs] def iterfields(self):
"""
Iterator on the nodes.
:githublink:`%|py|882`
"""
root = "/".join(self.get_full_name())
if self.name is not None:
yield (root + "/_", self.buffer)
for k, v in self.items():
yield (root + "/" + k, v)
for k, v in self.other:
for a, b in v.iterfields():
yield (a, b)
[docs] def find_node_regex(self, regex):
"""
Finds all nodes depending on a regular expression.
:param regex: regular expression
:return: list of ``[ (node, value) ]``
:githublink:`%|py|898`
"""
res = []
for node, value in self.iterfields():
if regex.search(node) is not None:
res.append((node, value))
return res