When to parallelize?

That is the question. Parallelizing a computation takes some time to set up, so it is not the right solution in every case. The following example studies the parallelism introduced into the runtime of TreeEnsembleRegressor to see when it pays off.
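To make the trade-off concrete, here is a minimal sketch of a cost model. It assumes (this is not taken from mlprodict) that a parallel run pays a fixed setup cost and then splits the work evenly across cores. The function name and the numbers are purely illustrative.

```python
import math


def break_even_batch_size(overhead, per_item, cores):
    """Smallest batch size for which the parallel run is strictly faster.

    Assumed model (hypothetical, for illustration only):
      sequential time = n * per_item
      parallel time   = overhead + n * per_item / cores
    Returns None when parallelizing can never win under this model.
    """
    if cores <= 1 or per_item <= 0:
        return None
    # n * per_item > overhead + n * per_item / cores
    # <=> n > overhead / (per_item * (1 - 1 / cores))
    limit = overhead / (per_item * (1 - 1 / cores))
    return math.floor(limit) + 1


# Made-up times in microseconds: 30 to set up the threads, 5 per observation.
print(break_even_batch_size(overhead=30, per_item=5, cores=4))
```

With these made-up numbers the model says parallelizing pays off from 9 observations. Real measurements, like the ones below, are needed because the setup cost and the per-observation cost both depend on the model and the machine.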

from pprint import pprint
import numpy
from pandas import DataFrame
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.datasets import make_regression
from sklearn.experimental import enable_hist_gradient_boosting
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.model_selection import train_test_split
from cpyquickhelper.numbers.speed_measure import measure_time
from pyquickhelper.pycode.profiling import profile
from mlprodict.onnx_conv import to_onnx, register_rewritten_operators
from mlprodict.onnxrt import OnnxInference
from mlprodict.tools.model_info import analyze_model

Training and converting a model

data = make_regression(100000, 20)
X, y = data
X_train, X_test, y_train, y_test = train_test_split(X, y)

hgb = HistGradientBoostingRegressor(max_iter=100, max_depth=6)
hgb.fit(X_train, y_train)
print(hgb)

Out:

HistGradientBoostingRegressor(max_depth=6)

Let’s get more statistics about the model itself.

pprint(analyze_model(hgb))

Out:

{'_predictors.max|tree_.max_depth': 6,
 '_predictors.size': 100,
 '_predictors.sum|tree_.leave_count': 3100,
 '_predictors.sum|tree_.node_count': 6100,
 'n_features_': 20,
 'train_score_.shape': 101,
 'validation_score_.shape': 101}

And let’s convert it.

register_rewritten_operators()
onx = to_onnx(hgb, X_train[:1].astype(numpy.float32))
oinf = OnnxInference(onx, runtime='python_compiled')
print(oinf)

Out:

OnnxInference(...)
    def compiled_run(dict_inputs):
        # inputs
        X = dict_inputs['X']
        (variable, ) = n0_treeensembleregressor(X)
        return {
            'variable': variable,
        }

The runtime of the forest is in the following object.

print(oinf.sequence_[0].ops_)
print(oinf.sequence_[0].ops_.rt_)

Out:

TreeEnsembleRegressor(
    op_type=TreeEnsembleRegressor
    aggregate_function=b'SUM',
    base_values=[-0.40719593],
    domain=ai.onnx.ml,
    inplaces={},
    ir_version=6,
    n_targets=1,
    nodes_falsenodeids=[28 17 10 ...  0  0  0],
    nodes_featureids=[15 19  9 ...  0  0  0],
    nodes_hitrates=[1. 1. 1. ... 1. 1. 1.],
    nodes_missing_value_tracks_true=[0 1 0 ... 0 0 0],
    nodes_modes=[b'BRANCH_LEQ' b'BRANCH_LEQ' b'BRANCH_LEQ' ... b'LEAF' b'LEAF' b'LEAF'],
    nodes_nodeids=[ 0  1  2 ... 58 59 60],
    nodes_treeids=[ 0  0  0 ... 99 99 99],
    nodes_truenodeids=[1 2 3 ... 0 0 0],
    nodes_values=[-0.10471009  0.04022047 -0.13770805 ...  0.          0.
  0.        ],
    post_transform=b'NONE',
    target_ids=[0 0 0 ... 0 0 0],
    target_nodeids=[ 5  6  8 ... 58 59 60],
    target_opset=1,
    target_treeids=[ 0  0  0 ... 99 99 99],
    target_weights=[-32.84931   -22.256498  -22.161201  ...   1.3725924   1.519581
   1.7092005],
)
<mlprodict.onnxrt.ops_cpu.op_tree_ensemble_regressor_p_.RuntimeTreeEnsembleRegressorPFloat object at 0x7f5eafe1dd50>

The runtime also stores the threshold, expressed as a number of observations, at which it starts parallelizing.

print(oinf.sequence_[0].ops_.rt_.omp_N_)

Out:

20
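The idea behind such a threshold can be sketched in pure Python. This is only an illustration of the dispatch logic, not mlprodict's implementation: `predict_batch`, `toy_model`, and the use of a thread pool are all assumptions made for the example, and whether the real runtime compares with `<` or `<=` is not stated here.

```python
from concurrent.futures import ThreadPoolExecutor


def predict_batch(rows, predict_one, parallel_threshold=20, workers=4):
    """Apply predict_one to every row, in parallel only for large batches.

    parallel_threshold plays the role of a batch-size threshold; using a
    strict < comparison is an assumption made for this sketch.
    """
    if len(rows) < parallel_threshold:
        # Small batch: a plain loop avoids the cost of starting workers.
        return [predict_one(row) for row in rows]
    # Large batch: spread the rows over a pool of threads.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(predict_one, rows))


def toy_model(row):
    # Hypothetical stand-in for a tree ensemble: sum of the features.
    return sum(row)


small = [[1.0, 2.0]] * 5
large = [[1.0, 2.0]] * 50
print(predict_batch(small, toy_model))
print(len(predict_batch(large, toy_model)))
```

Both paths return the same predictions; only the way the work is scheduled changes. Python threads will not speed up a pure Python function like `toy_model`, so the sketch shows where the decision is made rather than an actual gain.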

Profiling

This step uses pyinstrument to measure where the time is spent. Both the scikit-learn and the mlprodict runtimes are called so that their prediction times can be compared.

X32 = X_test.astype(numpy.float32)


def runlocal():
    for i in range(0, 100):
        oinf.run({'X': X32[:1000]})
        hgb.predict(X_test[:1000])


print("profiling...")
txt = profile(runlocal, pyinst_format='text')
print(txt[1])

Out:

profiling...

  _     ._   __/__   _ _  _  _ _/_   Recorded: 03:37:50 AM Samples:  453
 /_//_/// /_\ / //_// / //_'/ //     Duration: 1.730     CPU time: 6.035
/   _/                      v3.2.0

Program: somewhere/workspace/mlprodict/mlprodict_UT_37_std/_doc/examples/plot_parallelism.py

1.730 profile  ../pycode/profiling.py:49
`- 1.730 runlocal  plot_parallelism.py:85
      [15 frames hidden]  plot_parallelism, sklearn, <built-in>...
         0.805 _predict_from_numeric_data  <built-in>:0
         0.829 _run  mlprodict/onnxrt/ops_cpu/op_tree_ensemble_regressor.py:70

Now let’s measure the average computation time per observation for batches of 2 to 20 observations. The runtime implemented in mlprodict parallelizes the computation after a given number of observations.

obs = []
for N in tqdm(list(range(2, 21))):
    m = measure_time("oinf.run({'X': x})",
                     {'oinf': oinf, 'x': X32[:N]},
                     div_by_number=True,
                     number=20)
    m['N'] = N
    m['RT'] = 'ONNX'
    obs.append(m)

    m = measure_time("hgb.predict(x)",
                     {'hgb': hgb, 'x': X32[:N]},
                     div_by_number=True,
                     number=15)
    m['N'] = N
    m['RT'] = 'SKL'
    obs.append(m)

df = DataFrame(obs)
num = ['min_exec', 'average', 'max_exec']
for c in num:
    df[c] /= df['N']
df.head()

Out:

100%|##########| 19/19 [00:06<00:00,  2.92it/s]
    average  deviation  min_exec  max_exec  repeat  number  context_size  N    RT
0  0.000006   0.000002  0.000006  0.000009      10      20           240  2  ONNX
1  0.001098   0.000510  0.000920  0.001844      10      15           240  2   SKL
2  0.000006   0.000002  0.000005  0.000008      10      20           240  3  ONNX
3  0.000754   0.000862  0.000583  0.001557      10      15           240  3   SKL
4  0.000006   0.000003  0.000005  0.000008      10      20           240  4  ONNX
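The per-observation figures above come from two divisions: the timing is first averaged over the repeated calls (as the `div_by_number=True` argument suggests), then divided by the batch size N. A minimal sketch of the same idea with the standard library only is shown here; `time_per_observation` is a hypothetical helper, not part of cpyquickhelper.

```python
import timeit


def time_per_observation(func, batch, repeat=5, number=20):
    """Time func(batch) and report the cost of a single observation."""
    # Each entry of runs is the total time of `number` consecutive calls.
    runs = timeit.repeat(lambda: func(batch), repeat=repeat, number=number)
    per_call = [r / number for r in runs]
    n = len(batch)
    return {
        'N': n,
        'average': sum(per_call) / len(per_call) / n,
        'min_exec': min(per_call) / n,
        'max_exec': max(per_call) / n,
    }


# Hypothetical workload standing in for a model prediction.
stats = time_per_observation(lambda b: [x * x for x in b], list(range(100)))
print(stats)
```

Dividing by N makes batches of different sizes comparable: if parallelization helps, the per-observation time should drop once the batch is large enough.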


Graph.

fig, ax = plt.subplots(1, 2, figsize=(10, 4))
df[df.RT == 'ONNX'].set_index('N')[num].plot(ax=ax[0])
ax[0].set_title("Average ONNX prediction time per observation in a batch.")
df[df.RT == 'SKL'].set_index('N')[num].plot(ax=ax[1])
ax[1].set_title(
    "Average scikit-learn prediction time\nper observation in a batch.")
[Figure, two panels: "Average ONNX prediction time per observation in a batch." and "Average scikit-learn prediction time per observation in a batch."]

Gain from parallelization

There is a clear gap in the curves before and after 10 observations, where the computation is parallelized. Does this threshold depend on the number of trees in the model? To find out, we compute for each model the average prediction time for batches of up to 10 observations and for batches of 11 to 20 observations.

def parallized_gain(df):
    df = df[df.RT == 'ONNX']
    df10 = df[df.N <= 10]
    t10 = sum(df10['average']) / df10.shape[0]
    df10p = df[df.N > 10]
    t10p = sum(df10p['average']) / df10p.shape[0]
    return t10 / t10p


print('gain', parallized_gain(df))

Out:

gain 0.9448635949690766
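The gain is a ratio: the mean per-observation time for small batches (N <= 10) divided by the mean per-observation time for larger batches (N > 10). A value above 1 means an observation is cheaper in the larger batches; the value just under 1 printed above means it was slightly more expensive in that run. The toy example below reproduces the computation on made-up numbers; `batch_gain` and the timings are illustrative only.

```python
def batch_gain(per_obs_time, split=10):
    """Ratio of mean per-observation time for N <= split over N > split.

    per_obs_time maps a batch size N to a time per observation.
    """
    small = [t for n, t in per_obs_time.items() if n <= split]
    large = [t for n, t in per_obs_time.items() if n > split]
    return (sum(small) / len(small)) / (sum(large) / len(large))


# Made-up timings in microseconds per observation.
toy = {2: 4.0, 10: 2.0, 12: 1.0, 20: 1.0}
print(batch_gain(toy))
```

Here the small batches cost 3.0 on average and the large ones 1.0, so the ratio is 3.0.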

Measures based on the number of trees

We train many models with different numbers of trees to see how the parallelization gain changes. One model is trained for every distinct number of trees, and the prediction time is then measured for different numbers of observations.

tries = [(nb, N)
         for N in range(2, 21, 2)
         for nb in ([2, 5, 8] + list(range(10, 50, 5)) + list(range(50, 101, 10)))]

training

models = {100: (hgb, oinf)}
for nb in tqdm(set(_[0] for _ in tries)):
    if nb not in models:
        hgb = HistGradientBoostingRegressor(max_iter=nb, max_depth=6)
        hgb.fit(X_train, y_train)
        onx = to_onnx(hgb, X_train[:1].astype(numpy.float32))
        oinf = OnnxInference(onx, runtime='python_compiled')
        models[nb] = (hgb, oinf)

Out:

100%|##########| 17/17 [00:29<00:00,  1.73s/it]

prediction time

obs = []

for nb, N in tqdm(tries):
    hgb, oinf = models[nb]
    m = measure_time("oinf.run({'X': x})",
                     {'oinf': oinf, 'x': X32[:N]},
                     div_by_number=True,
                     number=50)
    m['N'] = N
    m['nb'] = nb
    m['RT'] = 'ONNX'
    obs.append(m)

df = DataFrame(obs)
num = ['min_exec', 'average', 'max_exec']
for c in num:
    df[c] /= df['N']
df.head()

Out:

100%|##########| 170/170 [00:02<00:00, 78.08it/s]
    average     deviation  min_exec  max_exec  repeat  number  context_size  N  nb    RT
0  0.000002  1.080259e-07  0.000002  0.000002      10      50           240  2   2  ONNX
1  0.000002  4.447435e-08  0.000002  0.000002      10      50           240  2   5  ONNX
2  0.000002  7.397110e-08  0.000002  0.000003      10      50           240  2   8  ONNX
3  0.000003  1.204489e-07  0.000002  0.000003      10      50           240  2  10  ONNX
4  0.000003  9.929579e-08  0.000003  0.000003      10      50           240  2  15  ONNX


Let’s compute the gains.

gains = []
for nb in set(df['nb']):
    gain = parallized_gain(df[df.nb == nb])
    gains.append(dict(nb=nb, gain=gain))

dfg = DataFrame(gains)
dfg = dfg.sort_values('nb').reset_index(drop=True).copy()
dfg
     nb      gain
0     2  3.256118
1     5  2.913289
2     8  2.734518
3    10  2.588864
4    15  2.209568
5    20  1.978319
6    25  1.735174
7    30  1.542318
8    35  1.376555
9    40  1.238024
10   45  1.110165
11   50  1.058352
12   60  0.928100
13   70  0.886466
14   80  0.717878
15   90  0.805956
16  100  0.810670


Graph.

ax = dfg.set_index('nb').plot()
ax.set_title(
    "Parallelization gain depending\non the number of trees\n(max_depth=6).")
[Figure: Parallelization gain depending on the number of trees (max_depth=6).]

That does not answer the question we are really asking: we would like to know the best threshold th, the number of observations from which we should parallelize. This number depends on the number of trees. A gain > 1 means the parallelization should happen. Here, even two observations is enough. Let’s check with lighter trees (max_depth=2); maybe the conclusion is different in that case.

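# Start from an empty cache: hgb and oinf still hold the 100-tree model
# trained with max_depth=6 in the previous experiment.
models = {}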
for nb in tqdm(set(_[0] for _ in tries)):
    if nb not in models:
        hgb = HistGradientBoostingRegressor(max_iter=nb, max_depth=2)
        hgb.fit(X_train, y_train)
        onx = to_onnx(hgb, X_train[:1].astype(numpy.float32))
        oinf = OnnxInference(onx, runtime='python_compiled')
        models[nb] = (hgb, oinf)

obs = []
for nb, N in tqdm(tries):
    hgb, oinf = models[nb]
    m = measure_time("oinf.run({'X': x})",
                     {'oinf': oinf, 'x': X32[:N]},
                     div_by_number=True,
                     number=50)
    m['N'] = N
    m['nb'] = nb
    m['RT'] = 'ONNX'
    obs.append(m)

df = DataFrame(obs)
num = ['min_exec', 'average', 'max_exec']
for c in num:
    df[c] /= df['N']
df.head()

Out:

100%|##########| 17/17 [00:33<00:00,  1.96s/it]

100%|##########| 170/170 [00:01<00:00, 151.40it/s]
    average     deviation  min_exec  max_exec  repeat  number  context_size  N  nb    RT
0  0.000002  9.230711e-08  0.000002  0.000002      10      50           240  2   2  ONNX
1  0.000002  6.638472e-08  0.000002  0.000002      10      50           240  2   5  ONNX
2  0.000002  1.255782e-07  0.000002  0.000003      10      50           240  2   8  ONNX
3  0.000002  7.788895e-08  0.000002  0.000002      10      50           240  2  10  ONNX
4  0.000002  4.819410e-08  0.000002  0.000002      10      50           240  2  15  ONNX


Measures.

gains = []
for nb in set(df['nb']):
    gain = parallized_gain(df[df.nb == nb])
    gains.append(dict(nb=nb, gain=gain))

dfg = DataFrame(gains)
dfg = dfg.sort_values('nb').reset_index(drop=True).copy()
dfg
     nb      gain
0     2  3.536813
1     5  3.300619
2     8  3.201020
3    10  3.023804
4    15  2.839964
5    20  2.670700
6    25  2.473901
7    30  2.311176
8    35  2.196866
9    40  2.017637
10   45  4.899798
11   50  1.759624
12   60  1.600092
13   70  1.431155
14   80  2.924146
15   90  1.239434
16  100  0.925917


Graph.

ax = dfg.set_index('nb').plot()
ax.set_title(
    "Parallelization gain depending\non the number of trees\n(max_depth=2).")
[Figure: Parallelization gain depending on the number of trees (max_depth=2).]

The conclusion is much the same. In these measurements the gain shrinks as the number of trees grows (apart from two outliers at 45 and 80 trees), and it stays below the number of cores of the processor.

Moving the threshold

The last experiment compares the prediction time with and without parallelization for different numbers of observations.

hgb = HistGradientBoostingRegressor(max_iter=40, max_depth=6)
hgb.fit(X_train, y_train)
onx = to_onnx(hgb, X_train[:1].astype(numpy.float32))
oinf = OnnxInference(onx, runtime='python_compiled')


obs = []
for N in tqdm(list(range(2, 51))):
    oinf.sequence_[0].ops_.rt_.omp_N_ = 100
    m = measure_time("oinf.run({'X': x})",
                     {'oinf': oinf, 'x': X32[:N]},
                     div_by_number=True,
                     number=20)
    m['N'] = N
    m['RT'] = 'ONNX'
    m['PARALLEL'] = False
    obs.append(m)

    oinf.sequence_[0].ops_.rt_.omp_N_ = 1
    m = measure_time("oinf.run({'X': x})",
                     {'oinf': oinf, 'x': X32[:N]},
                     div_by_number=True,
                     number=50)
    m['N'] = N
    m['RT'] = 'ONNX'
    m['PARALLEL'] = True
    obs.append(m)

df = DataFrame(obs)
num = ['min_exec', 'average', 'max_exec']
for c in num:
    df[c] /= df['N']
df.head()

Out:

100%|##########| 49/49 [00:45<00:00,  1.07it/s]
    average     deviation  min_exec  max_exec  repeat  number  context_size  N    RT  PARALLEL
0  0.000004  4.317572e-07  0.000003  0.000004      10      20           240  2  ONNX     False
1  0.000302  1.213244e-03  0.000004  0.001733      10      50           240  2  ONNX      True
2  0.000003  3.837866e-07  0.000003  0.000003      10      20           240  3  ONNX     False
3  0.000682  1.099925e-03  0.000002  0.000938      10      50           240  3  ONNX      True
4  0.000002  1.408475e-06  0.000002  0.000004      10      20           240  4  ONNX     False


Graph.

# Keyword arguments are required by recent versions of pandas.
piv = df[['N', 'PARALLEL', 'average']].pivot(
    index='N', columns='PARALLEL', values='average')
ax = piv.plot(logy=True)
ax.set_title("Prediction time with and without parallelization.")
[Figure: Prediction time with and without parallelization.]

Parallelization is working.
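One possible way to turn the two curves into a threshold is to look for the smallest batch size from which the parallel run stays faster for every larger batch. The helper below is a sketch under that assumption; `crossover` is a hypothetical name and the timings are made up, not taken from the measurements above.

```python
def crossover(sequential, parallel):
    """Smallest N from which the parallel time is lower for every larger N.

    Both arguments map a batch size N to a measured time.
    Returns None if the parallel run is not faster at the largest N.
    """
    best = None
    # Walk from the largest batch down and stop at the first N
    # where the parallel run is no longer faster.
    for n in sorted(sequential, reverse=True):
        if parallel[n] < sequential[n]:
            best = n
        else:
            break
    return best


# Made-up per-observation timings in microseconds.
seq = {2: 4.0, 4: 4.0, 8: 4.0, 16: 4.0, 32: 4.0}
par = {2: 30.0, 4: 3.0, 8: 9.0, 16: 3.0, 32: 2.0}
print(crossover(seq, par))
```

In this toy data the parallel run also wins at N=4, but it loses again at N=8, so the helper reports 16. Scanning from the largest batch downwards makes the result robust to such isolated noisy measurements. The two columns of the pivoted table could be passed to it as dictionaries.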

plt.show()

Total running time of the script: ( 2 minutes 30.429 seconds)
