# Parallelization

Stan has support for different types of parallelization: multi-threading with Intel Threading Building Blocks (TBB), multi-processing with Message Passing Interface (MPI) and manycore processing with OpenCL.

Multi-threading in Stan can be used with two mechanisms: reduce with summation and rectangular map. The latter can also be used with multi-processing.

The advantages of reduce with summation are:

- More flexible argument interface, avoiding the packing and unpacking that is necessary with rectanguar map.
- Partitions data for parallelization automatically (this is done manually in rectanguar map).
- Is easier to use.

The advantages of rectangular map are:

- Returns a list of vectors, while the reduce summation returns only a scalar.
- Can be parallelized across multiple cores and multiple computers, while reduce summation can only parallelized across multiple cores on a single machine.

The actual speedup gained from using these functions will depend on many details. It is strongly recommended to only parallelize the computationally most expensive operations in a Stan program. Oftentimes this is the evaluation of the log likelihood for the observed data. When it is not clear which parts of the model is the most computationally expensive, we recommend using profiling, which is available in Stan 2.26 and newer.

Since only portions of a Stan program will run in parallel, the maximal speedup one can achieve is capped, a phenomen described by Amdahl’s law.

## Reduce-sum

It is often necessary in probabilistic modeling to compute the sum of a number of independent function evaluations. This occurs, for instance, when evaluating a number of conditionally independent terms in a log-likelihood. If `g: U -> real`

is the function and `{ x1, x2, ... }`

is an array of inputs, then that sum looks like:

`g(x1) + g(x2) + ...`

`reduce_sum`

and `reduce_sum_static`

are tools for parallelizing these calculations.

For efficiency reasons the reduce function doesn’t work with the element-wise evaluated function `g`

, but instead the partial sum function `f: U[] -> real`

, where `f`

computes the partial sum corresponding to a slice of the sequence `x`

passed in. Due to the associativity of the sum reduction it holds that:

```
g(x1) + g(x2) + g(x3) = f({ x1, x2, x3 })
= f({ x1, x2 }) + f({ x3 })
= f({ x1 }) + f({ x2, x3 }) = f({ x1 }) + f({ x2 }) + f({ x3 })
```

With the partial sum function `f: U[] -> real`

reduction of a large number of terms can be evaluated in parallel automatically, since the overall sum can be partitioned into arbitrary smaller partial sums. The exact partitioning into the partial sums is not under the control of the user. However, since the exact numerical result will depend on the order of summation, Stan provides two versions of the reduce summation facility:

`reduce_sum`

: Automatically choose partial sums partitioning based on a dynamic scheduling algorithm.`reduce_sum_static`

: Compute the same sum as`reduce_sum`

, but partition the input in the same way for given data set (in`reduce_sum`

this partitioning might change depending on computer load).

`grainsize`

is the one tuning parameter. For `reduce_sum`

, `grainsize`

is a suggested partial sum size. A `grainsize`

of 1 leaves the partitioning entirely up to the scheduler. This should be the default way of using `reduce_sum`

unless time is spent carefully picking `grainsize`

. For picking a `grainsize`

, see details below.

For `reduce_sum_static`

, `grainsize`

specifies the maximal partial sum size. With `reduce_sum_static`

it is more important to choose `grainsize`

carefully since it entirely determines the partitioning of work. See details below.

For efficiency and convenience additional shared arguments can be passed to every term in the sum. So for the array `{ x1, x2, ... }`

and the shared arguments `s1, s2, ...`

stan the effective sum (with individual terms) looks like:

` g(x1, s1, s2, ...) + g(x2, s1, s2, ...) + g(x3, s1, s2, ...) + ...`

which can be written equivalently with partial sums to look like:

` f({ x1, x2 }, s1, s2, ...) + f({ x3 }, s1, s2, ...)`

where the particular slicing of the `x`

array can change.

Given this, the signatures are:

```
real reduce_sum(F f, array[] T x, int grainsize, T1 s1, T2 s2, ...)
real reduce_sum_static(F f, array[] T x, int grainsize, T1 s1, T2 s2, ...)
```

`f`

- User defined function that computes partial sums`x`

