Coverage for src/mlstatpy/nlp/completion_simple.py: 92%
332 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-27 05:59 +0100
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-27 05:59 +0100
1"""
2@file
3@brief About completion, simple algorithm
4"""
5import time
6from typing import Tuple, List, Iterator, Dict
7from pyquickhelper.loghelper import noLOG
8from .completion import CompletionTrieNode
class CompletionElement:
    """
    Definition of an element in a completion system,
    it contains the following members:

    * *value*: the completion
    * *weight*: a weight or a position, we assume a completion with
      a lower weight is shown at a lower position
    * *disp*: display string (no impact on the algorithm)

    * *mks0*: value of minimum keystroke
    * *mks0_*: length of the prefix to obtain *mks0*

    * *mks1*: value of dynamic minimum keystroke
    * *mks1_*: length of the prefix to obtain *mks1*

    * *mks2*: value of modified dynamic minimum keystroke
    * *mks2_*: length of the prefix to obtain *mks2*

    * *prefix*: latest @see cl CompletionElement used as an intermediate
      step to reach this one (set by @see me init_metrics and
      @see me update_metrics)
    * *_info*: debugging information, only filled when displayed
      completions are given to @see me init_metrics

    The metrics *mks\\** only exist once @see me init_metrics was called.
    """

    __slots__ = ("value", "weight", "disp", 'mks0', 'mks0_',
                 'mks1', 'mks1_', 'mks2', 'mks2_', 'prefix', '_info')

    class _Info:
        """
        Container for debugging information: the displayed completions
        per prefix (*_completions*) and the list of logged improvements
        (*_log_imp*).
        """

        def __init__(self):
            self._completions = {}
            self._log_imp = []

        def __str__(self):
            return f"{self._completions}-{self._log_imp}"

    def __init__(self, value: str, weight=1.0, disp=None):
        """
        constructor

        @param      value       value (a character)
        @param      weight      ordering (the lower, the first)
        @param      disp        original string, use this to identify the node
        @raise      TypeError   if *value* is not a string
        """
        if not isinstance(value, str):
            raise TypeError(
                f"value must be str not '{value}' - type={type(value)}")
        self.value = value
        self.weight = weight
        self.disp = disp
        self.prefix = None
        self._info = None

    @staticmethod
    def empty_prefix():
        """
        return an instance filled with an empty prefix,
        the instance is created once and cached on the class
        as *static_empty_prefix*, all its metrics are null
        """
        res = getattr(CompletionElement, "static_empty_prefix", None)
        if res is None:
            res = CompletionElement('', None)
            res.mks0 = res.mks1 = res.mks2 = 0
            res.mks0_ = res.mks1_ = res.mks2_ = 0
            CompletionElement.static_empty_prefix = res
        return res

    def __repr__(self):
        """
        usual, the name becomes *CompletionElementInfo* when
        debugging information is attached, the weight is skipped
        when it is 1, *disp* is skipped when it is empty
        """
        name = "CompletionElementInfo" if self._info else "CompletionElement"
        weight = f", {self.weight}" if self.weight != 1 else ""
        # the display string itself must be inserted, not a raw placeholder
        disp = f", disp='{self.disp}'" if self.disp else ""
        return f"{name}('{self.value}'{weight}{disp})"

    def str_mks0(self) -> str:
        """
        return a string with metric information (*mks0* only),
        ``"-"`` if the metric was not computed
        """
        if hasattr(self, "mks0"):
            return f"MKS={self.mks0} *={self.mks0_}"
        return "-"

    def str_mks(self) -> str:
        """
        return a string with metric information (all metrics
        which were computed)
        """
        s0 = self.str_mks0()
        if hasattr(self, "mks1"):
            return s0 + " |'={0} *={1} |\"={2} *={3}".format(
                self.mks1, self.mks1_, self.mks2, self.mks2_)
        return s0

    def str_all_completions(self, maxn=10, use_precompute=True) -> str:
        """
        builds a string with all completions for all
        prefixes along the paths, this is only available
        if parameter *completions* was used when calling
        method @see me update_metrics.

        @param      maxn            maximum number of completions to show
                                    (not used by this implementation)
        @param      use_precompute  use intermediate results built by
                                    @see me precompute_stat
                                    (not used by this implementation)
        @return                     str
        """
        rows = [f"{self.weight} -- {self.value} -- {self.str_mks()}"]
        if self._info is None:
            rows.append("NO INFO")
            return "\n".join(rows)

        rows.append("------------------")
        rows.extend(str(el) for el in self._info._log_imp)
        for i in range(len(self.value)):
            prefix = self.value[:i]
            rows.append("------------------")
            rows.append(f"i={i} - {prefix}")
            shown = self._info._completions.get(prefix, [])
            for i2, el in enumerate(shown):
                # the arrow points to the element itself
                ar = " " if el.value != self.value else "-> "
                rows.append("{5}{0}:{1} -- {2}{4}-- {3}".format(
                    i2, el.weight, el.value, el.str_mks(),
                    " " * (20 - len(el.value)), ar))
        return "\n".join(rows)

    def init_metrics(self, position: int,
                     completions: Dict[str, List['CompletionElement']] = None):
        """
        initiate the metrics

        @param      position    position in the completion system when prefix is null,
                                *position starting from 0*
        @param      completions displayed completions (dictionary
                                ``{prefix: list of elements}``), if not None,
                                the method stores the first ones for the empty
                                prefix in member *_info* and logs improvements
        @return                 boolean which indicates there was an update,
                                True if the position is lower than
                                the length of the completion
        """
        log_imp = completions is not None
        if log_imp:
            self._info = CompletionElement._Info()
            shown = completions['']
            # keeps between 10 and 15 completions, never more than available
            cut = min(15, max(10, len(self.value)), len(shown))
            self._info._completions[''] = shown[:cut]

        self.prefix = CompletionElement.empty_prefix()
        pos = position + 1
        size = len(self.value)
        if size <= pos:
            # typing the whole completion is not more expensive
            self.mks0 = self.mks1 = self.mks2 = size
            self.mks0_ = self.mks1_ = self.mks2_ = size
            return False

        self.mks0 = self.mks1 = self.mks2 = pos
        self.mks0_ = self.mks1_ = self.mks2_ = 0
        if log_imp:
            self._info._log_imp.append(
                (0, "mks0", pos, '', f"k={0}",
                 f"p={pos}", f"it={0}"))
        return True

    def update_metrics(self, prefix: str, position: int, improved: dict, delta: float,
                       completions: Dict[str, List['CompletionElement']] = None,
                       iteration=-1):
        """
        update the metrics, @see me init_metrics must have been called before
        (with *completions* if *completions* is used here)

        @param      prefix          prefix
        @param      position        position in the completion system when prefix has length k,
                                    *position starting from 0*
        @param      improved        if one metrics is < to the completion length, it means
                                    it can be used to improve others queries,
                                    dictionary ``{value: element}``
        @param      delta           delta in the dynamic modified mks
        @param      completions     displayed completions (dictionary
                                    ``{prefix: list of elements}``), if not None,
                                    the method stores the first ones for
                                    this prefix in member *_info*
        @param      iteration       for debugging purpose, indicates when this improvement was detected
        @return                     boolean which indicates there was an update
        """
        if self.prefix is not None and len(prefix) < len(self.prefix.value):
            # no need to look into it
            return False

        log_imp = completions is not None
        if log_imp and prefix not in self._info._completions:
            shown = completions[prefix]
            # keeps between 10 and 15 completions, never more than available
            cut = min(15, max(10, len(self.value)), len(shown))
            self._info._completions[prefix] = shown[:cut]

        k = len(prefix)
        pos = position + 1

        def log(code, metric, value):
            # self.prefix is read when the improvement is logged,
            # it may have been replaced just before
            if log_imp:
                self._info._log_imp.append(
                    (code, metric, value, prefix, f"k={k}", f"p={position}",
                     f"it={iteration}", f"last={self.prefix.value}"))

        # static metric: k characters typed + position in the list
        mks = k + pos
        check = False
        if mks < self.mks0:
            self.mks0 = mks
            self.mks0_ = k
            check = True
            log(1, "mks0", mks)
        elif mks == self.mks0 and self.mks0_ < k:
            # same cost, keeps the longest prefix
            self.mks0_ = k
        if mks < self.mks1:
            self.mks1 = mks
            self.mks1_ = k
            check = True
        if mks < self.mks2:
            self.mks2 = mks
            self.mks2_ = k
            check = True

        if self.prefix and len(self.prefix.value) < len(prefix):
            # we use the latest prefix available
            v = self.prefix
            dd = len(prefix) - len(v.value) + pos
            mks = v.mks1 + dd
            if mks < self.mks1:
                self.mks1 = mks
                self.mks1_ = k
                check = True
                log(4, "mks1", mks)
            mks = v.mks2 + dd
            if mks < self.mks2:
                self.mks2 = mks
                self.mks2_ = k
                check = True
                log(5, "mks2", mks)

        if prefix in improved:
            # the prefix is itself a completion reached with fewer keystrokes
            v = improved[prefix]
            self.prefix = v
            left = len(self.value) - len(prefix)
            mks = v.mks1 + min(left, pos)
            if mks < self.mks1:
                self.mks1 = mks
                self.mks1_ = k
                check = True
                log(2, "mks1", mks)
            mks = v.mks2 + min(left, pos + delta)
            if mks < self.mks2:
                self.mks2 = mks
                self.mks2_ = k
                check = True
                log(3, "mks2", mks)

        if log_imp and self.prefix and self.prefix.value != '':
            self._info._log_imp.append(self.prefix)
        return check
