# -*- encoding: utf-8 -*-
# pylint: disable=E0203,E1101,C0111
"""
Runtime operator.
:githublink:`%|py|7`
"""
import numpy
from ._op import OpRun
from ..shape_object import ShapeObject
class RNN(OpRun):
    """
    Runtime implementation of the ONNX ``RNN`` operator
    (a one-layer simple recurrent network).

    Only the unidirectional case (``direction`` is ``'forward'`` or
    ``'reverse'``, *num_directions* == 1) is implemented; the
    bidirectional case raises :class:`NotImplementedError`.
    """

    # Default ONNX attribute values, one entry per direction.
    atts = {
        'activation_alpha': [0.],
        'activation_beta': [0.],
        'activations': ['tanh', 'tanh'],
        'clip': [],
        'direction': 'forward',
        'hidden_size': None,
    }

    def __init__(self, onnx_node, desc=None, **options):
        OpRun.__init__(self, onnx_node, desc=desc,
                       expected_attributes=RNN.atts,
                       **options)
        if self.direction in ("forward", "reverse"):
            self.num_directions = 1
        elif self.direction == "bidirectional":
            self.num_directions = 2
        else:
            raise RuntimeError(  # pragma: no cover
                "Unknown direction '{}'.".format(self.direction))
        if len(self.activation_alpha) != self.num_directions:
            raise RuntimeError(  # pragma: no cover
                "activation_alpha must have the same size as num_directions={}".format(
                    self.num_directions))
        if len(self.activation_beta) != self.num_directions:
            raise RuntimeError(  # pragma: no cover
                "activation_beta must have the same size as num_directions={}".format(
                    self.num_directions))
        # f1 is the activation used by the forward recurrence; f2 is
        # resolved too when a second activation is given (parity with the
        # attribute layout of other recurrent operators) but _step only
        # uses f1.
        self.f1 = self.choose_act(self.activations[0],
                                  self.activation_alpha[0],
                                  self.activation_beta[0])
        if len(self.activations) > 1:
            self.f2 = self.choose_act(self.activations[1],
                                      self.activation_alpha[1],
                                      self.activation_beta[1])
        self.nb_outputs = len(onnx_node.output)

    def choose_act(self, name, alpha, beta):
        """
        Returns the activation function matching *name*.

        :param name: activation name, *bytes* (as deserialized from an
            ONNX string attribute) or *str*; matching is case-insensitive
            so both ``b"Tanh"`` and the lowercase defaults in
            :attr:`RNN.atts` (``'tanh'``) resolve
        :param alpha: scale used by the ``Affine`` activation
        :param beta: offset used by the ``Affine`` activation
        :return: a callable taking a :class:`numpy.ndarray`
        :raises RuntimeError: if the activation is not supported
        """
        if isinstance(name, bytes):
            name = name.decode('ascii', errors='ignore')
        key = name.lower()
        if key == "tanh":
            return self._f_tanh
        if key == "affine":
            return lambda x: x * alpha + beta
        raise RuntimeError(  # pragma: no cover
            "Unknown activation function '{}'.".format(name))

    def _f_tanh(self, x):
        "Hyperbolic tangent activation."
        return numpy.tanh(x)

    def _step(self, X, R, B, W, H_0):
        """
        Unrolls the recurrence over the sequence axis (axis 0 of *X*).

        :param X: input sequence, shape *(seq_length, batch, input_size)*
        :param R: recurrence weights, shape *(hidden_size, hidden_size)*
        :param B: concatenated biases *[Wb, Rb]*, length *2 \\* hidden_size*
        :param W: input weights, shape *(hidden_size, input_size)*
        :param H_0: initial hidden state, shape *(batch, hidden_size)*
        :return: *(Y, Y_h)* — the stacked hidden states with a
            *num_directions* axis inserted, and the last hidden state

        Only called with ``num_directions == 1`` (enforced by
        :meth:`_run`).
        """
        h_list = []
        H_t = H_0
        # Wb + Rb is loop-invariant: compute it once.
        bias = numpy.add(*numpy.split(B, 2))
        for x in numpy.split(X, X.shape[0], axis=0):
            # H_{t+1} = f(x . W^T + H_t . R^T + Wb + Rb)
            H = self.f1(numpy.dot(x, numpy.transpose(W)) +
                        numpy.dot(H_t, numpy.transpose(R)) +
                        bias)
            h_list.append(H)
            H_t = H
        concatenated = numpy.concatenate(h_list)
        if self.num_directions == 1:
            output = numpy.expand_dims(concatenated, 1)
        return output, h_list[-1]

    def _run(self, X, W, R, B=None, sequence_lens=None, initial_h=None):  # pylint: disable=W0221
        """
        Computes the RNN outputs.

        Missing *B* defaults to zero biases; missing *initial_h* defaults
        to a zero hidden state. Returns ``(Y,)`` or ``(Y, Y_h)`` depending
        on the number of declared node outputs.
        """
        # num_directions is re-read from the actual weight tensor, which
        # overrides the value derived from the 'direction' attribute.
        self.num_directions = W.shape[0]
        if self.num_directions == 1:
            # Drop the leading num_directions axis from every input.
            R = numpy.squeeze(R, axis=0)
            W = numpy.squeeze(W, axis=0)
            if B is not None:
                B = numpy.squeeze(B, axis=0)
            if sequence_lens is not None:
                sequence_lens = numpy.squeeze(sequence_lens, axis=0)
            if initial_h is not None:
                initial_h = numpy.squeeze(initial_h, axis=0)

            hidden_size = R.shape[-1]
            batch_size = X.shape[1]

            b = (B if B is not None else
                 numpy.zeros(2 * hidden_size, dtype=numpy.float32))
            h_0 = (initial_h if initial_h is not None else
                   numpy.zeros((batch_size, hidden_size), dtype=numpy.float32))

            B = b
            H_0 = h_0
        else:
            raise NotImplementedError()  # pragma: no cover

        Y, Y_h = self._step(X, R, B, W, H_0)
        return (Y, ) if self.nb_outputs == 1 else (Y, Y_h)

    def _infer_shapes(self, X, W, R, B=None, sequence_lens=None, initial_h=None):  # pylint: disable=W0221
        """
        Infers output shapes: *Y* is *(seq_length, num_directions, batch,
        hidden_size)* and *Y_h* is *(num_directions, batch, hidden_size)*.
        """
        num_directions = W.shape[0]
        if num_directions == 1:
            hidden_size = R[-1]
            batch_size = X[1]
            y_shape = ShapeObject((X[0], num_directions, batch_size, hidden_size),
                                  dtype=X.dtype)
        else:
            raise NotImplementedError()  # pragma: no cover
        if self.nb_outputs == 1:
            return (y_shape, )
        y_h_shape = ShapeObject((num_directions, batch_size, hidden_size),
                                dtype=X.dtype)
        return (y_shape, y_h_shape)