- Array to slice, each element corresponds to a term in the summation`grainsize`

- Target for size of slices`s1, s2, ...`

- Arguments shared in every term

The user-defined partial sum functions have the signature:

`real f(array[] T x_slice, int start, int end, T1 s1, T2 s2, ...)`

and take the arguments:

`x_slice`

- The subset of`x`

(from`reduce_sum`

/`reduce_sum_static`

) for which this partial sum is responsible (`x_slice = x[start:end]`

)`start`

- An integer specifying the first term in the partial sum`end`

- An integer specifying the last term in the partial sum (inclusive)`s1, s2, ...`

- Arguments shared in every term (passed on without modification from the`reduce_sum`

/`reduce_sum_static`

call)

The user-provided function `f`

is expected to compute the partial sum with the terms `start`

through `end`

of the overall sum. The user function is passed the subset `x[start:end]`

as `x_slice`

. `start`

and `end`

are passed so that `f`

stan can index any of the tailing `sM`

arguments as necessary. The trailing `sM`

arguments are passed without modification to every call of `f`

.

A `reduce_sum`

(or `reduce_sum_static`

) call:

`real sum = reduce_sum(f, x, grainsize, s1, s2, ...);`

can be replaced by either:

`real sum = f(x, 1, size(x), s1, s2, ...);`

or the code:

```
real sum = 0.0;
for(i in 1:size(x)) {
sum += f({ x[i] }, i, i, s1, s2, ...); }
```

### Example: logistic regression

Logistic regression is a useful example to clarify both the syntax and semantics of reduce summation and how it can be used to speed up a typical model. A basic logistic regression can be coded in Stan as:

```
data {
int N;
array[N] int y;
vector[N] x;
}parameters {
vector[2] beta;
}model {
beta ~ std_normal();1] + beta[2] * x);
y ~ bernoulli_logit(beta[ }
```

In this model predictions are made about the `N`

outputs `y`

using the covariate `x`

. The intercept and slope of the linear equation are to be estimated. The key point to getting this calculation to use reduce summation, is recognizing that the statement:

`1] + beta[2] * x); y ~ bernoulli_logit(beta[`

can be rewritten (up to a proportionality constant) as:

```
for(n in 1:N) {
target += bernoulli_logit_lpmf(y[n] | beta[1] + beta[2] * x[n])
}
```

Now it is clear that the calculation is the sum of a number of conditionally independent Bernoulli log probability statements, which is the condition where reduce summation is useful. To use the reduce summation, a function must be written that can be used to compute arbitrary partial sums of the total sum. Using the interface defined in Reduce-Sum, such a function can be written like:

```
functions {
real partial_sum(array[] int y_slice,
int start, int end,
vector x,
vector beta) {
return bernoulli_logit_lpmf(y_slice | beta[1] + beta[2] * x[start:end]);
} }
```

The likelihood statement in the model can now be written:

`target += partial_sum(y, 1, N, x, beta); // Sum terms 1 to N of the likelihood`

In this example, `y`

was chosen to be sliced over because there is one term in the summation per value of `y`

. Technically `x`

would have worked as well. Use whatever conceptually makes the most sense for a given model, e.g. slice over independent terms like conditionally independent observations or groups of observations as in hierarchical models. Because `x`

is a shared argument, it is subset accordingly with `start:end`

. With this function, reduce summation can be used to automatically parallelize the likelihood:

```
int grainsize = 1;
target += reduce_sum(partial_sum, y,
grainsize, x, beta);
```

The reduce summation facility automatically breaks the sum into pieces and computes them in parallel. `grainsize = 1`

specifies that the `grainsize`

should be estimated automatically. The final model is:

```
functions {
real partial_sum(array[] int y_slice,
int start, int end,
vector x,
vector beta) {
return bernoulli_logit_lpmf(y_slice | beta[1] + beta[2] * x[start:end]);
}
}data {
int N;
array[N] int y;
vector[N] x;
}parameters {
vector[2] beta;
}model {
int grainsize = 1;
beta ~ std_normal();target += reduce_sum(partial_sum, y,
grainsize,
x, beta); }
```

### Picking the grainsize