class CompletionSystem:
    """
    define a completion system, an ordered list of
    @see cl CompletionElement
    """

    def __init__(self, elements: List[CompletionElement]):
        """
        fill the completion system

        @param      elements    list of @see cl CompletionElement,
                                tuples ``(weight, value)`` or values,
                                the position in the list is used as a weight
                                when the weight is missing (or evaluates to False)
        """
        def create_element(i, e):
            if isinstance(e, CompletionElement):
                return e
            if isinstance(e, tuple):
                # NOTE(review): a weight equal to 0 is also replaced by
                # the position -- confirm this is intended
                return CompletionElement(e[1], e[0] if e[0] else i)
            return CompletionElement(e, i)

        self._elements = [create_element(i, e) for i, e in enumerate(elements)]

    def __getitem__(self, i):
        """
        Returns ``elements[i]``.
        """
        return self._elements[i]

    def find(self, value: str, is_sorted=False) -> CompletionElement:
        """
        Not very efficient, finds an item in the list.

        @param      value       string to find
        @param      is_sorted   the function will assume the elements are sorted by
                                alphabetical order (not implemented)
        @return                 element or None
        @raise      NotImplementedError     if *is_sorted* is True
        """
        if is_sorted:
            raise NotImplementedError(  # pragma: no cover
                "No optimisation for the sorted case.")
        for e in self:
            if e.value == value:
                return e
        return None

    def items(self) -> Iterator[Tuple[str, CompletionElement]]:
        """
        Iterates on ``(e.value, e)``.
        """
        for e in self._elements:
            yield e.value, e

    def tuples(self) -> Iterator[Tuple[float, str]]:
        """
        Iterates on ``(e.weight, e.value)``.
        """
        for e in self._elements:
            yield e.weight, e.value

    def __len__(self) -> int:
        """
        Number of elements.
        """
        return len(self._elements)

    def __iter__(self) -> Iterator[CompletionElement]:
        """
        Iterates over elements.
        """
        yield from self._elements

    def sort_values(self):
        """
        Sorts the elements by value, then by weight.
        Elements sharing the same value and weight keep
        their relative order.
        """
        # a key avoids comparing the elements themselves when
        # value and weight are equal (they do not define any order)
        self._elements = sorted(
            self._elements, key=lambda e: (e.value, e.weight))

    def sort_weight(self):
        """
        Sorts the elements by weight, then by value.
        Elements sharing the same weight and value keep
        their relative order.
        """
        # a key avoids comparing the elements themselves when
        # weight and value are equal (they do not define any order)
        self._elements = sorted(
            self._elements, key=lambda e: (e.weight, e.value))

    def compare_with_trie(self, delta=0.8, fLOG=noLOG):
        """
        Compares the results with the other implementation.

        @param      delta       parameter *delta* in the dynamic modified mks
        @param      fLOG        logging function
        @return                 None or differences, a sorted list of tuples
                                ``(d0, d1, d2, d4, element, trie node, message)``
        """
        def format_diff(el, f, diff):
            s = "VALUE={0}\nSYST=[{1}]\nTRIE=[{2}]\nMORE SYSTEM:\n{3}\n######\nMORE TRIE:\n{4}".format(
                el.value, el.str_mks(), f.stat.str_mks(),
                el.str_all_completions(), f.str_all_completions())
            if diff:
                return f"-------\n{s}\n-------"
            return s

        # the trie is built before the metrics are computed because
        # compute_metrics reorders the elements
        trie = CompletionTrieNode.build(self.tuples())
        self.compute_metrics(delta=delta, fLOG=fLOG, details=True)
        trie.precompute_stat()
        trie.update_stat_dynamic(delta=delta)
        diffs = []
        for el in self:
            f = trie.find(el.value)
            d0 = el.mks0 - f.stat.mks0
            d1 = el.mks1 - f.stat.mks1
            d2 = el.mks2 - f.stat.mks2
            d4 = el.mks0_ - f.stat.mks0_
            if d0 != 0 or d1 != 0 or d2 != 0 or d4 != 0:
                diffs.append((d0, d1, d2, d4, el, f, format_diff(el, f, True)))
        if not diffs:
            return None
        diffs.sort(key=str)
        return diffs

    def to_dict(self) -> Dict[str, CompletionElement]:
        """
        Returns a dictionary ``{value: element}``.
        """
        return {el.value: el for el in self}

    def compute_metrics(self, ffilter=None, delta=0.8,
                        details=False, fLOG=noLOG) -> int:
        """
        Computes the metric for the completion itself.

        @param      ffilter     filter function (not implemented, must be None)
        @param      delta       parameter *delta* in the dynamic modified mks
        @param      details     log more details about displayed completions
        @param      fLOG        logging function
        @return                 number of iterations
        @raise      NotImplementedError     if *ffilter* is not None

        The elements are first sorted by weight. The method then iterates
        until no metric can be improved anymore.
        The function ends by sorting the set of completion by alphabetical order.
        """
        self.sort_weight()
        if ffilter is not None:
            raise NotImplementedError(  # pragma: no cover
                "ffilter not None is not implemented")
        # displayed completions per prefix, only stored for debugging purpose
        store_completions = {'': []} if details else None

        improved = {}
        to = time.perf_counter()
        fLOG("init_metrics:", len(self))
        for i, el in enumerate(self._elements):
            if details:
                store_completions[''].append(el)
            r = el.init_metrics(i, store_completions)
            if r and el.value not in improved:
                improved[el.value] = el
        t = time.perf_counter()
        fLOG(
            f"interation 0: #={len(self)} dt={t - to} - log details={details}")

        updates = 1
        it = 1
        while updates > 0:
            # displayed[prefix] is the position of the current element
            # among the completions sharing this prefix
            displayed = {}
            updates = 0
            for el in self._elements:
                for k in range(0, len(el.value)):
                    prefix = el.value[:k]
                    if prefix not in displayed:
                        displayed[prefix] = 0
                        if details:
                            store_completions[prefix] = [el]
                    else:
                        displayed[prefix] += 1
                        if details:
                            store_completions[prefix].append(el)
                    r = el.update_metrics(
                        prefix, displayed[prefix], improved, delta,
                        completions=store_completions, iteration=it)
                    if r:
                        if el.value not in improved:
                            improved[el.value] = el
                        updates += 1
            t = time.perf_counter()
            fLOG(f"interation {it}: updates={updates} dt={t - to}")
            it += 1

        self.sort_values()
        return it - 1

    def enumerate_test_metric(self, qset: Iterator[Tuple[str, float]]) -> Iterator[Tuple[CompletionElement, CompletionElement]]:
        """
        Evaluates the completion set on a set of queries,
        the function returns a list of @see cl CompletionElement
        with the three metrics :math:`M`, :math:`M'`, :math:`M"`
        for these particular queries.

        @param      qset    list of tuple(str, float) = (query, weight)
        @return             list of tuple of @see cl CompletionElement,
                            the first one is the query, the second one is the None or
                            the matching completion

        The method @see me compute_metrics needs to be called first
        (it leaves the completions sorted by value, the method relies on it).
        """
        qset = sorted(qset)
        current = 0
        for query, weight in qset:
            # looks for the last completion lower or equal to the query
            while current < len(self) and self[current].value <= query:
                current += 1
            ind = current - 1
            el = CompletionElement(query, weight)
            lq = len(query)
            # default case: no completion helps, the whole query is typed
            found = None
            el.mks0 = el.mks1 = el.mks2 = lq
            el.mks0_ = el.mks1_ = el.mks2_ = lq
            if ind >= 0:
                inset = self[ind]
                le = len(inset.value)
                if le <= lq and inset.value == query[:le]:
                    if le == lq:
                        # the query is a completion
                        found = inset
                        el.mks0 = inset.mks0
                        el.mks1 = inset.mks1
                        el.mks2 = inset.mks2
                    else:
                        # a completion is a strict prefix of the query
                        # NOTE(review): mks0 is set to 0 and not to the
                        # length of the query -- confirm this is intended
                        el.mks0 = 0
                        el.mks0_ = 0
                        el.mks1 = inset.mks1 + lq - le
                        el.mks1_ = le
                        el.mks2 = inset.mks2 + lq - le
                        el.mks2_ = le

            yield el, found

    def test_metric(self, qset: Iterator[Tuple[str, float]]) -> Dict[str, float]:
        """
        Evaluates the completion set on a set of queries,
        the function returns a dictionary with the aggregated metrics and
        some statistics about them.

        @param      qset    list of tuple(str, float) = (query, weight)
        @return             dictionary with the weighted sums *mks0*, *mks1*, *mks2*,
                            *sum_weights*, *sum_wlen*, the number of queries *n*,
                            *hist* (sum of weights per metric value) and
                            *histnow* (number of queries per metric value)

        The method @see me compute_metrics needs to be called first.
        It then calls @see me enumerate_test_metric.
        """
        res = dict(mks0=0.0, mks1=0.0, mks2=0.0,
                   sum_weights=0.0, sum_wlen=0.0, n=0)
        names = ("mks0", "mks1", "mks2", "l")
        hist = {k: {} for k in names}
        wei = {k: {} for k in names}
        res["hist"] = hist
        res["histnow"] = wei

        for el, _ in self.enumerate_test_metric(qset):
            le = len(el.value)
            w = el.weight
            res["mks0"] += w * el.mks0
            res["mks1"] += w * el.mks1
            res["mks2"] += w * el.mks2
            res["sum_weights"] += w
            res["sum_wlen"] += w * le
            res["n"] += 1

            # one histogram per metric and one for the query length
            for name, key in zip(names, (el.mks0, el.mks1, el.mks2, le)):
                if key in hist[name]:
                    hist[name][key] += w
                    wei[name][key] += 1
                else:
                    hist[name][key] = w
                    wei[name][key] = 1
        return res