Source code for mlprodict.onnxrt.ops_cpu.op_tokenizer

# -*- encoding: utf-8 -*-
# pylint: disable=E0203,E1101,C0111
"""
Runtime operator.


:githublink:`%|py|7`
"""
import re
import numpy
from ._op import OpRunUnary, RuntimeTypeError
from ._new_ops import OperatorSchema
from ..shape_object import ShapeObject


class Tokenizer(OpRunUnary):
    """
    See :epkg:`Tokenizer`.

    :githublink:`%|py|17`
    """

    atts = {'mark': 0, 'mincharnum': 1, 'pad_value': b'#',
            'separators': [], 'tokenexp': b'[a-zA-Z0-9_]+',
            'tokenexpsplit': 0, 'stopwords': []}
    def __init__(self, onnx_node, desc=None, **options):
        OpRunUnary.__init__(self, onnx_node, desc=desc,
                            expected_attributes=Tokenizer.atts,
                            **options)
        self.char_tokenization_ = (
            self.tokenexp == b'.' or list(self.separators) == [b''])
        self.stops_ = set(_.decode() for _ in self.stopwords)
        try:
            self.str_separators_ = set(
                _.decode('utf-8') for _ in self.separators)
        except AttributeError as e:  # pragma: no cover
            raise RuntimeTypeError(
                "Unable to interpret separators {}.".format(
                    self.separators)) from e
        if self.tokenexp not in (None, b''):
            self.tokenexp_ = re.compile(self.tokenexp.decode('utf-8'))
    def _find_custom_operator_schema(self, op_name):
        if op_name == "Tokenizer":
            return TokenizerSchema()
        raise RuntimeError(  # pragma: no cover
            "Unable to find a schema for operator '{}'.".format(op_name))
    def _run(self, text):  # pylint: disable=W0221
        if self.char_tokenization_:
            return self._run_char_tokenization(text, self.stops_)
        if self.str_separators_ is not None and len(self.str_separators_) > 0:
            return self._run_sep_tokenization(
                text, self.stops_, self.str_separators_)
        # tokenexp is stored as bytes, so the comparison must use b''
        # (consistent with the check done in __init__).
        if self.tokenexp not in (None, b''):
            return self._run_regex_tokenization(
                text, self.stops_, self.tokenexp_)
        raise RuntimeError(  # pragma: no cover
            "Unable to guess which tokenization to use, sep={}, "
            "tokenexp='{}'.".format(self.separators, self.tokenexp))
    def _run_tokenization(self, text, stops, split):
        """
        Tokenizes *text* with the given *split* function.

        :githublink:`%|py|65`
        """
        max_len = max(map(len, text.flatten()))
        if self.mark:
            max_len += 2
            begin = 1
        else:
            begin = 0
        shape = text.shape + (max_len, )
        max_pos = 0
        res = numpy.empty(shape, dtype=text.dtype)
        if len(text.shape) == 1:
            res[:] = self.pad_value
            for i in range(text.shape[0]):
                pos = begin
                for c in split(text[i]):
                    if c not in stops:
                        res[i, pos] = c
                        pos += 1
                if self.mark:
                    res[i, 0] = self.pad_value
                    max_pos = max(pos + 1, max_pos)
                else:
                    max_pos = max(pos, max_pos)
            res = res[:, :max_pos]
        elif len(text.shape) == 2:
            res[:, :] = self.pad_value
            for i in range(text.shape[0]):
                for ii in range(text.shape[1]):
                    pos = begin
                    for c in split(text[i, ii]):
                        if c not in stops:
                            res[i, ii, pos] = c
                            pos += 1
                    if self.mark:
                        res[i, ii, 0] = self.pad_value
                        max_pos = max(pos + 1, max_pos)
                    else:
                        max_pos = max(pos, max_pos)
            res = res[:, :, :max_pos]
        else:
            raise RuntimeError(  # pragma: no cover
                "Only vectors or matrices are supported, not shape {}.".format(
                    text.shape))
        return (res, )
    def _run_char_tokenization(self, text, stops):
        """
        Tokenizes by characters.

        :githublink:`%|py|112`
        """
        def split(t):
            for c in t:
                yield c
        return self._run_tokenization(text, stops, split)
    def _run_sep_tokenization(self, text, stops, separators):
        """
        Tokenizes using separators.
        The function should use a trie to find text.

        :githublink:`%|py|122`
        """
        def split(t):
            begin = 0
            pos = 0
            while pos < len(t):
                for sep in separators:
                    if (pos + len(sep) <= len(t) and
                            sep == t[pos: pos + len(sep)]):
                        word = t[begin: pos]
                        yield word
                        begin = pos + len(sep)
                        break
                pos += 1
            if begin < pos:
                word = t[begin: pos]
                yield word
        return self._run_tokenization(text, stops, split)
    def _run_regex_tokenization(self, text, stops, exp):
        """
        Tokenizes using a regular expression.

        :githublink:`%|py|145`
        """
        if self.tokenexpsplit:
            def split(t):
                return filter(lambda x: x, exp.split(t))
        else:
            def split(t):
                return filter(lambda x: x, exp.findall(t))
        return self._run_tokenization(text, stops, split)
    def _infer_shapes(self, x):  # pylint: disable=E0202,W0221
        """
        Infers the output shape: the input shape extended
        with one tokenization axis.

        :githublink:`%|py|157`
        """
        if x.shape is None:
            return (x, )
        if len(x) == 1:
            return (ShapeObject((x[0], None), dtype=x.dtype,
                                name=self.__class__.__name__), )
        if len(x) == 2:
            return (ShapeObject((x[0], x[1], None), dtype=x.dtype,
                                name=self.__class__.__name__), )
        raise RuntimeTypeError(  # pragma: no cover
            "Only one or two dimensions are allowed, got {}.".format(x))
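
# Illustrative sketch (not part of the original class): the three split
# strategies dispatched by Tokenizer._run can be reproduced with plain Python.
# The helper below is hypothetical and only documents the intended behaviour
# of the char, separator and regex modes on a single string.
def _example_split_modes(text="ab;cd ef"):
    # char mode: triggered when tokenexp == b'.' or separators == [b'']
    chars = list(text)                                  # ['a', 'b', ';', ...]
    # separator mode: cut on every separator, keep non-empty pieces
    words = [w for w in re.split("[; ]", text) if w]    # ['ab', 'cd', 'ef']
    # regex mode with tokenexpsplit=0: keep what the expression matches
    tokens = re.findall("[a-zA-Z0-9_]+", text)          # ['ab', 'cd', 'ef']
    # regex mode with tokenexpsplit=1: the expression acts as a separator
    split_on = [t for t in re.compile("[; ]").split(text) if t]
    return chars, words, tokens, split_on
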
class TokenizerSchema(OperatorSchema):
    """
    Defines a schema for operators added in this package
    such as :class:`TreeEnsembleClassifierDouble
    <mlprodict.onnxrt.ops_cpu.op_tree_ensemble_classifier.TreeEnsembleClassifierDouble>`.

    :githublink:`%|py|174`
    """
    def __init__(self):
        OperatorSchema.__init__(self, 'Tokenizer')
        self.attributes = Tokenizer.atts
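
# Usage sketch (an assumption, not part of the original module): one way to
# exercise this runtime class is to build a small ONNX graph holding a
# Tokenizer node and load it with mlprodict's OnnxInference. The attribute
# values simply mirror Tokenizer.atts; the 'com.microsoft' domain and the
# opset numbers are assumptions and may need adjusting.
if __name__ == '__main__':
    from onnx import TensorProto
    from onnx.helper import (
        make_node, make_graph, make_model, make_tensor_value_info,
        make_opsetid)
    from mlprodict.onnxrt import OnnxInference

    node = make_node(
        'Tokenizer', ['text'], ['tokens'], domain='com.microsoft',
        mark=0, mincharnum=1, pad_value='#',
        separators=[' '], tokenexp='', tokenexpsplit=0)
    graph = make_graph(
        [node], 'tokenizer_demo',
        [make_tensor_value_info('text', TensorProto.STRING, [None])],
        [make_tensor_value_info('tokens', TensorProto.STRING, [None, None])])
    model = make_model(graph, opset_imports=[
        make_opsetid('', 14), make_opsetid('com.microsoft', 1)])

    # splits every input string on spaces and pads shorter rows with '#'
    oinf = OnnxInference(model)
    print(oinf.run({'text': numpy.array(['ab cd', 'ef'])}))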