The rational for choosing a sensible `grainsize`

is based on balancing the overhead implied by creating many small tasks versus creating fewer large tasks which limits the potential parallelism.

In `reduce_sum`

, `grainsize`

is a recommendation on how to partition the work in the partial sum into smaller pieces. A `grainsize`

of 1 leaves this entirely up to the internal scheduler and should be chosen if no benchmarking of other grainsizes is done. Ideally this will be efficient, but there are no guarantees.

In `reduce_sum_static`

, `grainsize`

is an upper limit on the worksize. Work will be split until all partial sums are just smaller than `grainsize`

(and the split will happen the same way every time for the same inputs). For the static version it is more important to select a sensible `grainsize`

.

In order to figure out an optimal `grainsize`

, if there are `N`

terms and `M`

cores, run a quick test model with `grainsize`

set roughly to `N / M`

. Record the time, cut the `grainsize`

in half, and run the test again. Repeat this iteratively until the model runtime begins to increase. This is a suitable `grainsize`

for the model, because this ensures the calculations can be carried out with the most parallelism without losing too much efficiency.

For instance, in a model with `N=10000`

and `M = 4`

, start with `grainsize = 2500`

, and sequentially try `grainsize = 1250`

, `grainsize = 625`

, etc.

It is important to repeat this process until performance gets worse. It is possible after many halvings nothing happens, but there might still be a smaller `grainsize`

that performs better. Even if a sum has many tens of thousands of terms, depending on the internal calculations, a `grainsize`

of thirty or forty or smaller might be the best, and it is difficult to predict this behavior. Without doing these halvings until performance actually gets worse, it is easy to miss this.

## Map-rect

Map-reduce allows large calculations (e.g., log likelihoods) to be broken into components which may be calculated modularly (e.g., data blocks) and combined (e.g., by summation and incrementing the target log density).

A *map function* is a higher-order function that applies an argument function to every member of some collection, returning a collection of the results. For example, mapping the square function, \(f(x) = x^2\), over the vector \([3, 5, 10]\) produces the vector \([9, 25, 100]\). In other words, map applies the square function elementwise.

The output of mapping a sequence is often fed into a reduction. A *reduction function* takes an arbitrarily long sequence of inputs and returns a single output. Examples of reduction functions are summation (with the return being a single value) or sorting (with the return being a sorted sequence). The combination of mapping and reducing is so common it has its own name, *map-reduce*.

### Map function

In order to generalize the form of functions and results that are possible and accommodate both parameters (which need derivatives) and data values (which don’t), Stan’s map function operates on more than just a sequence of inputs.

### Map function signature

Stan’s map function has the following signature

```
vector map_rect((vector, vector, array[] real, array[] int):vector f,
vector phi, array[] vector thetas,
data array[,] real x_rs, data array[,] int x_is);
```

The arrays `thetas`

of parameters, `x_rs`

of real data, and `x_is`

of integer data have the suffix “`s`

” to indicate they are arrays. These arrays must all be the same size, as they will be mapped in parallel by the function `f`

. The value of `phi`

is reused in each mapped operation.

The `_rect`

suffix in the name arises because the data structures it takes as arguments are rectangular. In order to deal with ragged inputs, ragged inputs must be padded out to rectangular form.

The last two arguments are two dimensional arrays of real and integer data values. These argument types are marked with the `data`

qualifier to indicate that they must only contain variables originating in the data or transformed data blocks. This will allow such data to be pinned to a processor on which it is being processed to reduce communication overhead.

The notation `(vector, vector, array[] real, array[] int):vector`

indicates that the function argument `f`

must have the following signature.

```
vector f(vector phi, vector theta,
data array[] real x_r, data array[] int x_i);
```

Although `f`

will often return a vector of size one, the built-in flexibility allows general multivariate functions to be mapped, even raggedly.

#### Map function semantics

Stan’s map function applies the function `f`

to the shared parameters along with one element each of the job parameters, real data, and integer data arrays. Each of the arguments `theta`

, `x_r`

, and `x_i`

must be arrays of the same size. If the arrays are all size `N`

, the result is defined as follows.

