Train a linear regression with onnxruntime-training on GPU in details#

This example follows the same steps introduced in example Train a linear regression with onnxruntime-training in details but on GPU. This example works on CPU and GPU but automatically chooses GPU if it is available. The main change in this example is the parameter device which indicates where the computation takes place, on CPU or GPU.

A simple linear regression with scikit-learn#

This code begins like example Train a linear regression with onnxruntime-training in details. It creates a graph to train a linear regression initialized with random coefficients.

from pprint import pprint
import numpy
from pandas import DataFrame
from onnx import helper, numpy_helper, TensorProto
from onnxruntime import (
    __version__ as ort_version, get_device,
    TrainingParameters, SessionOptions, TrainingSession)
from onnxruntime.capi._pybind_state import (  # pylint: disable=E0611
    OrtValue as C_OrtValue)
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from onnxcustom.plotting.plotting_onnx import plot_onnxs
from onnxcustom.utils.onnxruntime_helper import get_ort_device
from tqdm import tqdm

X, y = make_regression(n_features=2, bias=2)
X = X.astype(numpy.float32)
y = y.astype(numpy.float32)
X_train, X_test, y_train, y_test = train_test_split(X, y)


def onnx_linear_regression_training(coefs, intercept):
    if len(coefs.shape) == 1:
        coefs = coefs.reshape((1, -1))
    coefs = coefs.T

    # input
    X = helper.make_tensor_value_info(
        'X', TensorProto.FLOAT, [None, coefs.shape[0]])

    # expected input
    label = helper.make_tensor_value_info(
        'label', TensorProto.FLOAT, [None, coefs.shape[1]])

    # output
    Y = helper.make_tensor_value_info(
        'Y', TensorProto.FLOAT, [None, coefs.shape[1]])

    # loss
    loss = helper.make_tensor_value_info('loss', TensorProto.FLOAT, [])

    # inference
    node_matmul = helper.make_node('MatMul', ['X', 'coefs'], ['y1'], name='N1')
    node_add = helper.make_node('Add', ['y1', 'intercept'], ['Y'], name='N2')

    # loss
    node_diff = helper.make_node('Sub', ['Y', 'label'], ['diff'], name='L1')
    node_square = helper.make_node(
        'Mul', ['diff', 'diff'], ['diff2'], name='L2')
    node_square_sum = helper.make_node(
        'ReduceSum', ['diff2'], ['loss'], name='L3')

    # initializer
    init_coefs = numpy_helper.from_array(coefs, name="coefs")
    init_intercept = numpy_helper.from_array(intercept, name="intercept")

    # graph
    graph_def = helper.make_graph(
        [node_matmul, node_add, node_diff, node_square, node_square_sum],
        'lrt', [X, label], [loss, Y], [init_coefs, init_intercept])
    model_def = helper.make_model(
        graph_def, producer_name='orttrainer', ir_version=7,
        producer_version=ort_version,
        opset_imports=[helper.make_operatorsetid('', 14)])
    return model_def


onx_train = onnx_linear_regression_training(
    numpy.random.randn(2).astype(numpy.float32),
    numpy.random.randn(1).astype(numpy.float32))

plot_onnxs(onx_train, title="Graph with Loss")
Graph with Loss
<AxesSubplot: title={'center': 'Graph with Loss'}>

First iterations of training on GPU#

Prediction needs an instance of class InferenceSession, the training needs an instance of class TrainingSession. Next function creates this one.

device = "cuda" if get_device().upper() == 'GPU' else 'cpu'

print(f"device={device!r} get_device()={get_device()!r}")
device='cpu' get_device()='CPU'

Function creating the training session.

def create_training_session(
        training_onnx, weights_to_train, loss_output_name='loss',
        training_optimizer_name='SGDOptimizer', device='cpu'):
    """
    Creates an instance of class `TrainingSession`.

    :param training_onnx: ONNX graph used to train
    :param weights_to_train: names of initializers to be optimized
    :param loss_output_name: name of the loss output
    :param training_optimizer_name: optimizer name
    :param device: `'cpu'` or `'cuda'`
    :return: instance of `TrainingSession`
    """
    ort_parameters = TrainingParameters()
    ort_parameters.loss_output_name = loss_output_name

    output_types = {}
    for output in training_onnx.graph.output:
        output_types[output.name] = output.type.tensor_type

    ort_parameters.weights_to_train = set(weights_to_train)
    ort_parameters.training_optimizer_name = training_optimizer_name

    ort_parameters.optimizer_attributes_map = {
        name: {} for name in weights_to_train}
    ort_parameters.optimizer_int_attributes_map = {
        name: {} for name in weights_to_train}

    session_options = SessionOptions()
    session_options.use_deterministic_compute = True

    if hasattr(device, 'device_type'):
        if device.device_type() == device.cpu():
            provider = ['CPUExecutionProvider']
        elif device.device_type() == device.cuda():
            provider = ['CUDAExecutionProvider']
        else:
            raise ValueError(f"Unexpected device {device!r}.")
    else:
        if device == 'cpu':
            provider = ['CPUExecutionProvider']
        elif device.startswith("cuda"):
            provider = ['CUDAExecutionProvider']
        else:
            raise ValueError(f"Unexpected device {device!r}.")

    session = TrainingSession(
        training_onnx.SerializeToString(), ort_parameters, session_options,
        providers=provider)
    return session


