stan::math::mpi_parallel_call< call_id, ReduceF, CombineF > Class Template Reference

Detailed Description

template<int call_id, typename ReduceF, typename CombineF>
class stan::math::mpi_parallel_call< call_id, ReduceF, CombineF >

The MPI parallel call class manages the distributed evaluation of a collection of tasks following the map-reduce-combine pattern.

The class organizes the distribution of all job-related information from the root node to all worker nodes. The class distinguishes between parameters and static data; static data is transmitted only a single time and cached locally on each worker after the initial transfer.

The flow of commands is:

  1. The constructor of this class must be called on the root node, where all parameters and static data are passed to the class.
  2. The constructor then tries to acquire the MPI cluster resource to obtain control over all workers in the cluster. If the cluster is already locked, an exception is thrown.
  3. The worker nodes are instructed to run the static distributed_apply method of this class. This static method then instantiates an mpi_parallel_call instance on each worker.
  4. The root then broadcasts and scatters all necessary data to the cluster. Static data (including meta information on data shapes) is cached locally, so that it is transferred only on the first evaluation. The work is distributed evenly among the workers: N jobs are distributed to a cluster of size W in chunks of roughly N/W jobs each, with the remainder allocated from node 1 onwards, which ensures that the root node 0 has one job less (see the sketch after this list).
  5. Once the parameters and static data are distributed, the reduce operation is applied to each defined job. Each job is allowed to return a different number of outputs, so the resulting data structure is a ragged array. The ragged array structure becomes known to mpi_parallel_call during the first evaluation and must not change in future calls.
  6. Finally, the local results are gathered over MPI on the root node and passed there to the combine functor along with the ragged array structure information.
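The distribution rule from step 4 can be illustrated with a small standalone sketch (not library code; the helper chunk_sizes and the example values are purely illustrative): each of the W nodes receives N/W jobs, and the N % W leftover jobs are assigned from node 1 onwards, so the root node 0 ends up with one job less whenever there is a remainder.

#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative only: reproduces the documented distribution rule
// (N/W jobs per node, remainder assigned from node 1 onwards).
std::vector<std::size_t> chunk_sizes(std::size_t num_jobs, std::size_t world_size) {
  std::vector<std::size_t> chunks(world_size, num_jobs / world_size);
  const std::size_t remainder = num_jobs % world_size;
  for (std::size_t i = 0; i < remainder; ++i)
    ++chunks[1 + i];  // skip the root (node 0), start at node 1
  return chunks;
}

int main() {
  // Example: 10 jobs on 4 nodes -> 2 3 3 2; the root gets one job less.
  for (std::size_t n : chunk_sizes(10, 4))
    std::cout << n << " ";
  std::cout << std::endl;
  return 0;
}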

The MPI cluster resource is acquired with the construction of mpi_parallel_call and is freed once the mpi_parallel_call goes out of scope (that is, when it is destructed).
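The following is a minimal usage sketch of this flow. It is not taken from the library: the user functor my_functor is hypothetical, the aliases binding internal::map_rect_reduce and internal::map_rect_combine are assumptions about their template arguments, and the worker-side dispatch that eventually calls distributed_apply() through the cluster's command mechanism is only indicated in a comment.

#include <mpi_parallel_call.hpp>  // as listed in this reference

#include <Eigen/Dense>
#include <vector>

// Hypothetical user job functor; its exact call signature is defined by the
// map_rect facility and is not spelled out here.
struct my_functor { /* ... */ };

// Assumed functor bindings following the "see internal::map_rect_reduce /
// internal::map_rect_combine" references below; the actual template
// arguments of these internal types may differ.
using ReduceF = stan::math::internal::map_rect_reduce<my_functor, double, double>;
using CombineF = stan::math::internal::map_rect_combine<my_functor, double, double>;
using parallel_call_t = stan::math::mpi_parallel_call<0, ReduceF, CombineF>;

void run_on_root(const Eigen::VectorXd& shared_params,
                 const std::vector<Eigen::VectorXd>& job_params,
                 const std::vector<std::vector<double>>& x_r,
                 const std::vector<std::vector<int>>& x_i) {
  // Steps 1-3: constructing on the root acquires the cluster lock and
  // instructs the workers to run parallel_call_t::distributed_apply().
  parallel_call_t job(shared_params, job_params, x_r, x_i);
  // Steps 4-6: distribute parameters and (cached) static data, evaluate the
  // assigned jobs, gather the ragged results and combine them on the root.
  auto results = job.reduce_combine();  // CombineF::result_t
  (void)results;
}

// On each worker, the cluster's command loop ends up invoking
//   parallel_call_t::distributed_apply();
// which creates a worker-side instance and takes part in the same
// broadcast / scatter / reduce / gather sequence.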

Note 1: During MPI operation everything must run synchronously. That is, if a job fails on any of the workers, execution must still continue on all other workers. In order to maintain a synchronized state, even a worker with a failed job must return a valid output chunk, since the gather commands issued on the root to collect results would otherwise fail. Thus, if a job fails on any worker, a respective status flag is transferred after the reduce and the results-gather step.

Note 2: During the first evaluation of the function the ragged array sizes need to be collected on the root from all workers, so that the root knows how many outputs are computed on each worker. This information is then cached for all subsequent evaluations. However, caching this information may occur if and only if the evaluation of all functions was successful on all workers during the first run. Thus, the first evaluation is handled with special care to ensure that this meta information is cached only when all workers have successfully evaluated the function; otherwise an exception is raised.
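The special handling of the first evaluation described in Notes 1 and 2 boils down to the following decision on the root. This is a hedged, standalone sketch: first_evaluation_meta is a hypothetical helper, not a member of this class, and the exception message is illustrative.

#include <algorithm>
#include <stdexcept>
#include <vector>

// Hypothetical helper (not part of mpi_parallel_call): given the per-worker
// status flags and the per-job output counts gathered on the root after the
// first evaluation, decide whether the ragged-array meta information may be
// cached for subsequent evaluations.
std::vector<int> first_evaluation_meta(const std::vector<int>& worker_ok,
                                       const std::vector<int>& output_sizes) {
  const bool all_ok = std::all_of(worker_ok.begin(), worker_ok.end(),
                                  [](int ok) { return ok != 0; });
  if (!all_ok) {
    // Failure on any worker: nothing is cached, so the next call is again
    // treated as a first evaluation. The message is illustrative only.
    throw std::domain_error("MPI error on first evaluation");
  }
  // Only now may the ragged output structure be cached; it must not change
  // in later evaluations.
  return output_sizes;
}

int main() {
  try {
    // 4 workers all succeeded; 5 jobs produced 1, 1, 2, 1 and 3 outputs.
    std::vector<int> cached = first_evaluation_meta({1, 1, 1, 1}, {1, 1, 2, 1, 3});
    (void)cached;  // may now be cached for all subsequent evaluations
  } catch (const std::domain_error&) {
    // First evaluation failed somewhere; no meta information was cached.
  }
  return 0;
}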

Template Parameters
  call_id    label for the static data
  ReduceF    reduce function called for each job, see internal::map_rect_reduce
  CombineF   combine function called on the combined results of each job along with the ragged data structure information, see internal::map_rect_combine

Definition at line 161 of file mpi_parallel_call.hpp.

#include <mpi_parallel_call.hpp>

Public Member Functions

template<typename T_shared_param , typename T_job_param >
 mpi_parallel_call (const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
 Initiates a parallel MPI call on the root.
 
 mpi_parallel_call ()
 
result_t reduce_combine ()
 Once all data is distributed and cached, reduce_combine evaluates all locally assigned jobs, transfers the results back to the root, and finally combines all results on the root.
 

Static Public Member Functions

static void distributed_apply ()
 Entry point on the workers for the mpi_parallel_call.
 

Private Types

using result_t = typename CombineF::result_t
 
using cache_x_r = internal::mpi_parallel_call_cache< call_id, 1, std::vector< std::vector< double > > >
 
using cache_x_i = internal::mpi_parallel_call_cache< call_id, 2, std::vector< std::vector< int > > >
 
using cache_f_out = internal::mpi_parallel_call_cache< call_id, 3, std::vector< int > >
 
using cache_chunks = internal::mpi_parallel_call_cache< call_id, 4, std::vector< int > >
 

Private Member Functions

template<typename T_cache >
T_cache::cache_t & scatter_array_2d_cached (typename T_cache::cache_t &data)
 Performs a cached scatter of a 2D array (nested std::vector).
 
template<typename T_cache >
T_cache::cache_t & broadcast_array_1d_cached (typename T_cache::cache_t &data)
 Performs a cached broadcast of a 1D array (std::vector).
 
template<int meta_cache_id>
vector_d broadcast_vector (const vector_d &data)
 Broadcasts an Eigen vector to the cluster.
 
template<int meta_cache_id>
matrix_d scatter_matrix (const matrix_d &data)
 Scatters an Eigen matrix column-wise over the cluster.
 
void setup_call (const vector_d &shared_params, const matrix_d &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
 

Private Attributes

boost::mpi::communicator world_
 
const std::size_t rank_ = world_.rank()
 
const std::size_t world_size_ = world_.size()
 
std::unique_lock< std::mutex > cluster_lock_
 
CombineF combine_
 
vector_d local_shared_params_dbl_
 
matrix_d local_job_params_dbl_
 

Static Private Attributes

static int num_outputs_per_job_ = -1
 

The documentation for this class was generated from the following file:
mpi_parallel_call.hpp