```
map_rect(f, phi, thetas, xs, ns)1], xs[1], ns[1]) . f(phi, thetas[2], xs[2], ns[2])
= f(phi, thetas[ . ... . f(phi, thetas[N], xs[N], ns[N])
```

The dot operators in the notation above are meant to indicate concatenation (implemented as `append_row`

in Stan). The output of each application of `f`

is a vector, and the sequence of `N`

vectors is concatenated together to return a single vector.

### Example: logistic regression

An example should help to clarify both the syntax and semantics of the mapping operation and how it may be combined with reductions built into Stan to provide a map-reduce implementation.

#### Unmapped logistic regression

Consider the following simple logistic regression model, which is coded unconventionally to accommodate direct translation to a mapped implementation.

```
data {
array[12] int y;
array[12] real x;
}parameters {
vector[2] beta;
}model {
beta ~ std_normal();1] + beta[2] * to_vector(x));
y ~ bernoulli_logit(beta[ }
```

The program is unusual in that it (a) hardcodes the data size, which is not required by the map function but is just used here for simplicity, (b) represents the predictors as a real array even though it needs to be used as a vector, and (c) represents the regression coefficients (intercept and slope) as a vector even though they’re used individually. The `bernoulli_logit`

distribution is used because the argument is on the logit scale—it implicitly applies the inverse logit function to map the argument to a probability.

#### Mapped logistic regression

The unmapped logistic regression model described in the previous subsection may be implemented using Stan’s rectangular mapping functionality as follows.

```
functions {
vector lr(vector beta, vector theta, array[] real x, array[] int y) {
real lp = bernoulli_logit_lpmf(y | beta[1]
2]);
+ to_vector(x) * beta[return [lp]';
}
}data {
array[12] int y;
array[12] real x;
}transformed data {
// K = 3 shards
array[3, 4] = { y[1:4], y[5:8], y[9:12] int ys };
array[3, 4] = { x[1:4], x[5:8], x[9:12] real xs };
array[3] vector[0] theta;
}parameters {
vector[2] beta;
}model {
beta ~ std_normal();target += sum(map_rect(lr, beta, theta, xs, ys));
}
```

The first piece of the code is the actual function to compute the logistic regression. The argument `beta`

will contain the regression coefficients (intercept and slope), as before. The second argument `theta`

of job-specific parameters is not used, but nevertheless must be present. The modeled data `y`

is passed as an array of integers and the predictors `x`

as an array of real values. The function body then computes the log probability mass of `y`

and assigns it to the local variable `lp`

. This variable is then used in `[lp]'`

to construct a row vector and then transpose it to a vector to return.

The data are taken in as before. There is an additional transformed data block that breaks the data up into three shards.^{1}

The value `3`

is also hard coded; a more practical program would allow the number of shards to be controlled. There are three parallel arrays defined here, each of size three, corresponding to the number of shards. The array `ys`

contains the modeled data variables; each element of the array `ys`

is an array of size four. The second array `xs`

is for the predictors, and each element of it is also of size four. These contained arrays are the same size because the predictors `x`

stand in a one-to-one relationship with the modeled data `y`

. The final array `theta`

is also of size three; its elements are empty vectors, because there are no shard-specific parameters.

The parameters and the prior are as before. The likelihood is now coded using map-reduce. The function `lr`

to compute the log probability mass is mapped over the data `xs`

and `ys`

, which contain the original predictors and outcomes broken into shards. The parameters `beta`

are in the first argument because they are shared across shards. There are no shard-specific parameters, so the array of job-specific parameters `theta`

contains only empty vectors.

### Example: hierarchical logistic regression

Consider a hierarchical model of American presidential voting behavior based on state of residence.^{2}

Each of the fifty states \(k \in \{1,\dotsc,50\}\) will have its own slope \(\beta_k\) and intercept \(\alpha_k\) to model the log odds of voting for the Republican candidate as a function of income. Suppose there are \(N\) voters and with voter \(n \in 1{:}N\) being in state \(s[n]\) with income \(x_n\). The likelihood for the vote \(y_n \in \{ 0, 1 \}\) is \[ y_n \sim \textsf{Bernoulli} \Big( \operatorname{logit}^{-1}\left( \alpha_{s[n]} + \beta_{s[n]} \, x_n \right) \Big). \]

