Stan Math Library
4.9.0
Automatic Differentiation
mpi_parallel_call< call_id, ReduceF, CombineF > Class Template Reference
The MPI parallel call class manages the distributed evaluation of a collection of tasks following the map-reduce-combine pattern.
The class organizes the distribution of all job-related information from the root node to all worker nodes. The class discriminates between parameters and static data. The static data is only transmitted a single time and is cached locally on each worker after the initial transfer.
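For orientation, here is a minimal boost::mpi sketch (not code taken from the class) of the distinction drawn above: parameter values are transmitted on every evaluation, while static data crosses the network only once and is afterwards served from the local copy.

```cpp
#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>
#include <vector>

namespace mpi = boost::mpi;

// Parameters travel on every evaluation; static data is sent exactly once
// and afterwards read from the local copy (here guarded by a simple flag).
int main() {
  mpi::environment env;
  mpi::communicator world;

  std::vector<double> x_r;                   // static data; known on the root
  if (world.rank() == 0)
    x_r = {10.0, 20.0};
  bool static_data_sent = false;

  for (int eval = 0; eval < 3; ++eval) {
    std::vector<double> params;
    if (world.rank() == 0)
      params = {1.0 * eval, 2.0 * eval};     // new parameter values per call
    mpi::broadcast(world, params, 0);        // parameters: sent every time

    if (!static_data_sent) {
      mpi::broadcast(world, x_r, 0);         // static data: sent only once
      static_data_sent = true;
    }

    if (world.rank() == world.size() - 1 && eval == 2)
      std::cout << "worker holds " << params.size() << " parameters and "
                << x_r.size() << " cached static values\n";
  }
}
```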
The flow of commands is as follows:
The MPI cluster resource is acquired with the construction of mpi_parallel_call and is freed once the mpi_parallel_call goes out of scope (that is, when it is destructed).
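The cluster_lock_ member listed under the private attributes suggests that this acquire-on-construction, release-on-destruction behaviour is the usual RAII idiom. A minimal sketch of that idiom, with a plain std::mutex standing in for the cluster resource:

```cpp
#include <iostream>
#include <mutex>

std::mutex cluster_mutex;  // stand-in for the MPI cluster resource

struct parallel_call_like {
  // acquired on construction; std::unique_lock releases it in its destructor
  std::unique_lock<std::mutex> cluster_lock_{cluster_mutex};
};

int main() {
  {
    parallel_call_like call;                 // cluster held by this object
    std::cout << "cluster is locked\n";
  }                                          // destructor releases the lock
  std::cout << "cluster is free again\n";
}
```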
Note 1: During MPI operation everything must run synchronously. That is, if a job fails on one of the workers, execution must still continue on all other workers. In order to maintain a synchronized state, even a worker with a failed job must return a valid output chunk, since the gather commands issued on the root to collect the results would otherwise fail. Thus, if a job fails on any worker, a corresponding status flag is transferred after the reduce and the results-gather step.
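A sketch of this synchronization pattern, using boost::mpi collectives directly; the status encoding and the chunk contents are placeholders, not the actual wire format of the class:

```cpp
#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>
#include <stdexcept>
#include <vector>

namespace mpi = boost::mpi;

int main() {
  mpi::environment env;
  mpi::communicator world;

  int status = 0;                        // 0 = ok, 1 = failed (placeholder)
  std::vector<double> chunk(3, 0.0);     // every rank must deliver a chunk

  try {
    if (world.rank() == 1)               // pretend the job on rank 1 fails
      throw std::domain_error("job failed");
    chunk.assign(3, static_cast<double>(world.rank()));
  } catch (const std::exception&) {
    status = 1;                          // keep the dummy chunk so the gathers
  }                                      // below stay synchronized

  std::vector<std::vector<double>> all_chunks;
  std::vector<int> all_status;
  if (world.rank() == 0) {
    mpi::gather(world, chunk, all_chunks, 0);   // results first ...
    mpi::gather(world, status, all_status, 0);  // ... then the status flags
    bool any_failed = false;
    for (int s : all_status)
      any_failed = any_failed || (s != 0);
    if (any_failed)
      std::cout << "a worker failed; the class raises an exception here\n";
    else
      std::cout << "collected " << all_chunks.size() << " chunks\n";
  } else {
    mpi::gather(world, chunk, 0);
    mpi::gather(world, status, 0);
  }
}
```

Run with, for example, mpirun -np 4: every rank reaches both gathers, so the root can always decide afterwards whether the evaluation succeeded.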
Note 2: During the first evaluation of the function the ragged array sizes need to be collected on the root from all workers. The root needs this information to know how many outputs are computed on each worker. This information is then cached for all subsequent evaluations. However, caching this information may occur if and only if the evaluation of all functions was successful on all workers during the first run. Thus, the first evaluation is handled with special care to ensure that this meta information is cached only when all workers have successfully evaluated the function; otherwise an exception is raised.
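Along the same lines, a sketch of the first-evaluation metadata exchange: each rank reports its ragged output size, and the sizes are cached only if every rank reports success. The variable names and the success encoding are assumptions for illustration:

```cpp
#include <boost/mpi.hpp>
#include <iostream>
#include <vector>

namespace mpi = boost::mpi;

// Empty until the first fully successful evaluation.
static std::vector<int> cached_chunk_sizes;

int main() {
  mpi::environment env;
  mpi::communicator world;

  int my_num_outputs = 2 + world.rank();  // placeholder ragged output size
  int my_status = 0;                      // placeholder: this rank succeeded

  if (cached_chunk_sizes.empty()) {       // first evaluation: sizes unknown
    if (world.rank() == 0) {
      std::vector<int> sizes, status;
      mpi::gather(world, my_num_outputs, sizes, 0);
      mpi::gather(world, my_status, status, 0);
      bool all_ok = true;
      for (int s : status)
        all_ok = all_ok && (s == 0);
      if (all_ok)
        cached_chunk_sizes = sizes;       // cache only after a clean first run
      else
        std::cout << "first run failed; sizes are not cached\n";
    } else {
      mpi::gather(world, my_num_outputs, 0);
      mpi::gather(world, my_status, 0);
    }
  }

  if (world.rank() == 0 && !cached_chunk_sizes.empty())
    std::cout << "root caches " << cached_chunk_sizes.size()
              << " per-worker output sizes\n";
}
```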
Template Parameters | |
call_id | label for the static data |
ReduceF | reduce function called for each job |
CombineF | combine function called on the collected results of all jobs, along with the ragged data structure information |
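To make the division of labour between the two functor parameters concrete, here is a single-process sketch with hypothetical functor shapes (example_reduce, example_combine). The actual interfaces required of ReduceF and CombineF are defined in mpi_parallel_call.hpp; the only requirement visible on this page is that CombineF expose a result_t type.

```cpp
#include <iostream>
#include <vector>

// Hypothetical reduce functor: maps one job's parameters (plus the shared
// parameters and per-job static data) to an output chunk.
struct example_reduce {
  std::vector<double> operator()(const std::vector<double>& shared_params,
                                 const std::vector<double>& job_params,
                                 const std::vector<double>& x_r,
                                 const std::vector<int>& x_i) const {
    std::vector<double> chunk;
    for (double p : job_params)
      chunk.push_back(p * shared_params[0] + x_r.size() + x_i.size());
    return chunk;
  }
};

// Hypothetical combine functor: assembles all ragged chunks into one result.
struct example_combine {
  using result_t = std::vector<double>;
  result_t operator()(const std::vector<std::vector<double>>& chunks) const {
    result_t out;
    for (const auto& c : chunks)
      out.insert(out.end(), c.begin(), c.end());
    return out;
  }
};

int main() {
  std::vector<double> shared = {1.0};
  std::vector<std::vector<double>> jobs = {{0.1, 0.2}, {0.3}};
  std::vector<std::vector<double>> x_r(jobs.size());
  std::vector<std::vector<int>> x_i(jobs.size());

  example_reduce reduce;
  std::vector<std::vector<double>> chunks;
  for (std::size_t j = 0; j < jobs.size(); ++j)      // map: one chunk per job
    chunks.push_back(reduce(shared, jobs[j], x_r[j], x_i[j]));

  example_combine::result_t result = example_combine{}(chunks);  // combine
  std::cout << "combined " << result.size() << " outputs\n";
}
```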
Definition at line 161 of file mpi_parallel_call.hpp.
#include <mpi_parallel_call.hpp>
Public Member Functions | |
template<typename T_shared_param , typename T_job_param > | |
mpi_parallel_call (const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i) | |
Initiates a parallel MPI call on the root. | |
mpi_parallel_call () | |
result_t | reduce_combine () |
Once all data is distributed and cached, reduce_combine performs the locally assigned function evaluations, transfers all results back to the root, and finally combines all results on the root. | |
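Pieced together from the public members above, a root-side call has roughly the following shape. my_reduce and my_combine stand in for functor types meeting the ReduceF/CombineF requirements, which are not documented on this page, so this is an outline rather than a compilable example.

```cpp
#include <mpi_parallel_call.hpp>
#include <Eigen/Dense>
#include <vector>

// my_reduce and my_combine are placeholders for types satisfying the
// ReduceF / CombineF requirements.
constexpr int call_id = 0;
using caller_t = stan::math::mpi_parallel_call<call_id, my_reduce, my_combine>;

void run_on_root() {
  Eigen::VectorXd shared_params(2);           // parameters: sent every call
  shared_params << 1.0, 2.0;
  std::vector<Eigen::VectorXd> job_params(10, Eigen::VectorXd::Zero(3));
  std::vector<std::vector<double>> x_r(10);   // static data: sent once, cached
  std::vector<std::vector<int>> x_i(10);

  // Construction acquires the cluster and distributes the data; the call_id
  // label ties the cached static data to this particular call site.
  caller_t call(shared_params, job_params, x_r, x_i);

  // Evaluate all jobs across the cluster and combine the results on the root.
  auto result = call.reduce_combine();
}  // the cluster is released when `call` goes out of scope
```

Since result_t is a private type alias, the return value of reduce_combine() is captured with auto here.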
Static Public Member Functions | |
static void | distributed_apply () |
Entry point on the workers for the mpi_parallel_call. | |
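distributed_apply() is what a worker calls so that it participates in the collective operations the root initiates. The snippet below is a generic boost::mpi illustration of that root/worker split, not the class's actual dispatch code:

```cpp
#include <boost/mpi.hpp>
#include <iostream>
#include <vector>

namespace mpi = boost::mpi;

// Worker-side "entry point" sketch: once a worker enters, it executes the
// same collectives the root issues, which keeps the cluster synchronized.
void distributed_entry(mpi::communicator& world) {
  double shared = 0;
  mpi::broadcast(world, shared, 0);           // receive what the root sends
  double local = shared * world.rank();       // do the locally assigned work
  mpi::gather(world, local, 0);               // join the gather on the root
}

int main() {
  mpi::environment env;
  mpi::communicator world;
  if (world.rank() == 0) {
    double shared = 2.5;                      // root initiates the operation
    mpi::broadcast(world, shared, 0);
    double local = shared * world.rank();
    std::vector<double> gathered;
    mpi::gather(world, local, gathered, 0);
    std::cout << "root collected " << gathered.size() << " values\n";
  } else {
    distributed_entry(world);                 // workers enter here
  }
}
```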
Private Types | |
using | result_t = typename CombineF::result_t |
using | cache_x_r = internal::mpi_parallel_call_cache< call_id, 1, std::vector< std::vector< double > > > |
using | cache_x_i = internal::mpi_parallel_call_cache< call_id, 2, std::vector< std::vector< int > > > |
using | cache_f_out = internal::mpi_parallel_call_cache< call_id, 3, std::vector< int > > |
using | cache_chunks = internal::mpi_parallel_call_cache< call_id, 4, std::vector< int > > |
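These aliases select one static cache per (call_id, member) pair. A standalone sketch of that idea, with hypothetical member names (local_, is_valid_) rather than the real internal::mpi_parallel_call_cache interface:

```cpp
#include <cassert>
#include <vector>

// Every distinct (call_id, member) pair instantiates a separate static
// store, so cached data for one mpi_parallel_call label never collides
// with another.
template <int call_id, int member, typename T>
struct call_cache_sketch {
  using cache_t = T;
  static T local_;        // per-process storage
  static bool is_valid_;  // has the data been transferred yet?
};
template <int call_id, int member, typename T>
T call_cache_sketch<call_id, member, T>::local_;
template <int call_id, int member, typename T>
bool call_cache_sketch<call_id, member, T>::is_valid_ = false;

int main() {
  using cache_x_r = call_cache_sketch<1, 1, std::vector<std::vector<double>>>;
  using cache_x_i = call_cache_sketch<1, 2, std::vector<std::vector<int>>>;

  cache_x_r::local_ = {{1.0, 2.0}};  // fill one cache ...
  cache_x_r::is_valid_ = true;

  assert(cache_x_r::is_valid_);      // ... the other one is untouched
  assert(!cache_x_i::is_valid_);
}
```

The call_id template parameter is what keeps static data cached for one call site separate from any other call site.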
Private Member Functions | |
template<typename T_cache > | |
T_cache::cache_t & | scatter_array_2d_cached (typename T_cache::cache_t &data) |
Performs a cached scatter of a 2D array (nested std::vector). | |
template<typename T_cache > | |
T_cache::cache_t & | broadcast_array_1d_cached (typename T_cache::cache_t &data) |
Performs a cached broadcast of a 1D array (std::vector). | |
template<int meta_cache_id> | |
vector_d | broadcast_vector (const vector_d &data) |
Broadcasts an Eigen vector to the cluster. | |
template<int meta_cache_id> | |
matrix_d | scatter_matrix (const matrix_d &data) |
Scatters an Eigen matrix column wise over the cluster. | |
void | setup_call (const vector_d &shared_params, const matrix_d &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i) |
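As a concrete picture of the column-wise scatter performed by scatter_matrix, here is a sketch using the raw MPI C API; it relies on Eigen's default column-major storage so that each rank's block of columns is contiguous in memory. The class itself performs its transfers through boost::mpi (see the world_ communicator below), but the memory-layout argument is the same.

```cpp
#include <mpi.h>
#include <Eigen/Dense>
#include <iostream>
#include <vector>

// Column-wise scatter sketch: each rank receives a contiguous block of
// columns, which works because Eigen matrices are column-major by default.
int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int rows = 3, total_cols = 10;
  Eigen::MatrixXd full;                          // job parameters, root only
  if (rank == 0)
    full = Eigen::MatrixXd::Random(rows, total_cols);

  // split the columns as evenly as possible over the ranks
  std::vector<int> counts(size), displs(size);
  for (int r = 0, offset = 0; r < size; ++r) {
    int cols_r = total_cols / size + (r < total_cols % size ? 1 : 0);
    counts[r] = cols_r * rows;                   // counts are in doubles
    displs[r] = offset;
    offset += counts[r];
  }

  Eigen::MatrixXd local(rows, counts[rank] / rows);
  MPI_Scatterv(rank == 0 ? full.data() : nullptr, counts.data(), displs.data(),
               MPI_DOUBLE, local.data(), counts[rank], MPI_DOUBLE, 0,
               MPI_COMM_WORLD);

  std::cout << "rank " << rank << " owns " << local.cols() << " columns\n";
  MPI_Finalize();
}
```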
Private Attributes | |
boost::mpi::communicator | world_ |
const std::size_t | rank_ = world_.rank() |
const std::size_t | world_size_ = world_.size() |
std::unique_lock< std::mutex > | cluster_lock_ |
CombineF | combine_ |
vector_d | local_shared_params_dbl_ |
matrix_d | local_job_params_dbl_ |
Static Private Attributes | |
static int | num_outputs_per_job_ = -1 |