Source code for mlprodict.onnxrt.ops_cpu.op_max_pool

# -*- encoding: utf-8 -*-
# pylint: disable=E0203,E1101,C0111
"""
Runtime operator.


:githublink:`%|py|7`
"""
import itertools
import numpy
from ..shape_object import ShapeObjectFct
from ._op import OpRun
from .op_max_pool_ import MaxPoolFloat, MaxPoolDouble  # pylint: disable=E0611,E0401


[docs]def _pool_get_output_shape(auto_pad, input_spatial_shape, kernel_spatial_shape, strides_spatial): out_shape = [0] * len(input_spatial_shape) if auto_pad in (b'SAME_UPPER', b'SAME_LOWER'): for i in range(len(input_spatial_shape)): # pylint: disable=C0200 out_shape[i] = int( numpy.ceil( float(input_spatial_shape[i]) / float(strides_spatial[i]))) elif auto_pad == b'VALID': for i in range(len(input_spatial_shape)): # pylint: disable=C0200 out_shape[i] = int( numpy.ceil(float(input_spatial_shape[i] - (kernel_spatial_shape[i] - 1)) / float(strides_spatial[i]))) return out_shape
[docs]def _pool_impl(padded, x_shape, kernel_shape, strides_shape, out_shape, pad_shape, pooling_type, count_include_pad=0): spatial_size = len(x_shape) - 2 y = numpy.zeros([x_shape[0], x_shape[1]] + list(out_shape)) for shape in itertools.product( range(x_shape[0]), range(x_shape[1]), *[range(int((x_shape[i + 2] + pad_shape[i] - kernel_shape[i]) / strides_shape[i] + 1)) for i in range(spatial_size)]): window = padded[shape[0], shape[1]] window_vals = numpy.array( [window[i] for i in list( itertools.product( *[range(strides_shape[i] * shape[i + 2], strides_shape[i] * shape[i + 2] + kernel_shape[i]) for i in range(spatial_size)]))]) if pooling_type == b'AVG': f = numpy.average elif pooling_type == b'MAX': f = numpy.max else: raise NotImplementedError( # pragma: no cover "Pooling type '{}' does not support. Should be AVG, MAX." "".format(pooling_type)) if count_include_pad == 1 and pooling_type == b'AVG': y[shape] = f(window_vals) else: y[shape] = f(window_vals[numpy.where(~numpy.isnan(window_vals))]) return y.astype(numpy.float32)
[docs]class MaxPool(OpRun): atts = {'auto_pad': b'NOTSET', 'ceil_mode': 0, 'dilations': [], 'kernel_shape': [], 'pads': [], 'storage_order': 0, 'strides': []}
[docs] def __init__(self, onnx_node, desc=None, **options): OpRun.__init__(self, onnx_node, desc=desc, expected_attributes=MaxPool.atts, **options) self.auto_pad_ = self.auto_pad.decode('ascii') self.nb_outputs = len(onnx_node.output) self._init()
[docs] def _init(self): self.rt32_ = MaxPoolFloat() self.rt64_ = MaxPoolDouble() for rt in [self.rt32_, self.rt64_]: rt.init(self.auto_pad, numpy.array(self.dilations, dtype=numpy.int64), self.ceil_mode, self.storage_order, numpy.array(self.kernel_shape, dtype=numpy.int64), numpy.array(self.pads, dtype=numpy.int64), numpy.array(self.strides, dtype=numpy.int64))
[docs] def _run(self, X): # pylint: disable=W0221 if X.dtype == numpy.float32: res = self.rt32_.compute(X) else: res = self.rt64_.compute(X) if self.nb_outputs == 1: return res[:1] return res
[docs] def _infer_shapes(self, X): # pylint: disable=W0221 def compute_shape1(xshape): xs = numpy.ones(xshape, dtype=numpy.float32) res, _ = self.rt32_.compute(xs) return res.shape def compute_shape2(xshape): xs = numpy.ones(xshape, dtype=numpy.float32) _, res2 = self.rt32_.compute(xs) return res2.shape if self.nb_outputs == 1: return (ShapeObjectFct(compute_shape1, X, name="MaxPool", dtype=X.dtype), ) return (ShapeObjectFct(compute_shape1, X, name="MaxPool", dtype=X.dtype), ShapeObjectFct(compute_shape2, X, name="MaxPool", dtype=X.dtype))