The slopes and intercepts get hierarchical priors, \[\begin{align*} \alpha_k &\sim \textsf{normal}(\mu_{\alpha}, \sigma_{\alpha}) \\ \beta_k &\sim \textsf{normal}(\mu_{\beta}, \sigma_{\beta}) \end{align*}\]

#### Unmapped implementation

This model can be coded up in Stan directly as follows.

```
data {
int<lower=0> K;
int<lower=0> N;
array[N] int<lower=1, upper=K> kk;
vector[N] x;
array[N] int<lower=0, upper=1> y;
}parameters {
matrix[K, 2] beta;
vector[2] mu;
vector<lower=0>[2] sigma;
}model {
0, 2);
mu ~ normal(0, 2);
sigma ~ normal(for (i in 1:2) {
beta[ , i] ~ normal(mu[i], sigma[i]);
}1] + beta[kk, 2] .* x);
y ~ bernoulli_logit(beta[kk, }
```

For this model the vector of predictors `x`

is coded as a vector, corresponding to how it is used in the likelihood. The priors for `mu`

and `sigma`

are vectorized. The priors on the two components of `beta`

(intercept and slope, respectively) are stored in a \(K \times 2\) matrix.

The likelihood is also vectorized using multi-indexing with index `kk`

for the states and elementwise multiplication (`.*`

) for the income `x`

. The vectorized likelihood works out to the same thing as the following less efficient looped form.

```
for (n in 1:N) {
1] + beta[kk[n], 2] * x[n]);
y[n] ~ bernoulli_logit(beta[kk[n], }
```

#### Mapped implementation

The mapped version of the model will map over the states `K`

. This means the group-level parameters, real data, and integer-data must be arrays of the same size.

The mapped implementation requires a function to be mapped. The following function evaluates both the likelihood for the data observed for a group as well as the prior for the group-specific parameters (the name `bl_glm`

derives from the fact that it’s a generalized linear model with a Bernoulli likelihood and logistic link function).

```
functions {
vector bl_glm(vector mu_sigma, vector beta,
array[] real x, array[] int y) {
vector[2] mu = mu_sigma[1:2];
vector[2] sigma = mu_sigma[3:4];
real lp = normal_lpdf(beta | mu, sigma);
real ll = bernoulli_logit_lpmf(y | beta[1] + beta[2] * to_vector(x));
return [lp + ll]';
} }
```

The shared parameter `mu_sigma`

contains the locations (`mu_sigma[1:2]`

) and scales (`mu_sigma[3:4]`

) of the priors, which are extracted in the first two lines of the program. The variable `lp`

is assigned the log density of the prior on `beta`

. The vector `beta`

is of size two, as are the vectors `mu`

and `sigma`

, so everything lines up for the vectorization. Next, the variable `ll`

is assigned to the log likelihood contribution for the group. Here `beta[1]`

is the intercept of the regression and `beta[2]`

the slope. The predictor array `x`

needs to be converted to a vector allow the multiplication.

The data block is identical to that of the previous program, but repeated here for convenience. A transformed data block computes the data structures needed for the mapping by organizing the data into arrays indexed by group.

```
data {
int<lower=0> K;
int<lower=0> N;
array[N] int<lower=1, upper=K> kk;
vector[N] x;
array[N] int<lower=0, upper=1> y;
}transformed data {
int<lower=0> J = N / K;
array[K, J] real x_r;
array[K, J] int<lower=0, upper=1> x_i;
{int pos = 1;
for (k in 1:K) {
int end = pos + J - 1;
x_r[k] = to_array_1d(x[pos:end]);
x_i[k] = to_array_1d(y[pos:end]);
pos += J;
}
} }
```

The integer `J`

is set to the number of observations per group.^{3}

The real data array `x_r`

holds the predictors and the integer data array `x_i`

holds the outcomes. The grouped data arrays are constructed by slicing the predictor vector `x`

(and converting it to an array) and slicing the outcome array `y`

.

Given the transformed data with groupings, the parameters are the same as the previous program. The model has the same priors for the hyperparameters `mu`