train_session = create_training_session(
    onx_train, ['coefs', 'intercept'], device=device)
print(train_session)
<onnxruntime.capi.training.training_session.TrainingSession object at 0x7faf89108700>

The coefficients.

state_tensors = train_session.get_state()
pprint(state_tensors)
{'coefs': array([[-0.15773152],
       [ 0.9348314 ]], dtype=float32),
 'intercept': array([-0.64845073], dtype=float32)}

We can now check the coefficients are updated after one iteration.

dev = get_ort_device(device)
ortx = C_OrtValue.ortvalue_from_numpy(X_train[:1], dev)
orty = C_OrtValue.ortvalue_from_numpy(y_train[:1].reshape((-1, 1)), dev)
ortlr = C_OrtValue.ortvalue_from_numpy(
    numpy.array([0.01], dtype=numpy.float32), dev)

bind = train_session.io_binding()._iobinding
bind.bind_ortvalue_input('X', ortx)
bind.bind_ortvalue_input('label', orty)
bind.bind_ortvalue_input('Learning_Rate', ortlr)
bind.bind_output('loss', dev)
train_session._sess.run_with_iobinding(bind, None)
outputs = bind.copy_outputs_to_cpu()
pprint(outputs)
[array([[8007.567]], dtype=float32)]

We check the coefficients have changed.

state_tensors = train_session.get_state()
pprint(state_tensors)
{'coefs': array([[1.3895737],
       [2.207656 ]], dtype=float32),
 'intercept': array([1.1412494], dtype=float32)}

Training on GPU#

We still need to implement a gradient descent. Let’s wrap this into a class similar following scikit-learn’s API. It needs to have an extra parameter device.

class DataLoaderDevice:
    """
    Draws consecutive random observations from a dataset
    by batch. It iterates over the datasets by drawing
    *batch_size* consecutive observations.

    :param X: features
    :param y: labels
    :param batch_size: batch size (consecutive observations)
    :param device: `'cpu'`, `'cuda'`, `'cuda:0'`, ...
    """

    def __init__(self, X, y, batch_size=20, device='cpu'):
        if len(y.shape) == 1:
            y = y.reshape((-1, 1))
        if X.shape[0] != y.shape[0]:
            raise ValueError(
                f"Shape mismatch X.shape={X.shape!r}, y.shape={y.shape!r}.")
        self.X = numpy.ascontiguousarray(X)
        self.y = numpy.ascontiguousarray(y)
        self.batch_size = batch_size
        self.device = get_ort_device(device)

    def __len__(self):
        "Returns the number of observations."
        return self.X.shape[0]

    def __iter__(self):
        """
        Iterates over the datasets by drawing
        *batch_size* consecutive observations.
        """
        N = 0
        b = len(self) - self.batch_size
        while N < len(self):
            i = numpy.random.randint(0, b)
            N += self.batch_size
            yield (
                C_OrtValue.ortvalue_from_numpy(
                    self.X[i:i + self.batch_size],
                    self.device),
                C_OrtValue.ortvalue_from_numpy(
                    self.y[i:i + self.batch_size],
                    self.device))

    @property
    def data(self):
        "Returns a tuple of the datasets."
        return self.X, self.y


data_loader = DataLoaderDevice(X_train, y_train, batch_size=2)


for i, batch in enumerate(data_loader):
    if i >= 2:
        break
    print(f"batch {i!r}: {batch!r}")
batch 0: (<onnxruntime.capi.onnxruntime_pybind11_state.OrtValue object at 0x7faf8c6e2a70>, <onnxruntime.capi.onnxruntime_pybind11_state.OrtValue object at 0x7faf8c6e27f0>)
batch 1: (<onnxruntime.capi.onnxruntime_pybind11_state.OrtValue object at 0x7faf8c6e28f0>, <onnxruntime.capi.onnxruntime_pybind11_state.OrtValue object at 0x7faf8c6e2370>)

The training algorithm.

