"""
About completion, simple algorithm
:githublink:`%|py|5`
"""
import time
from typing import Tuple, List, Iterator, Dict
from pyquickhelper.loghelper import noLOG
from .completion import CompletionTrieNode
[docs]class CompletionElement:
"""
Definition of an element in a completion system,
it contains the following members:
* *value*: the completion
* *weight*: a weight or a position, we assume a completion with
a lower weight is shown at a lower position
* *disp*: display string (no impact on the algorithm)
* mks0*: value of minimum keystroke
* mks0_*: length of the prefix to obtain *mks0*
* *mks1*: value of dynamic minimum keystroke
* *mks1_*: length of the prefix to obtain *mks1*
* *mks2*: value of modified dynamic minimum keystroke
* *mks2_*: length of the prefix to obtain *mks2*
:githublink:`%|py|29`
"""
__slots__ = "value", "weight", "disp", 'mks0', 'mks0_', 'mks1', 'mks1_', 'mks2', 'mks2_', 'prefix', '_info'
[docs] def __init__(self, value: str, weight=1.0, disp=None):
"""
constructor
:param value: value (a character)
:param weight: ordering (the lower, the first)
:param disp: original string, use this to identify the node
:githublink:`%|py|40`
"""
if not isinstance(value, str):
raise TypeError(
"value must be str not '{0}' - type={1}".format(value, type(value)))
self.value = value
self.weight = weight
self.disp = disp
self.prefix = None
self._info = None
[docs] @staticmethod
def empty_prefix():
"""
return an instance filled with an empty prefix
:githublink:`%|py|54`
"""
if not hasattr(CompletionElement, "static_empty_prefix"):
res = CompletionElement('', None)
res.mks0 = res.mks1 = res.mks2 = 0
res.mks0_ = res.mks1_ = res.mks2_ = 0
CompletionElement.static_empty_prefix = res
return res
else:
return CompletionElement.static_empty_prefix
[docs] def __repr__(self):
"""
usual
:githublink:`%|py|67`
"""
if self._info:
return "CompletionElementInfo('{0}'{1}{2})".format(self.value,
", {0}".format(
self.weight) if self.weight != 1 else "",
", disp='{0}'" if self.disp else "")
else:
return "CompletionElement('{0}'{1}{2})".format(self.value,
", {0}".format(
self.weight) if self.weight != 1 else "",
", disp='{0}'" if self.disp else "")
[docs] def str_mks0(self) -> str:
"""
return a string with metric information
:githublink:`%|py|82`
"""
if hasattr(self, "mks0"):
return "MKS={0} *={1}".format(self.mks0, self.mks0_)
else:
return "-"
[docs] def str_mks(self) -> str:
"""
return a string with metric information
:githublink:`%|py|91`
"""
s0 = self.str_mks0()
if hasattr(self, "mks1"):
return s0 + " |'={0} *={1} |\"={2} *={3}".format(
self.mks1, self.mks1_, self.mks2, self.mks2_)
else:
return s0
[docs] def str_all_completions(self, maxn=10, use_precompute=True) -> str:
"""
builds a string with all completions for all
prefixes along the paths, this is only available
if parameter *completions* was used when calling
method :meth:`update_metrics <mlstatpy.nlp.completion_simple.CompletionElement.update_metrics>`.
:param maxn: maximum number of completions to show
:param use_precompute: use intermediate results built by :meth:`precompute_stat <mlstatpy.nlp.completion.CompletionTrieNode.precompute_stat>`
:return: str
:githublink:`%|py|109`
"""
rows = [
"{0} -- {1} -- {2}".format(self.weight, self.value, self.str_mks())]
if self._info is not None:
rows.append("------------------")
for el in self._info._log_imp:
rows.append(str(el))
for i in range(len(self.value)):
prefix = self.value[:i]
rows.append("------------------")
rows.append("i={0} - {1}".format(i, prefix))
completions = self._info._completions.get(prefix, [])
for i2, el in enumerate(completions):
ar = " " if el.value != self.value else "-> "
add = "{5}{0}:{1} -- {2}{4}-- {3}".format(
i2, el.weight, el.value, el.str_mks(), " " * (20 - len(el.value)), ar)
rows.append(add)
else:
rows.append("NO INFO")
return "\n".join(rows)
[docs] def init_metrics(self, position: int, completions: List['CompletionElement'] = None):
"""
initiate the metrics
:param position: position in the completion system when prefix is null,
*position starting from 0*
:param completions: displayed completions, if not None, the method will
store them in member *_completions*
:return: boolean which indicates there was an update
:githublink:`%|py|139`
"""
if completions is not None:
log_imp = True
class c:
def __str__(self):
return "{0}-{1}".format(self._completions, self._log_imp)
self._info = c()
self._info._completions = {}
self._info._log_imp = []
if '' not in self._info._completions:
cut = min(15, max(10, len(self.value)),
len(completions['']))
if len(completions['']) >= cut:
self._info._completions[''] = completions[''][:cut]
else:
self._info._completions[''] = completions[''].copy()
else:
log_imp = False
self.prefix = CompletionElement.empty_prefix()
position += 1
if len(self.value) <= position:
self.mks0 = len(self.value)
self.mks1 = len(self.value)
self.mks2 = len(self.value)
self.mks0_ = len(self.value)
self.mks1_ = len(self.value)
self.mks2_ = len(self.value)
return False
else:
self.mks0 = position
self.mks1 = position
self.mks2 = position
self.mks0_ = 0
self.mks1_ = 0
self.mks2_ = 0
if log_imp:
self._info._log_imp.append(
(0, "mks0", position, '', "k={0}".format(0),
"p={0}".format(position), "it={0}".format(0)))
return True
[docs] def update_metrics(self, prefix: str, position: int, improved: dict, delta: float,
completions: List['CompletionElement'] = None, iteration=-1):
"""
update the metrics
:param prefix: prefix
:param position: position in the completion system when prefix has length k,
*position starting from 0*
:param improved: if one metrics is < to the completion length, it means
it can be used to improve others queries
:param delta: delta in the dynamic modified mks
:param completions: displayed completions, if not None, the method will
store them in member *_completions*
:param iteration: for debugging purpose, indicates when this improvment was detected
:return: boolean which indicates there was an update
:githublink:`%|py|198`
"""
if self.prefix is not None and len(prefix) < len(self.prefix.value):
# no need to look into it
return False
if completions is not None:
log_imp = True
if prefix not in self._info._completions:
cut = min(15, max(10, len(self.value)),
len(completions[prefix]))
if len(completions[prefix]) >= cut:
self._info._completions[prefix] = completions[prefix][:cut]
else:
self._info._completions[
prefix] = completions[prefix].copy()
else:
log_imp = False
k = len(prefix)
pos = position + 1
mks = k + pos
check = False
if mks < self.mks0:
self.mks0 = mks
self.mks0_ = k
check = True
if log_imp:
self._info._log_imp.append(
(1, "mks0", mks, prefix, "k={0}".format(k), "p={0}".format(position),
"it={0}".format(iteration), "last={0}".format(self.prefix.value)))
elif mks == self.mks0 and self.mks0_ < k:
self.mks0_ = k
if mks < self.mks1:
self.mks1 = mks
self.mks1_ = k
check = True
if mks < self.mks2:
self.mks2 = mks
self.mks2_ = k
check = True
if self.prefix and len(self.prefix.value) < len(prefix):
# we use the latest prefix available
v = self.prefix
dd = len(prefix) - len(v.value) + pos
mks = v.mks1 + dd
if mks < self.mks1:
self.mks1 = mks
self.mks1_ = k
check = True
if log_imp:
self._info._log_imp.append(
(4, "mks1", mks, prefix, "k={0}".format(k), "p={0}".format(position),
"it={0}".format(iteration), "last={0}".format(self.prefix.value)))
mks = v.mks2 + dd
if mks < self.mks2:
self.mks2 = mks
self.mks2_ = k
check = True
if log_imp:
self._info._log_imp.append(
(5, "mks2", mks, prefix, "k={0}".format(k), "p={0}".format(position),
"it={0}".format(iteration), "last={0}".format(self.prefix.value)))
if prefix in improved:
v = improved[prefix]
self.prefix = v
mks = v.mks1 + min(len(self.value) - len(prefix), pos)
if mks < self.mks1:
self.mks1 = mks
self.mks1_ = k
check = True
if log_imp:
self._info._log_imp.append(
(2, "mks1", mks, prefix, "k={0}".format(k), "p={0}".format(position),
"it={0}".format(iteration), "last={0}".format(self.prefix.value)))
mks = v.mks2 + min(len(self.value) - len(prefix), pos + delta)
if mks < self.mks2:
self.mks2 = mks
self.mks2_ = k
check = True
if log_imp:
self._info._log_imp.append(
(3, "mks2", mks, prefix, "k={0}".format(k), "p={0}".format(position),
"it={0}".format(iteration), "last={0}".format(self.prefix.value)))
if log_imp and self.prefix and self.prefix.value != '':
self._info._log_imp.append(self.prefix)
return check
[docs]class CompletionSystem:
"""
define a completion system
:githublink:`%|py|291`
"""
[docs] def __init__(self, elements: List[CompletionElement]):
"""
fill the completion system
:githublink:`%|py|296`
"""
def create_element(i, e):
if isinstance(e, CompletionElement):
return e
if isinstance(e, tuple):
return CompletionElement(e[1], e[0] if e[0] else i)
return CompletionElement(e, i)
self._elements = [create_element(i, e) for i, e in enumerate(elements)]
[docs] def __getitem__(self, i):
"""
Returns ``elements[i]``.
:githublink:`%|py|308`
"""
return self._elements[i]
[docs] def find(self, value: str, is_sorted=False) -> CompletionElement:
"""
Not very efficient, finds an item in a the list.
:param value: string to find
:param is_sorted: the function will assume the elements are sorted by
alphabetical order
:return: element or None
:githublink:`%|py|319`
"""
if is_sorted:
raise NotImplementedError( # pragma: no cover
"No optimisation for the sorted case.")
for e in self:
if e.value == value:
return e
return None
[docs] def items(self) -> Iterator[Tuple[str, CompletionElement]]:
"""
Iterates on ``(e.value, e)``.
:githublink:`%|py|331`
"""
for e in self._elements:
yield e.value, e
[docs] def tuples(self) -> Iterator[Tuple[float, str]]:
"""
Iterates on ``(e.weight, e.value)``.
:githublink:`%|py|338`
"""
for e in self._elements:
yield e.weight, e.value
[docs] def __len__(self) -> int:
"""
Number of elements.
:githublink:`%|py|345`
"""
return len(self._elements)
[docs] def __iter__(self) -> Iterator[CompletionElement]:
"""
Iterates over elements.
:githublink:`%|py|351`
"""
for e in self._elements:
yield e
[docs] def sort_values(self):
"""
sort the elements by value
:githublink:`%|py|358`
"""
self._elements = list(
_[-1] for _ in sorted((e.value, e.weight, e) for e in self))
[docs] def sort_weight(self):
"""
Sorts the elements by value.
:githublink:`%|py|365`
"""
self._elements = list(
_[-1] for _ in sorted((e.weight, e.value, e) for e in self))
[docs] def compare_with_trie(self, delta=0.8, fLOG=noLOG):
"""
Compares the results with the other implementation.
:param delta: parameter *delta* in the dynamic modified mks
:param fLOG: logging function
:return: None or differences
:githublink:`%|py|376`
"""
def format_diff(el, f, diff):
s = "VALUE={0}\nSYST=[{1}]\nTRIE=[{2}]\nMORE SYSTEM:\n{3}\n######\nMORE TRIE:\n{4}".format(
el.value, el.str_mks(), f.stat.str_mks(),
el.str_all_completions(), f.str_all_completions())
if diff:
return "-------\n{0}\n-------".format(s)
return s
trie = CompletionTrieNode.build(self.tuples())
self.compute_metrics(delta=delta, fLOG=fLOG, details=True)
trie.precompute_stat()
trie.update_stat_dynamic(delta=delta)
diffs = []
for el in self:
f = trie.find(el.value)
d0 = el.mks0 - f.stat.mks0
d1 = el.mks1 - f.stat.mks1
d2 = el.mks2 - f.stat.mks2
d4 = el.mks0_ - f.stat.mks0_
if d0 != 0 or d1 != 0 or d2 != 0 or d4 != 0:
diffs.append((d0, d1, d2, d4, el, f, format_diff(el, f, True)))
if diffs:
diffs.sort(key=str)
return diffs
else:
return None
[docs] def to_dict(self) -> Dict[str, CompletionElement]:
"""
Returns a dictionary.
:githublink:`%|py|407`
"""
return {el.value: el for el in self}
[docs] def compute_metrics(self, ffilter=None, delta=0.8,
details=False, fLOG=noLOG) -> int:
"""
Computes the metric for the completion itself.
:param ffilter: filter function
:param delta: parameter *delta* in the dynamic modified mks
:param details: log more details about displayed completions
:param fLOG: logging function
:return: number of iterations
The function ends by sorting the set of completion by alphabetical order.
:githublink:`%|py|422`
"""
self.sort_weight()
if ffilter is not None:
raise NotImplementedError( # pragma: no cover
"ffilter not None is not implemented")
if details:
store_completions = {'': []}
improved = {}
to = time.perf_counter()
fLOG("init_metrics:", len(self))
for i, el in enumerate(self._elements):
if details:
store_completions[''].append(el)
r = el.init_metrics(i, store_completions)
else:
r = el.init_metrics(i)
if r and el.value not in improved:
improved[el.value] = el
t = time.perf_counter()
fLOG(
"interation 0: #={0} dt={1} - log details={2}".format(len(self), t - to, details))
updates = 1
it = 1
while updates > 0:
displayed = {}
updates = 0
for i, el in enumerate(self._elements):
for k in range(0, len(el.value)):
prefix = el.value[:k]
if prefix not in displayed:
displayed[prefix] = 0
if details:
store_completions[prefix] = [el]
else:
displayed[prefix] += 1
if details:
store_completions[prefix].append(el)
r = el.update_metrics(
prefix, displayed[prefix], improved, delta,
completions=(store_completions if details else None),
iteration=it)
if r:
if el.value not in improved:
improved[el.value] = el
updates += 1
t = time.perf_counter()
fLOG("interation {0}: updates={1} dt={2}".format(
it, updates, t - to))
it += 1
self.sort_values()
return it - 1
[docs] def enumerate_test_metric(self, qset: Iterator[Tuple[str, float]]) -> Iterator[Tuple[CompletionElement, CompletionElement]]:
"""
Evaluates the completion set on a set of queries,
the function returns a list of :class:`CompletionElement <mlstatpy.nlp.completion_simple.CompletionElement>`
with the three metrics :math:`M`, :math:`M'`, :math:`M"`
for these particular queries.
:param qset: list of tuple(str, float) = (query, weight)
:return: list of tuple of :class:`CompletionElement <mlstatpy.nlp.completion_simple.CompletionElement>`,
the first one is the query, the second one is the None or
the matching completion
The method :meth:`compute_metric` needs to be called first.
:githublink:`%|py|490`
"""
qset = sorted(qset)
current = 0
for query, weight in qset:
while current < len(self) and self[current].value <= query:
current += 1
ind = current - 1
el = CompletionElement(query, weight)
if ind >= 0:
inset = self[ind]
le = len(inset.value)
if le <= len(query) and inset.value == query[:le]:
if le == len(query):
found = inset
el.mks0 = inset.mks0
el.mks1 = inset.mks1
el.mks2 = inset.mks2
el.mks0_ = len(query)
el.mks1_ = len(query)
el.mks2_ = len(query)
else:
found = None
el.mks0 = 0
el.mks0_ = 0
el.mks1 = inset.mks1 + len(query) - le
el.mks1_ = le
el.mks2 = inset.mks2 + len(query) - le
el.mks2_ = le
else:
found = None
el.mks0 = len(query)
el.mks1 = len(query)
el.mks2 = len(query)
el.mks0_ = len(query)
el.mks1_ = len(query)
el.mks2_ = len(query)
else:
found = None
el.mks0 = len(query)
el.mks1 = len(query)
el.mks2 = len(query)
el.mks0_ = len(query)
el.mks1_ = len(query)
el.mks2_ = len(query)
yield el, found
[docs] def test_metric(self, qset: Iterator[Tuple[str, float]]) -> Dict[str, float]:
"""
Evaluates the completion set on a set of queries,
the function returns a dictionary with the aggregated metrics and
some statistics about them.
:param qset: list of tuple(str, float) = (query, weight)
:return: list of :class:`CompletionElement <mlstatpy.nlp.completion_simple.CompletionElement>`
The method :meth:`compute_metric` needs to be called first.
It then calls :meth:`enumerate_metric`.
:githublink:`%|py|548`
"""
res = dict(mks0=0.0, mks1=0.0, mks2=0.0,
sum_weights=0.0, sum_wlen=0.0, n=0)
hist = {k: {} for k in {"mks0", "mks1", "mks2", "l"}}
wei = {k: {} for k in hist}
res["hist"] = hist
res["histnow"] = wei
for el, _ in self.enumerate_test_metric(qset):
le = len(el.value)
w = el.weight
res["mks0"] += w * el.mks0
res["mks1"] += w * el.mks1
res["mks2"] += w * el.mks2
res["sum_weights"] += w
res["sum_wlen"] += w * le
res["n"] += 1
if el.mks0 not in hist["mks0"]:
hist["mks0"][el.mks0] = w
wei["mks0"][el.mks0] = 1
else:
hist["mks0"][el.mks0] += w
wei["mks0"][el.mks0] += 1
if el.mks1 not in hist["mks1"]:
hist["mks1"][el.mks1] = w
wei["mks1"][el.mks1] = 1
else:
hist["mks1"][el.mks1] += w
wei["mks1"][el.mks1] += 1
if el.mks2 not in hist["mks2"]:
hist["mks2"][el.mks2] = w
wei["mks2"][el.mks2] = 1
else:
hist["mks2"][el.mks2] += w
wei["mks2"][el.mks2] += 1
if le not in hist["l"]:
hist["l"][le] = w
wei["l"][le] = 1
else:
hist["l"][le] += w
wei["l"][le] += 1
return res