"""
About completion
:githublink:`%|py|5`
"""
from typing import Tuple, List, Iterator
from collections import deque
[docs]class CompletionTrieNode:
"""
Node definition in a trie used to do completion, see :ref:`l-completion0`.
This implementation is not very efficient about memmory consumption,
it does not hold above 200.000 words.
It should be done another way (:epkg:`cython`, :epkg:`C++`).
:githublink:`%|py|15`
"""
__slots__ = ("value", "children", "weight",
"leave", "stat", "parent", "disp")
[docs] def __init__(self, value, leave, weight=1.0, disp=None):
"""
:param value: value (a character)
:param leave: boolean (is it a completion)
:param weight: ordering (the lower, the first)
:param disp: original string, use this to identify the node
:githublink:`%|py|26`
"""
if not isinstance(value, str):
raise TypeError(
"value must be str not '{0}' - type={1}".format(value, type(value)))
self.value = value
self.children = None
self.weight = weight
self.leave = leave
self.stat = None
self.parent = None
self.disp = disp
@property
def root(self):
"""
Returns the initial node with no parent.
:githublink:`%|py|42`
"""
node = self
while node.parent is not None:
node = node.parent
return node
[docs] def __str__(self):
"""
usual
:githublink:`%|py|51`
"""
return "[{2}:{0}:w={1}]".format(self.value, self.weight, "#" if self.leave else "-")
[docs] def _add(self, key, child):
"""
Adds a child.
:param key: one letter of the word
:param child: child
:return: self
:githublink:`%|py|61`
"""
if self.children is None:
self.children = {key: child}
child.parent = self
elif key in self.children:
raise KeyError("'{0}' already added".format(key))
else:
self.children[key] = child
child.parent = self
return self
[docs] def items_list(self) -> List['CompletionTrieNode']:
"""
All children nodes inluding itself in a list.
:return: list[
:githublink:`%|py|77`
"""
res = [self]
if self.children is not None:
for _, v in sorted(self.children.items()):
r = v.items_list()
res.extend(r)
return res
[docs] def __iter__(self):
"""
Iterates on all nodes (sorted).
:githublink:`%|py|88`
"""
stack = [self]
while len(stack) > 0:
node = stack.pop()
yield node
if node.children:
stack.extend(v for k, v in sorted(
node.children.items(), reverse=True))
[docs] def unsorted_iter(self):
"""
Iterates on all nodes.
:githublink:`%|py|100`
"""
stack = [self]
while len(stack) > 0:
node = stack.pop()
yield node
if node.children:
stack.extend(node.children.values())
[docs] def items(self) -> Iterator[Tuple[float, str, 'CompletionTrieNode']]:
"""
Iterates on children, iterates on weight, key, child.
:githublink:`%|py|111`
"""
if self.children is not None:
for k, v in self.children.items():
yield v.weight, k, v
[docs] def iter_leaves(self, max_weight=None) -> Iterator[Tuple[float, str]]:
"""
Iterators on leaves sorted per weight, yield weight, value.
:param max_weight: keep all value under this threshold or None for all
:githublink:`%|py|121`
"""
def iter_local(node):
if node.leave and (max_weight is None or node.weight <= max_weight):
yield node.weight, None, node.value
for w, k, v in sorted(node.items()):
for w_, k_, v_ in iter_local(v):
yield w_, k_, v_
for w, _, v in sorted(iter_local(self)):
yield w, v
[docs] def leaves(self) -> Iterator['CompletionTrieNode']:
"""
Iterators on leaves.
:githublink:`%|py|135`
"""
stack = [self]
while len(stack) > 0:
pop = stack.pop()
if pop.leave:
yield pop
if pop.children:
stack.extend(pop.children.values())
[docs] def all_completions(self) -> List[Tuple['CompletionTrieNone', List[str]]]:
"""
Retrieves all completions for a node,
the method does not need :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>` to be run first.
:githublink:`%|py|148`
"""
word = self.value
nodes = [self.root]
node = nodes[0]
for c in word:
if node.children is not None and c in node.children:
node = node.children[c]
nodes.append(node)
nodes.reverse()
all_res = []
for node in nodes:
res = list(n[1] for n in node.iter_leaves())
all_res.append((node, res))
all_res.reverse()
return all_res
[docs] def all_mks_completions(self) -> List[Tuple['CompletionTrieNone', List['CompletionTrieNone']]]:
"""
Retrieves all completions for a node,
the method assumes :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>` was run.
:githublink:`%|py|168`
"""
res = []
node = self
while True:
res.append((node, node.stat.completions))
if node.parent is None:
break
node = node.parent
res.reverse()
return res
[docs] def str_all_completions(self, maxn=10, use_precompute=True) -> str:
"""
Builds a string with all completions for all
prefixes along the paths.
:param maxn: maximum number of completions to show
:param use_precompute: use intermediate results built by :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
:return: str
:githublink:`%|py|187`
"""
res = self.all_mks_completions() if use_precompute else self.all_completions()
rows = []
for node, sug in res:
rows.append("l={3} p='{0}' {1} {2}".format(node.value, "-" * 10, node.stat.str_mks(),
'+' if node.leave else '-'))
for i, s in enumerate(sug):
if isinstance(s, str):
rows.append(" {0}-'{1}'".format(i + 1, s))
else:
rows.append(
" {0}-w{1}-'{2}'".format(i + 1, s[0], s[1].value))
if maxn is not None and i > maxn:
break
return "\n".join(rows)
[docs] @staticmethod
def build(words) -> 'CompletionTrieNode':
"""
Builds a trie.
:param words: list of ``(word)`` or ``(weight, word)`` or ``(weight, word, display string)``
:return: root of the trie (CompletionTrieNode)
:githublink:`%|py|210`
"""
root = CompletionTrieNode('', False)
nb = 0
minw = None
for wword in words:
if isinstance(wword, tuple):
if len(wword) == 2:
w, word = wword
disp = None
elif len(wword) == 3:
w, word, disp = wword
else:
raise ValueError(
"Unexpected number of values, it should be (weight, word) or (weight, word, dispplay string): {0}".format(wword))
else:
w = 1.0
word = wword
disp = None
if w is None:
w = nb
if minw is None or minw > w:
minw = w
node = root
new_node = None
for c in word:
if node.children is not None and c in node.children:
if not node.leave:
node.weight = min(node.weight, w)
node = node.children[c]
else:
new_node = CompletionTrieNode(
node.value + c, False, weight=w)
node._add(c, new_node)
node = new_node
if new_node is None:
if node.leave:
raise ValueError(
"Value '{0}' appears twice in the input list (not allowed).".format(word))
new_node = node
new_node.leave = True
new_node.weight = w
if disp is not None:
new_node.disp = disp
nb += 1
root.weight = minw
return root
[docs] def find(self, prefix: str) -> 'CompletionTrieNode':
"""
Returns the node which holds all completions starting with a given prefix.
:param prefix: prefix
:return: node or None for no result
:githublink:`%|py|263`
"""
if len(prefix) == 0:
if not self.value:
return self
else:
raise ValueError(
"find '{0}' but node is not empty '{1}'".format(prefix, self.value))
node = self
for c in prefix:
if node.children is not None and c in node.children:
node = node.children[c]
else:
return None
return node
[docs] def min_keystroke(self, word: str) -> Tuple[int, int]:
"""
Returns the minimum keystrokes for a word without optimisation,
this function should be used if you only have a couple of values to
computes. You shoud use :meth:`min_keystroke0 <mlstatpy.nlp.completion.CompletionTrieNode.min_keystroke0>` to compute all of them.
:param word: word
:return: number, length of best prefix
See :ref:`l-completion-optim`.
.. math::
:nowrap:
\\begin{eqnarray*}
K(q, k, S) &=& \\min\\acc{ i | s_i \\succ q[1..k], s_i \\in S } \\\\
M(q, S) &=& \\min_{0 \\infegal k \\infegal l(q)} k + K(q, k, S)
\\end{eqnarray*}
:githublink:`%|py|296`
"""
nodes = [self]
node = self
for c in word:
if node.children is not None and c in node.children:
node = node.children[c]
nodes.append(node)
else:
# not found
return len(word), -1
nodes.reverse()
metric = len(word)
best = len(word)
for node in nodes[1:]:
res = list(n[1] for n in node.iter_leaves())
ind = res.index(word)
m = len(node.value) + ind + 1
if m < metric:
metric = m
best = len(node.value)
if ind >= len(word):
# no need to go further, the position will increase
break
return metric, best
[docs] def min_keystroke0(self, word: str) -> Tuple[int, int]:
"""
Returns the minimum keystrokes for a word.
:param word: word
:return: number, length of best prefix, iteration it stops moving
This function must be called after :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
and :meth:`update_stat_dynamic <mlstatpy.nlp.completion.CompletionTrieNode.update_stat_dynamic>`.
See :ref:`l-completion-optim`.
.. math::
:nowrap:
\\begin{eqnarray*}
K(q, k, S) &=& \\min\\acc{ i | s_i \\succ q[1..k], s_i \\in S } \\\\
M(q, S) &=& \\min_{0 \\infegal k \\infegal l(q)} k + K(q, k, S)
\\end{eqnarray*}
:githublink:`%|py|340`
"""
node = self.find(word)
if node is None:
raise NotImplementedError(
"this metric is not yet computed for a query outside the trie: '{0}'".format(word))
if not hasattr(node, "stat"):
raise AttributeError("run precompute_stat and update_stat_dynamic")
if not hasattr(node.stat, "mks1"):
raise AttributeError("run precompute_stat and update_stat_dynamic\nnode={0}\n{1}".format(
self, "\n".join(sorted(self.stat.__dict__.keys()))))
return node.stat.mks0, node.stat.mks0_, 0
[docs] def min_dynamic_keystroke(self, word: str) -> Tuple[int, int]:
"""
Returns the dynamic minimum keystrokes for a word.
:param word: word
:return: number, length of best prefix, iteration it stops moving
This function must be called after :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
and :meth:`update_stat_dynamic <mlstatpy.nlp.completion.CompletionTrieNode.update_stat_dynamic>`.
See :ref:`Dynamic Minimum Keystroke <def-mks2>`.
.. math::
:nowrap:
\\begin{eqnarray*}
K(q, k, S) &=& \\min\\acc{ i | s_i \\succ q[1..k], s_i \\in S } \\\\
M'(q, S) &=& \\min_{0 \\infegal k \\infegal l(q)} \\acc{ M'(q[1..k], S) + K(q, k, S) | q[1..k] \\in S }
\\end{eqnarray*}
:githublink:`%|py|370`
"""
node = self.find(word)
if node is None:
raise NotImplementedError(
"this metric is not yet computed for a query outside the trie: '{0}'".format(word))
if not hasattr(node, "stat"):
raise AttributeError("run precompute_stat and update_stat_dynamic")
if not hasattr(node.stat, "mks1"):
raise AttributeError("run precompute_stat and update_stat_dynamic\nnode={0}\n{1}".format(
self, "\n".join(sorted(self.stat.__dict__.keys()))))
return node.stat.mks1, node.stat.mks1_, node.stat.mks1i_
[docs] def min_dynamic_keystroke2(self, word: str) -> Tuple[int, int]:
"""
Returns the modified dynamic minimum keystrokes for a word.
:param word: word
:return: number, length of best prefix, iteration it stops moving
This function must be called after :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
and :meth:`update_stat_dynamic <mlstatpy.nlp.completion.CompletionTrieNode.update_stat_dynamic>`.
See :ref:`Modified Dynamic Minimum Keystroke <def-mks3>`.
.. math::
:nowrap:
\\begin{eqnarray*}
K(q, k, S) &=& \\min\\acc{ i | s_i \\succ q[1..k], s_i \\in S } \\\\
M"(q, S) &=& \\min \\left\\{ \\begin{array}{l}
\\min_{1 \\infegal k \\infegal l(q)} \\acc{ M"(q[1..k-1], S) + 1 + K(q, k, S) | q[1..k] \\in S } \\\\
\\min_{0 \\infegal k \\infegal l(q)} \\acc{ M"(q[1..k], S) + \\delta + K(q, k, S) | q[1..k] \\in S }
\\end{array} \\right .
\\end{eqnarray*}
:githublink:`%|py|403`
"""
node = self.find(word)
if node is None:
raise NotImplementedError(
"this metric is not yet computed for a query outside the trie: '{0}'".format(word))
if not hasattr(node, "stat"):
raise AttributeError("run precompute_stat and update_stat_dynamic")
if not hasattr(node.stat, "mks2"):
raise AttributeError("run precompute_stat and update_stat_dynamic\nnode={0}\n{1}".format(
self, "\n".join(sorted(self.stat.__dict__.keys()))))
return node.stat.mks2, node.stat.mks2_, node.stat.mks2i_
[docs] def precompute_stat(self):
"""
Computes and stores list of completions for each node,
computes *mks*.
:param clean: clean stat
:githublink:`%|py|421`
"""
stack = deque()
stack.extend(self.leaves())
while len(stack) > 0:
pop = stack.popleft()
if pop.stat is not None:
continue
if not pop.children:
pop.stat = CompletionTrieNode._Stat()
pop.stat.completions = []
pop.stat.mks0 = len(pop.value)
pop.stat.mks0_ = len(pop.value)
if pop.parent is not None:
stack.append(pop.parent)
elif all(v.stat is not None for v in pop.children.values()):
pop.stat = CompletionTrieNode._Stat()
if pop.leave:
pop.stat.mks0 = len(pop.value)
pop.stat.mks0_ = len(pop.value)
stack.extend(pop.children.values())
pop.stat.merge_completions(pop.value, pop.children.values())
pop.stat.next_nodes = pop.children
pop.stat.update_minimum_keystroke(len(pop.value))
if pop.parent is not None:
stack.append(pop.parent)
else:
# we'll do it again later
stack.append(pop)
[docs] def update_stat_dynamic(self, delta=0.8):
"""
Must be called after :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
and computes dynamic mks (see :ref:`Dynamic Minimum Keystroke <def-mks2>`).
:param delta: parameter :math:`\\delta` in defintion
:ref:`Modified Dynamic KeyStroke <def-mks3>`
:return: number of iterations to converge
:githublink:`%|py|458`
"""
for node in self.unsorted_iter():
node.stat.init_dynamic_minimum_keystroke(len(node.value))
node.stat.iter_ = 0
updates = 1
itera = 0
while updates > 0:
updates = 0
stack = []
stack.append(self)
while len(stack) > 0:
pop = stack.pop()
if pop.stat.iter_ > itera:
continue
updates += pop.stat.update_dynamic_minimum_keystroke(
len(pop.value), delta)
if pop.children:
stack.extend(pop.children.values())
pop.stat.iter_ += 1
itera += 1
return itera
##
# end of methods, beginning of subclasses
##
[docs] class _Stat:
"""
Stores statistics and intermediate data about the compuation the metrics.
It contains the following members:
* mks0*: value of minimum keystroke
* mks0_*: length of the prefix to obtain *mks0*
* *mks_iter*: current iteration during the computation of mks
* *mks1*: value of dynamic minimum keystroke
* *mks1_*: length of the prefix to obtain *mks*
* *mks1i_*: iteration when it was obtained
* *mks2*: value of modified dynamic minimum keystroke
* *mks2_*: length of the prefix to obtain *mks2*
* *mks2i*: iteration when it converged
:githublink:`%|py|501`
"""
[docs] def merge_completions(self, prefix: int, nodes: '[CompletionTrieNode]'):
"""
Merges list of completions and cut the list, we assume
given lists are sorted.
:githublink:`%|py|507`
"""
class Fake:
pass
res = []
indexes = [0 for _ in nodes]
indexes.append(0)
last = Fake()
last.value = None
last.stat = CompletionTrieNode._Stat()
last.stat.completions = list(
sorted((_.weight, _) for _ in nodes if _.leave))
nodes = list(nodes)
nodes.append(last)
maxl = 0
while True:
en = [(_.stat.completions[indexes[i]][0], i, _.stat.completions[indexes[i]][1])
for i, _ in enumerate(nodes) if indexes[i] < len(_.stat.completions)]
if not en:
break
e = min(en)
i = e[1]
res.append((e[0], e[2]))
indexes[i] += 1
maxl = max(maxl, len(res[-1][1].value))
# maxl - len(prefix) represents the longest list which reduces the number of keystrokes
# however, as the method aggregates completions at a lower lovel,
# we must keep longer completions for lower levels
ind = maxl
if len(res) > ind:
self.completions = res[:ind]
else:
self.completions = res
[docs] def update_minimum_keystroke(self, lw):
"""
Updates minimum keystroke for the completions.
:param lw: prefix length
:githublink:`%|py|547`
"""
for i, wsug in enumerate(self.completions):
sug = wsug[1]
nl = lw + i + 1
if not hasattr(sug.stat, "mks0") or sug.stat.mks0 > nl:
sug.stat.mks0 = nl
sug.stat.mks0_ = lw
[docs] def update_dynamic_minimum_keystroke(self, lw, delta):
"""
Updates dynamic minimum keystroke for the completions.
:param lw: prefix length
:param delta: parameter :math:`\\delta` in defintion
:ref:`Modified Dynamic KeyStroke <def-mks3>`
:return: number of updates
:githublink:`%|py|563`
"""
self.mks_iter += 1
update = 0
for i, wsug in enumerate(self.completions):
sug = wsug[1]
if sug.leave:
# this is a leave so we consider the completion being part
# of the list of completions
nl = self.mks1 + i + 1
if sug.stat.mks1 > nl:
sug.stat.mks1 = nl
sug.stat.mks1_ = lw
sug.stat.mks1i_ = self.mks_iter
update += 1
nl = self.mks2 + i + 1 + delta
if sug.stat.mks2 > nl:
sug.stat.mks2 = nl
sug.stat.mks2_ = lw
sug.stat.mks2i_ = self.mks_iter
update += 1
else:
raise Exception("this case should not happen")
# optimisation of second case of modified metric
# in a separate function for profiling
def second_step(update):
if hasattr(self, "next_nodes"):
for _, child in self.next_nodes.items():
for i, wsug in enumerate(child.stat.completions):
sug = wsug[1]
if not sug.leave:
continue
nl = self.mks2 + i + 2
if sug.stat.mks2 > nl:
sug.stat.mks2 = nl
sug.stat.mks2_ = lw
sug.stat.mks2i_ = self.mks_iter
update += 1
return update
update = second_step(update)
# finally we need to update mks, mks2 for every prefix
# this is not necessary a leave so it does not appear in the list of completions
# but we need to update mks for these strings, we assume it just
# requires an extra character, somehow, we propagate the values
if hasattr(self, "next_nodes"):
for _, n in self.next_nodes.items():
if not hasattr(n.stat, "mks1") or n.stat.mks1 > self.mks1 + 1:
n.stat.mks1 = self.mks1 + 1
n.stat.mks1_ = self.mks1_
n.stat.mks1i_ = self.mks_iter
update += 1
if not hasattr(n.stat, "mks2") or n.stat.mks2 > self.mks2 + 1:
n.stat.mks2 = self.mks2 + 1
n.stat.mks2_ = self.mks2_
n.stat.mks2i_ = self.mks_iter
update += 1
return update
[docs] def init_dynamic_minimum_keystroke(self, lw):
"""
Initializes *mks* and *mks2* from from *mks0*.
:param lw: length of the prefix
:githublink:`%|py|629`
"""
if hasattr(self, "mks0"):
self.mks1 = self.mks0
self.mks1_ = self.mks0_
self.mks_iter = 0
self.mks1i_ = 0
self.mks2 = self.mks0
self.mks2_ = self.mks0_
self.mks2i_ = 0
else:
self.mks0 = lw
self.mks0_ = 0
self.mks1 = lw
self.mks1_ = lw
self.mks_iter = 0
self.mks1i_ = 0
self.mks2 = lw
self.mks2_ = lw
self.mks2i_ = 0
[docs] def str_mks0(self) -> str:
"""
Returns a string with metric information.
:githublink:`%|py|652`
"""
if hasattr(self, "mks0"):
return "MKS={0} *={1}".format(self.mks0, self.mks0_)
else:
return "-"
[docs] def str_mks(self) -> str:
"""
Returns a string with metric information.
:githublink:`%|py|661`
"""
s0 = self.str_mks0()
if hasattr(self, "mks1"):
return s0 + " |'={0} *={1},{2} |\"={3} *={4},{5} |nn={6}".format(
self.mks1, self.mks1_, self.mks1i_, self.mks2, self.mks2i_, self.mks2i_, '+' if hasattr(self, "next_nodes") else '-')
else:
return s0