class CustomTraining:
    """
    Implements a simple :epkg:`Stochastic Gradient Descent`.

    :param model_onnx: ONNX graph to train
    :param weights_to_train: list of initializers to train
    :param loss_output_name: name of output loss
    :param max_iter: number of training iterations
    :param training_optimizer_name: optimizing algorithm
    :param batch_size: batch size (see class *DataLoader*)
    :param eta0: initial learning rate for the `'constant'`, `'invscaling'`
        or `'adaptive'` schedules.
    :param alpha: constant that multiplies the regularization term,
        the higher the value, the stronger the regularization.
        Also used to compute the learning rate when set to *learning_rate*
        is set to `'optimal'`.
    :param power_t: exponent for inverse scaling learning rate
    :param learning_rate: learning rate schedule:
        * `'constant'`: `eta = eta0`
        * `'optimal'`: `eta = 1.0 / (alpha * (t + t0))` where *t0* is chosen
            by a heuristic proposed by Leon Bottou.
        * `'invscaling'`: `eta = eta0 / pow(t, power_t)`
    :param device: `'cpu'` or `'cuda'`
    :param verbose: use :epkg:`tqdm` to display the training progress
    """

    def __init__(self, model_onnx, weights_to_train, loss_output_name='loss',
                 max_iter=100, training_optimizer_name='SGDOptimizer',
                 batch_size=10, eta0=0.01, alpha=0.0001, power_t=0.25,
                 learning_rate='invscaling', device='cpu', verbose=0):
        # See https://scikit-learn.org/stable/modules/generated/
        # sklearn.linear_model.SGDRegressor.html
        self.model_onnx = model_onnx
        self.batch_size = batch_size
        self.weights_to_train = weights_to_train
        self.loss_output_name = loss_output_name
        self.training_optimizer_name = training_optimizer_name
        self.verbose = verbose
        self.max_iter = max_iter
        self.eta0 = eta0
        self.alpha = alpha
        self.power_t = power_t
        self.learning_rate = learning_rate.lower()
        self.device = get_ort_device(device)

    def _init_learning_rate(self):
        self.eta0_ = self.eta0
        if self.learning_rate == "optimal":
            typw = numpy.sqrt(1.0 / numpy.sqrt(self.alpha))
            self.eta0_ = typw / max(1.0, (1 + typw) * 2)
            self.optimal_init_ = 1.0 / (self.eta0_ * self.alpha)
        else:
            self.eta0_ = self.eta0
        return self.eta0_

    def _update_learning_rate(self, t, eta):
        if self.learning_rate == "optimal":
            eta = 1.0 / (self.alpha * (self.optimal_init_ + t))
        elif self.learning_rate == "invscaling":
            eta = self.eta0_ / numpy.power(t + 1, self.power_t)
        return eta

    def fit(self, X, y):
        """
        Trains the model.
        :param X: features
        :param y: expected output
        :return: self
        """
        self.train_session_ = create_training_session(
            self.model_onnx, self.weights_to_train,
            loss_output_name=self.loss_output_name,
            training_optimizer_name=self.training_optimizer_name,
            device=self.device)

        data_loader = DataLoaderDevice(
            X, y, batch_size=self.batch_size, device=self.device)
        lr = self._init_learning_rate()
        self.input_names_ = [i.name for i in self.train_session_.get_inputs()]
        self.output_names_ = [
            o.name for o in self.train_session_.get_outputs()]
        self.loss_index_ = self.output_names_.index(self.loss_output_name)

        bind = self.train_session_.io_binding()._iobinding

        loop = (
            tqdm(range(self.max_iter))
            if self.verbose else range(self.max_iter))
        train_losses = []
        for it in loop:
            bind_lr = C_OrtValue.ortvalue_from_numpy(
                numpy.array([lr], dtype=numpy.float32),
                self.device)
            loss = self._iteration(data_loader, bind_lr, bind)
            lr = self._update_learning_rate(it, lr)
            if self.verbose > 1:
                loop.set_description(f"loss={loss:1.3g} lr={lr:1.3g}")
            train_losses.append(loss)
        self.train_losses_ = train_losses
        self.trained_coef_ = self.train_session_.get_state()
        return self

    def _iteration(self, data_loader, learning_rate, bind):
        actual_losses = []
        for batch_idx, (data, target) in enumerate(data_loader):

            bind.bind_ortvalue_input(self.input_names_[0], data)
            bind.bind_ortvalue_input(self.input_names_[1], target)
            bind.bind_ortvalue_input(self.input_names_[2], learning_rate)
            bind.bind_output('loss', self.device)
            self.train_session_._sess.run_with_iobinding(bind, None)
            outputs = bind.copy_outputs_to_cpu()
            actual_losses.append(outputs[self.loss_index_])
        return numpy.array(actual_losses).mean()

Let’s now train the model in a very similar way that it would be done with scikit-learn.

trainer = CustomTraining(onx_train, ['coefs', 'intercept'], verbose=1,
                         max_iter=10, device=device)
trainer.fit(X, y)
print("training losses:", trainer.train_losses_)

df = DataFrame({"iteration": numpy.arange(len(trainer.train_losses_)),
                "loss": trainer.train_losses_})
df.set_index('iteration').plot(title="Training loss", logy=True)
Training loss
  0%|          | 0/10 [00:00<?, ?it/s]
100%|##########| 10/10 [00:00<00:00, 438.68it/s]
training losses: [nan, nan, nan, nan, nan, nan, nan, nan, nan, nan]

<AxesSubplot: title={'center': 'Training loss'}, xlabel='iteration'>

The final coefficients.

print("onnxruntime", trainer.trained_coef_)
onnxruntime {'coefs': array([[nan],
       [nan]], dtype=float32), 'intercept': array([nan], dtype=float32)}

Total running time of the script: ( 0 minutes 1.537 seconds)

Gallery generated by Sphinx-Gallery