#ifndef STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
#define STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
namespace internal {

// Container for locally cached data; one singleton cache per
// (call_id, member, T) combination with store(), is_valid() and data().
template <int call_id, int member, typename T>
class mpi_parallel_call_cache {
  // ...
  // store() may only be called once per cache:
      throw std::runtime_error("Cache can only store a single data item.");
  // ...
  // data() requires that store() has been called before:
      throw std::runtime_error("Cache not yet valid.");
  // ...
};

// out-of-line definitions of the cache's static state (the stored data and
// the validity flag)
template <int call_id, int member, typename T>
// ...

template <int call_id, int member, typename T>
// ...

}  // namespace internal
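// Usage sketch (illustrative, not part of this header): each cache type is a
// distinct singleton per (call_id, member, T); data may be stored exactly once
// and is afterwards returned by reference, e.g.
//
//   using cache_x_r = internal::mpi_parallel_call_cache<
//       7, 1, std::vector<std::vector<double>>>;   // 7 and 1 are example ids
//   if (!cache_x_r::is_valid())
//     cache_x_r::store(x_r);                   // allowed only once per cache
//   const auto& local_x_r = cache_x_r::data();  // throws if nothing is stored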
// Manages the distributed evaluation of a collection of tasks: ReduceF
// evaluates each job on its assigned node and CombineF combines all job
// outputs on the root.
template <int call_id, typename ReduceF, typename CombineF>
class mpi_parallel_call {
  // ...
  // per-call_id caches for the continuous and integer data
  using cache_x_r = internal::mpi_parallel_call_cache<
      call_id, 1, std::vector<std::vector<double>>>;
  using cache_x_i = internal::mpi_parallel_call_cache<
      call_id, 2, std::vector<std::vector<int>>>;
  // ...

 public:
  // return type as defined by the combine functor
  using result_t = typename CombineF::result_t;
  // ...
  // Initiates a parallel MPI call on the root: checks the problem sizes,
  // converts the parameters to doubles and distributes everything.
  template <typename T_shared_param, typename T_job_param>
  mpi_parallel_call(const T_shared_param& shared_params,
                    const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic,
                                                    1>>& job_params,
                    const std::vector<std::vector<double>>& x_r,
                    const std::vector<std::vector<int>>& x_i)
      : combine_(shared_params, job_params) {
    if (rank_ != 0)
      throw std::runtime_error("problem sizes may only be defined on the root.");

    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "continuous data", x_r);
    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "integer data", x_i);

    // if this call_id was evaluated before, the cached number of jobs must
    // match the current number of jobs
    if (cache_chunks::is_valid()) {
      int cached_num_jobs = sum(cache_chunks::data());
      check_size_match("mpi_parallel_call", "cached number of jobs",
                       cached_num_jobs, "number of jobs", job_params.size());
    }

    const std::vector<int> job_dims = dims(job_params);

    const size_type num_jobs = job_dims[0];
    const size_type num_job_params = num_jobs == 0 ? 0 : job_dims[1];

    const vector_d shared_params_dbl = value_of(shared_params);
    matrix_d job_params_dbl(num_job_params, num_jobs);

    for (int j = 0; j < num_jobs; ++j)
      job_params_dbl.col(j) = value_of(job_params[j]);

    setup_call(shared_params_dbl, job_params_dbl, x_r, x_i);
  }
  // Worker-side constructor used by distributed_apply(): no problem sizes may
  // be given here; all data is received from the root.
  mpi_parallel_call() : combine_() {
    if (rank_ == 0)
      throw std::runtime_error("problem sizes must be defined on the root.");

    setup_call(vector_d(), matrix_d(), std::vector<std::vector<double>>(),
               std::vector<std::vector<int>>());
  }
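  // How the two constructors are used together (sketch with hypothetical
  // functors my_reduce / my_combine): the root builds the object from the
  // actual problem data, while the workers enter through distributed_apply(),
  // which relies on the default constructor above:
  //
  //   // root process
  //   mpi_parallel_call<0, my_reduce, my_combine> call(shared_params,
  //                                                    job_params, x_r, x_i);
  //   auto result = call.reduce_combine();
  //
  //   // worker processes (triggered via the MPI command machinery)
  //   mpi_parallel_call<0, my_reduce, my_combine>::distributed_apply();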
  // Evaluates all locally assigned jobs with ReduceF, gathers the ragged
  // results on the root and applies the combine functor.
  result_t reduce_combine() {
    const std::vector<int>& job_chunks = cache_chunks::data();
    const int num_jobs = sum(job_chunks);

    const int first_job
        = std::accumulate(job_chunks.begin(), job_chunks.begin() + rank_, 0);
    const int num_local_jobs = local_job_params_dbl_.cols();
    int local_outputs_per_job = num_outputs_per_job_;
    matrix_d local_output(
        local_outputs_per_job == -1 ? 0 : local_outputs_per_job,
        num_local_jobs);
    std::vector<int> local_f_out(num_local_jobs, -1);

    typename cache_x_r::cache_t& local_x_r = cache_x_r::data();
    typename cache_x_i::cache_t& local_x_i = cache_x_i::data();

    // if the output size of each job is already cached, the local output
    // buffer can be sized exactly
    // ...
      const int num_outputs
          = std::accumulate(f_out.begin() + first_job,
                            f_out.begin() + first_job + num_local_jobs, 0);
      local_output.resize(Eigen::NoChange, num_outputs);
    // ...

    int local_ok = 1;
    try {
      for (int i = 0, offset = 0; i < num_local_jobs;
           offset += local_f_out[i], ++i) {
        const matrix_d job_output
            = ReduceF()(local_shared_params_dbl_, local_job_params_dbl_.col(i),
                        local_x_r[i], local_x_i[i], 0);
        local_f_out[i] = job_output.cols();

        if (local_outputs_per_job == -1) {
          local_outputs_per_job = job_output.rows();
          local_output.conservativeResize(local_outputs_per_job,
                                          Eigen::NoChange);
        }

        if (local_output.cols() < offset + local_f_out[i])
          // grow the output buffer with head room for the remaining jobs
          local_output.conservativeResize(Eigen::NoChange,
                                          2 * (offset + local_f_out[i]));

        local_output.block(0, offset, local_output.rows(), local_f_out[i])
            = job_output;
      }
    } catch (const std::exception& e) {
      // mark the failure locally; the cluster-wide status is reduced below
      local_ok = 0;
    }

    // on the first evaluation the cluster first agrees on whether every node
    // succeeded and on the number of outputs per job
    // ...
      int cluster_status = 0;
      boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
      bool all_ok = cluster_status == static_cast<int>(world_size_);
      boost::mpi::broadcast(world_, all_ok, 0);
      if (!all_ok) {
        // ...
        throw std::domain_error("MPI error on first evaluation.");
      }

      boost::mpi::broadcast(world_, local_outputs_per_job, 0);
    // ...

    // gather the per-job output counts on the root
    std::vector<int> world_f_out(num_jobs, 0);
    boost::mpi::gatherv(world_, local_f_out.data(), num_local_jobs,
                        world_f_out.data(), job_chunks, 0);
    // ...
      std::copy(local_f_out.begin(), local_f_out.end(),
                world_f_out.begin() + first_job);
    // ...

    // consistency check of the just computed output sizes against the cache
    for (int i = 0; i < num_local_jobs; ++i) {
      if (world_f_out[first_job + i] != local_f_out[i]) {
        // ...
      }
    }

    // gather the ragged results: each rank contributes local_outputs_per_job
    // rows times the sum of its jobs' output columns
    const std::size_t size_world_f_out = sum(world_f_out);
    matrix_d world_result;
    // ...
    std::vector<int> chunks_result(world_size_, 0);
    for (std::size_t i = 0, k = 0; i != world_size_; ++i)
      for (int j = 0; j != job_chunks[i]; ++j, ++k)
        chunks_result[i] += world_f_out[k] * local_outputs_per_job;

    boost::mpi::gatherv(world_, local_output.data(), chunks_result[rank_],
                        world_result.data(), chunks_result, 0);

    // final cluster-wide status check
    int cluster_status = 0;
    boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
    bool all_ok = cluster_status == static_cast<int>(world_size_);
    // ...
    if (!all_ok) {
      // ...
      throw std::domain_error("Error during MPI evaluation.");
    }

    return combine_(world_result, world_f_out);
  }
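  // Bookkeeping sketch with concrete (made-up) numbers: for job_chunks =
  // {3, 2} and per-job output counts world_f_out = {4, 4, 4, 5, 5} with
  // 2 outputs per job column, rank 1 starts at
  //
  //   first_job = std::accumulate(job_chunks.begin(),
  //                               job_chunks.begin() + 1, 0);       // == 3
  //
  // and the gatherv receive counts for the final result become
  //
  //   chunks_result == { (4 + 4 + 4) * 2, (5 + 5) * 2 }             // {24, 20}
  //
  // which is exactly the number of doubles each rank holds in local_output.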
  // Performs a cached scatter of a 2D array (nested std::vector): the root
  // scatters the outer elements over the cluster and every node caches its
  // local share under T_cache.
  template <typename T_cache>
  typename T_cache::cache_t& scatter_array_2d_cached(
      typename T_cache::cache_t& data) {
    // skip all communication if the data is already cached locally
    if (T_cache::is_valid()) {
      return T_cache::data();
    }

    // broadcast the dimensions of the 2D array first
    std::vector<int> data_dims = dims(data);
    // ...
    boost::mpi::broadcast(world_, data_dims.data(), 2, 0);

    const std::vector<int> job_chunks = mpi_map_chunks(data_dims[0], 1);
    const std::vector<int> data_chunks
        = mpi_map_chunks(data_dims[0], data_dims[1]);

    auto flat_data = to_array_1d(data);
    decltype(flat_data) local_flat_data(data_chunks[rank_]);

    if (data_dims[0] * data_dims[1] > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, flat_data.data(), data_chunks,
                             local_flat_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_flat_data.data(),
                             data_chunks[rank_], 0);
      }
    }

    // rebuild the nested structure from the received flat chunk
    std::vector<decltype(flat_data)> local_data;
    auto local_iter = local_flat_data.begin();
    for (int i = 0; i != job_chunks[rank_]; ++i) {
      typename T_cache::cache_t::value_type const data_elem(
          local_iter, local_iter + data_dims[1]);
      local_data.push_back(data_elem);
      local_iter += data_dims[1];
    }

    T_cache::store(local_data);
    return T_cache::data();
  }
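  // Illustration of the rebuild step with concrete numbers (no MPI involved):
  // a node that received 2 rows of width 3 as one flat chunk reconstructs its
  // nested rows like this:
  //
  //   std::vector<double> local_flat_data = {1, 2, 3, 4, 5, 6};
  //   std::vector<std::vector<double>> local_data;
  //   auto it = local_flat_data.begin();
  //   for (int i = 0; i != 2; ++i, it += 3)
  //     local_data.emplace_back(it, it + 3);
  //   // local_data == {{1, 2, 3}, {4, 5, 6}}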
  // Performs a cached broadcast of a 1D array (std::vector): the size is sent
  // first so that the workers can allocate storage for the payload.
  template <typename T_cache>
  typename T_cache::cache_t& broadcast_array_1d_cached(
      typename T_cache::cache_t& data) {
    if (T_cache::is_valid()) {
      return T_cache::data();
    }

    std::size_t data_size = data.size();
    boost::mpi::broadcast(world_, data_size, 0);

    std::vector<typename T_cache::cache_t::value_type> local_data = data;
    local_data.resize(data_size);

    boost::mpi::broadcast(world_, local_data.data(), data_size, 0);
    T_cache::store(local_data);
    return T_cache::data();
  }
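  // The same two-step pattern (broadcast the size, resize, broadcast the
  // payload) as a standalone boost::mpi sketch; `values` is assumed to be
  // filled on the root only:
  //
  //   #include <boost/mpi.hpp>
  //   #include <vector>
  //
  //   int main() {
  //     boost::mpi::environment env;
  //     boost::mpi::communicator world;
  //     std::vector<double> values;
  //     if (world.rank() == 0)
  //       values = {1.0, 2.0, 3.0};
  //     std::size_t n = values.size();
  //     boost::mpi::broadcast(world, n, 0);
  //     values.resize(n);
  //     boost::mpi::broadcast(world, values.data(), n, 0);
  //     return 0;
  //   }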
  // Broadcasts an Eigen vector to the cluster; its size is shared through a
  // dedicated meta cache so it is only communicated once per call_id.
  template <int meta_cache_id>
  vector_d broadcast_vector(const vector_d& data) {
    using meta_cache = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                                         std::vector<size_type>>;
    const std::vector<size_type>& data_size
        = broadcast_array_1d_cached<meta_cache>({data.size()});

    vector_d local_data = data;
    local_data.resize(data_size[0]);

    boost::mpi::broadcast(world_, local_data.data(), data_size[0], 0);

    return local_data;
  }
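  // Note on the meta cache (illustrative ids): because the length is routed
  // through broadcast_array_1d_cached with its own cache slot, repeated
  // evaluations for the same call_id only broadcast the payload, e.g.
  //
  //   broadcast_vector<-1>(shared_params);  // first call: size + payload
  //   broadcast_vector<-1>(shared_params);  // later calls: payload only
  //
  // (the template argument -1 is just an example meta_cache_id slot).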
  // Scatters an Eigen matrix column-wise over the cluster; the dimensions are
  // shared through a dedicated meta cache.
  template <int meta_cache_id>
  matrix_d scatter_matrix(const matrix_d& data) {
    using meta_cache = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                                         std::vector<size_type>>;
    const std::vector<size_type>& dims
        = broadcast_array_1d_cached<meta_cache>({data.rows(), data.cols()});
    const size_type rows = dims[0];
    const size_type total_cols = dims[1];

    const std::vector<int> job_chunks = mpi_map_chunks(total_cols, 1);
    const std::vector<int> data_chunks = mpi_map_chunks(total_cols, rows);
    matrix_d local_data(rows, job_chunks[rank_]);
    if (rows * total_cols > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, data.data(), data_chunks,
                             local_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_data.data(), data_chunks[rank_], 0);
      }
    }

    return local_data;
  }
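  // Why a plain memory scatter gives a column-wise split (no MPI involved):
  // matrix_d is column-major, so the columns owned by one node form a single
  // contiguous block of rows * job_chunks[rank_] doubles, e.g.
  //
  //   #include <Eigen/Dense>
  //   Eigen::MatrixXd data(2, 4);   // 4 jobs with 2 parameters each
  //   data << 1, 3, 5, 7,
  //           2, 4, 6, 8;
  //   // a node owning columns 2 and 3 reads the chunk starting at offset 2 * 2
  //   Eigen::Map<Eigen::MatrixXd> local(data.data() + 2 * 2, 2, 2);
  //   // local == [5 7; 6 8]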
  // Distributes the shared parameters, job-specific parameters and the data
  // over the cluster; the static data is cached so it is only sent once per
  // call_id.
  void setup_call(const vector_d& shared_params, const matrix_d& job_params,
                  const std::vector<std::vector<double>>& x_r,
                  const std::vector<std::vector<int>>& x_i) {
    std::vector<int> job_chunks = mpi_map_chunks(job_params.cols(), 1);
    broadcast_array_1d_cached<cache_chunks>(job_chunks);

    // the shared and job-specific parameters are distributed on every call via
    // broadcast_vector / scatter_matrix into local_shared_params_dbl_ and
    // local_job_params_dbl_
    // ...

    scatter_array_2d_cached<cache_x_r>(x_r);
    scatter_array_2d_cached<cache_x_i>(x_i);
  }
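  // Caching behaviour sketch (hypothetical functors my_reduce / my_combine):
  // job chunks, x_r and x_i go through the *_cached helpers and are therefore
  // transferred only once per call_id, while the parameters are distributed on
  // every construction, e.g.
  //
  //   mpi_parallel_call<1, my_reduce, my_combine> c1(theta, etas, x_r, x_i);
  //   // first use of call_id 1: chunks, x_r, x_i are scattered and cached,
  //   // theta and etas are distributed as well
  //   mpi_parallel_call<1, my_reduce, my_combine> c2(theta2, etas2, x_r, x_i);
  //   // later uses: only theta2 and etas2 are sent; the static data is read
  //   // from the local caches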
};

// definition of the static member tracking the number of outputs per job
// (-1 until the first evaluation has determined it)
template <int call_id, typename ReduceF, typename CombineF>
int mpi_parallel_call<call_id, ReduceF, CombineF>::num_outputs_per_job_ = -1;

// ...

#endif
mpi_parallel_call_cache()=delete
mpi_parallel_call_cache & operator=(const mpi_parallel_call_cache< call_id, member, T > &)=delete
mpi_parallel_call_cache(const mpi_parallel_call_cache< call_id, member, T > &)=delete
static void store(const T &data)
Store data to be cached locally.
static bool is_valid()
Query whether the cache is valid, which it is once data has been stored.
static cache_t & data()
Obtain const reference to locally cached data if cache is valid (throws otherwise).
Container for locally cached data, essentially implemented as a singleton.
typename CombineF::result_t result_t
vector_d broadcast_vector(const vector_d &data)
Broadcasts an Eigen vector to the cluster.
const std::size_t world_size_
matrix_d scatter_matrix(const matrix_d &data)
Scatters an Eigen matrix column-wise over the cluster.
mpi_parallel_call(const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
Initiates a parallel MPI call on the root.
result_t reduce_combine()
Once all data is distributed and cached the reduce_combine evaluates all assigned function evaluation...
void setup_call(const vector_d &shared_params, const matrix_d &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
matrix_d local_job_params_dbl_
T_cache::cache_t & broadcast_array_1d_cached(typename T_cache::cache_t &data)
Performs a cached broadcast of a 1D array (std::vector).
static int num_outputs_per_job_
static void distributed_apply()
Entry point on the workers for the mpi_parallel_call.
vector_d local_shared_params_dbl_
boost::mpi::communicator world_
std::unique_lock< std::mutex > cluster_lock_
T_cache::cache_t & scatter_array_2d_cached(typename T_cache::cache_t &data)
Performs a cached scatter of a 2D array (nested std::vector).
The MPI parallel call class manages the distributed evaluation of a collection of tasks following the...
auto to_array_1d(T_x &&x)
Returns input matrix reshaped into a vector.
void dims(const T_x &x, std::vector< int > &result)
Pushes the dimensions of the given argument onto the result vector.
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
T value_of(const fvar< T > &v)
Return the value of the specified variable.
void check_matching_sizes(const char *function, const char *name1, const T_y1 &y1, const char *name2, const T_y2 &y2)
Check if two structures have the same size.
std::vector< int > mpi_map_chunks(std::size_t num_jobs, std::size_t chunk_size=1)
Maps jobs of the given chunk size to workers, returning a vector of counts.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic >::Index size_type
Type for sizes and indexes in an Eigen matrix with double elements.
auto sum(const std::vector< T > &m)
Return the sum of the entries of the specified standard vector.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
std::unique_lock< std::mutex > mpi_broadcast_command()
Broadcasts default constructible commands to the cluster.
void check_size_match(const char *function, const char *name_i, T_size1 i, const char *name_j, T_size2 j)
Check if the provided sizes match.
MPI command template which calls the static method distributed_apply of the given class F.
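Taken together, a typical use of these members looks as follows (a minimal sketch; my_reduce and my_combine are hypothetical functors with the ReduceF / CombineF interface, and the worker-side listen loop that dispatches the broadcast command is provided by the MPI command machinery):

    // root process: construct with the actual problem sizes and data; the
    // broadcast distributed_apply command wakes up the workers, and
    // reduce_combine() returns the combined result of all jobs
    stan::math::mpi_parallel_call<0, my_reduce, my_combine> call(
        shared_params, job_params, x_r, x_i);
    auto result = call.reduce_combine();

    // worker processes: entered via the broadcast mpi_distributed_apply
    // command; default-constructs the object, receives its chunk of the
    // work and participates in reduce_combine()
    stan::math::mpi_parallel_call<0, my_reduce, my_combine>::distributed_apply();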