and `sigma`

, but moves the prior for `beta`

and the likelihood to the mapped function.

```
parameters {
array[K] vector[2] beta;
vector[2] mu;
vector<lower=0>[2] sigma;
}model {
0, 2);
mu ~ normal(0, 2);
sigma ~ normal(target += sum(map_rect(bl_glm, append_row(mu, sigma), beta, x_r, x_i));
}
```

The model as written here computes the priors for each group’s parameters along with the likelihood contribution for the group. An alternative mapping would leave the prior in the model block and only map the likelihood. In a serial setting this shouldn’t make much of a difference, but with parallelization, there is reduced communication (the prior’s parameters need not be transmitted) and also reduced parallelization with the version that leaves the prior in the model block.

### Ragged inputs and outputs

The previous examples included rectangular data structures and single outputs. Despite the name, this is not technically required by `map_rect`

.

#### Ragged inputs

If each group has a different number of observations, then the rectangular data structures for predictors and outcomes will need to be padded out to be rectangular. In addition, the size of the ragged structure will need to be passed as integer data. This holds for shards with varying numbers of parameters as well as varying numbers of data points.

#### Ragged outputs

The output of each mapped function is concatenated in order of inputs to produce the output of `map_rect`

. When every shard returns a singleton (size one) array, the result is the same size as the number of shards and is easy to deal with downstream. If functions return longer arrays, they can still be structured using the `to_matrix`

function if they are rectangular.

If the outputs are of varying sizes, then there will have to be some way to convert it back to a usable form based on the input, because there is no way to directly return sizes or a ragged structure.

## OpenCL

OpenCL (Open Computing Language) is a framework that enables writing programs that execute across heterogeneous platforms. An OpenCL program can be run on CPUs and GPUs. In order to run OpenCL programs, an OpenCL runtime be installed on the target system.

Stan’s OpenCL backend is currently supported in CmdStan and its wrappers. In order to use it, the model must be compiled with the `STAN_OPENCL`

makefile flag. Setting this flag means that the Stan-to-C++ translator (`stanc3`

) will be supplied the `--use-opencl`

flag and that the OpenCL enabled backend (Stan Math functions) will be enabled.

In Stan, the following distributions can be automatically run in parallel on both CPUs and GPUs with OpenCL:

- bernoulli_lpmf
- bernoulli_logit_lpmf
- bernoulli_logit_glm_lpmf*
- beta_lpdf
- beta_proportion_lpdf
- binomial_lpmf
- categorical_logit_glm_lpmf*
- cauchy_lpdf
- chi_square_lpdf
- double_exponential_lpdf
- exp_mod_normal_lpdf
- exponential_lpdf
- frechet_lpdf
- gamma_lpdf
- gumbel_lpdf
- inv_chi_square_lpdf
- inv_gamma_lpdf
- logistic_lpdf
- lognormal_lpdf
- neg_binomial_lpmf
- neg_binomial_2_lpmf
- neg_binomial_2_log_lpmf
- neg_binomial_2_log_glm_lpmf*
- normal_lpdf
- normal_id_glm_lpdf*
- ordered_logistic_glm_lpmf*
- pareto_lpdf
- pareto_type_2_lpdf
- poisson_lpmf
- poisson_log_lpmf
- poisson_log_glm_lpmf*
- rayleigh_lpdf
- scaled_inv_chi_square_lpdf
- skew_normal_lpdf
- std_normal_lpdf
- student_t_lpdf
- uniform_lpdf
- weibull_lpdf

* OpenCL is not used when the covariate argument to the GLM functions is a `row_vector`

.

## References

*Data Analysis Using Regression and Multilevel-Hierarchical Models*. Cambridge, United Kingdom: Cambridge University Press.

## Footnotes

The term “shard” is borrowed from databases, where it refers to a slice of the rows of a database. That is exactly what it is here if we think of rows of a dataframe. Stan’s shards are more general in that they need not correspond to rows of a dataframe.↩︎

This example is a simplified form of the model described in (Gelman and Hill 2007, sec. 14.2)↩︎

This makes the strong assumption that each group has the same number of observations!↩︎