#ifndef STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
#define STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
template <int call_id, int member, typename T>
 
      throw std::runtime_error("Cache can only store a single data item.");
 
      throw std::runtime_error("Cache not yet valid.");
 
template <int call_id, int member, typename T>
 
template <int call_id, int member, typename T>
 
template <int call_id, typename ReduceF, typename CombineF>
 
  using cache_x_r
      = internal::mpi_parallel_call_cache<call_id, 1,
                                          std::vector<std::vector<double>>>;
  using cache_x_i
      = internal::mpi_parallel_call_cache<call_id, 2,
                                          std::vector<std::vector<int>>>;
 
  template <typename T_shared_param, typename T_job_param>
 
  mpi_parallel_call(
      const T_shared_param& shared_params,
      const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
          job_params,
      const std::vector<std::vector<double>>& x_r,
      const std::vector<std::vector<int>>& x_i)
      : combine_(shared_params, job_params) {
 
    if (rank_ != 0)
      throw std::runtime_error(
          "problem sizes may only be defined on the root.");

    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "continuous data", x_r);
    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "integer data", x_i);
 
      check_size_match("mpi_parallel_call", "cached number of jobs",
                       cached_num_jobs, "number of jobs", job_params.size());
 
    const std::vector<int> job_dims = dims(job_params);

    const size_type num_jobs = job_dims[0];
    const size_type num_job_params = num_jobs == 0 ? 0 : job_dims[1];

    const vector_d shared_params_dbl = value_of(shared_params);
    matrix_d job_params_dbl(num_job_params, num_jobs);

    for (int j = 0; j < num_jobs; ++j)
      job_params_dbl.col(j) = value_of(job_params[j]);

    setup_call(shared_params_dbl, job_params_dbl, x_r, x_i);
 
    if (rank_ == 0)
      throw std::runtime_error("problem sizes must be defined on the root.");

    setup_call(vector_d(), matrix_d(), std::vector<std::vector<double>>(),
               std::vector<std::vector<int>>());
 
    const int num_jobs = sum(job_chunks);

    const int first_job
        = std::accumulate(job_chunks.begin(), job_chunks.begin() + rank_, 0);
 
    const int num_local_jobs = job_chunks[rank_];
    int local_outputs_per_job = num_outputs_per_job_;
    matrix_d local_output(
        local_outputs_per_job == -1 ? 0 : local_outputs_per_job,
        num_local_jobs);
    std::vector<int> local_f_out(num_local_jobs, -1);
 
      const int num_outputs
          = std::accumulate(f_out.begin() + first_job,
                            f_out.begin() + first_job + num_local_jobs, 0);
      local_output.resize(Eigen::NoChange, num_outputs);
 
      for (int i = 0, offset = 0; i < num_local_jobs;
           offset += local_f_out[i], ++i) {
        const matrix_d job_output
            = ReduceF()(local_shared_params_dbl_, local_job_params_dbl_.col(i),
                        local_x_r[i], local_x_i[i], 0);
        local_f_out[i] = job_output.cols();
 
        if (local_outputs_per_job == -1) {
          local_outputs_per_job = job_output.rows();
          local_output.conservativeResize(local_outputs_per_job,
                                          Eigen::NoChange);
        }

        if (local_output.cols() < offset + local_f_out[i])
          local_output.conservativeResize(Eigen::NoChange,
                                          2 * (offset + local_f_out[i]));
 
        local_output.block(0, offset, local_output.rows(), local_f_out[i])
            = job_output;
 
    } catch (const std::exception& e) {
 
      int cluster_status = 0;
      boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
      bool all_ok = cluster_status == static_cast<int>(world_size_);
      boost::mpi::broadcast(world_, all_ok, 0);
      if (!all_ok) {
        if (rank_ == 0) {
          throw std::domain_error("MPI error on first evaluation.");
 
      boost::mpi::broadcast(world_, local_outputs_per_job, 0);

      if (rank_ == 0) {
        std::vector<int> world_f_out(num_jobs, 0);
        boost::mpi::gatherv(world_, local_f_out.data(), num_local_jobs,
                            world_f_out.data(), job_chunks, 0);

        std::copy(local_f_out.begin(), local_f_out.end(),
                  world_f_out.begin() + first_job);
 
    for (int i = 0; i < num_local_jobs; ++i) {
      if (world_f_out[first_job + i] != local_f_out[i]) {
 
    const std::size_t size_world_f_out = sum(world_f_out);

    matrix_d world_result(num_outputs_per_job_, size_world_f_out);

    std::vector<int> chunks_result(world_size_, 0);
    for (std::size_t i = 0, k = 0; i != world_size_; ++i)
      for (int j = 0; j != job_chunks[i]; ++j, ++k)
        chunks_result[i] += world_f_out[k] * num_outputs_per_job_;

    boost::mpi::gatherv(world_, local_output.data(), chunks_result[rank_],
                        world_result.data(), chunks_result, 0);
 
    int cluster_status = 0;
    boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
    bool all_ok = cluster_status == static_cast<int>(world_size_);
    boost::mpi::broadcast(world_, all_ok, 0);
    if (!all_ok)
      throw std::domain_error("Error during MPI evaluation.");

    return combine_(world_result, world_f_out);
 
  template <typename T_cache>
  typename T_cache::cache_t& scatter_array_2d_cached(
      typename T_cache::cache_t& data) {
    if (T_cache::is_valid()) {
      return T_cache::data();
    }
 
    std::vector<int> data_dims = dims(data);
    data_dims.resize(2);

    boost::mpi::broadcast(world_, data_dims.data(), 2, 0);

    const std::vector<int> job_chunks = mpi_map_chunks(data_dims[0], 1);
    const std::vector<int> data_chunks
        = mpi_map_chunks(data_dims[0], data_dims[1]);

    auto flat_data = to_array_1d(data);
    decltype(flat_data) local_flat_data(data_chunks[rank_]);
 
    if (data_dims[0] * data_dims[1] > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, flat_data.data(), data_chunks,
                             local_flat_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_flat_data.data(), data_chunks[rank_],
                             0);
      }
    }
 
    std::vector<decltype(flat_data)> local_data;
    auto local_iter = local_flat_data.begin();
    for (int i = 0; i != job_chunks[rank_]; ++i) {
      typename T_cache::cache_t::value_type const data_elem(
          local_iter, local_iter + data_dims[1]);
      local_data.push_back(data_elem);
      local_iter += data_dims[1];
    }

    T_cache::store(local_data);
    return T_cache::data();
 
  template <typename T_cache>
  typename T_cache::cache_t& broadcast_array_1d_cached(
      typename T_cache::cache_t& data) {
    if (T_cache::is_valid()) {
      return T_cache::data();
    }
 
    std::size_t data_size = data.size();
    boost::mpi::broadcast(world_, data_size, 0);

    std::vector<typename T_cache::cache_t::value_type> local_data = data;
    local_data.resize(data_size);

    boost::mpi::broadcast(world_, local_data.data(), data_size, 0);
    T_cache::store(local_data);
    return T_cache::data();
 
  template <int meta_cache_id>
  vector_d broadcast_vector(const vector_d& data) {
    using meta_cache
        = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                            std::vector<size_type>>;
    const std::vector<size_type>& data_size
        = broadcast_array_1d_cached<meta_cache>({data.size()});

    vector_d local_data = data;
    local_data.resize(data_size[0]);

    boost::mpi::broadcast(world_, local_data.data(), data_size[0], 0);

    return local_data;
  }
 
  template <int meta_cache_id>
  matrix_d scatter_matrix(const matrix_d& data) {
    using meta_cache
        = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                            std::vector<size_type>>;
    const std::vector<size_type>& dims
        = broadcast_array_1d_cached<meta_cache>({data.rows(), data.cols()});
    const size_type rows = dims[0];
    const size_type total_cols = dims[1];

    const std::vector<int> job_chunks = mpi_map_chunks(total_cols, 1);
    const std::vector<int> data_chunks = mpi_map_chunks(total_cols, rows);
    matrix_d local_data(rows, job_chunks[rank_]);
    if (rows * total_cols > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, data.data(), data_chunks,
                             local_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_data.data(), data_chunks[rank_], 0);
      }
    }

    return local_data;
  }
 
  void setup_call(const vector_d& shared_params, const matrix_d& job_params,
                  const std::vector<std::vector<double>>& x_r,
                  const std::vector<std::vector<int>>& x_i) {
    std::vector<int> job_chunks = mpi_map_chunks(job_params.cols(), 1);
    broadcast_array_1d_cached<cache_chunks>(job_chunks);

    scatter_array_2d_cached<cache_x_r>(x_r);
    scatter_array_2d_cached<cache_x_i>(x_i);
 
template <int call_id, typename ReduceF, typename CombineF>
 
mpi_parallel_call_cache()=delete
 
mpi_parallel_call_cache & operator=(const mpi_parallel_call_cache< call_id, member, T > &)=delete
 
mpi_parallel_call_cache(const mpi_parallel_call_cache< call_id, member, T > &)=delete
 
static void store(const T &data)
Store data to be cached locally.
 
static bool is_valid()
Query whether the cache is valid, which it is once data has been stored.
 
static cache_t & data()
Obtain a const reference to the locally cached data if the cache is valid (throws otherwise).
 
Container for locally cached data which is essentially implemented as a singleton.
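
The intended access pattern is store-once, read-many. Below is a minimal sketch, assuming an illustrative instantiation (the call_id and member values and the int payload are placeholders, and the class is assumed to live in stan::math::internal):

// Illustrative only: 0, 1 and std::vector<int> are placeholder template
// arguments, not values used by the library.
using chunk_cache
    = stan::math::internal::mpi_parallel_call_cache<0, 1, std::vector<int>>;

void use_cache(const std::vector<int>& chunks) {
  if (!chunk_cache::is_valid())
    chunk_cache::store(chunks);  // allowed exactly once; throws on a second call
  // data() throws if nothing has been stored yet
  const std::vector<int>& local = chunk_cache::data();
  // ... work with the locally cached chunks ...
}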
 
typename CombineF::result_t result_t
 
vector_d broadcast_vector(const vector_d &data)
Broadcasts an Eigen vector to the cluster.
 
const std::size_t world_size_
 
matrix_d scatter_matrix(const matrix_d &data)
Scatters an Eigen matrix column-wise over the cluster.
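
Column-wise chunks are convenient because Eigen matrices are column-major, so a block of whole columns is contiguous in memory and can be handed to MPI without repacking. The following is a standalone sketch of that idea, not the library implementation; the naive even column split stands in for what mpi_map_chunks computes:

#include <boost/mpi.hpp>
#include <Eigen/Dense>
#include <vector>

int main(int argc, char* argv[]) {
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  const int rows = 3;        // outputs per column
  const int total_cols = 8;  // columns to distribute

  // naive even split of columns; the remainder goes to the root here,
  // whereas the library delegates this decision to mpi_map_chunks
  std::vector<int> col_chunks(world.size(), total_cols / world.size());
  col_chunks[0] += total_cols % world.size();
  std::vector<int> elem_chunks(world.size());
  for (std::size_t i = 0; i != elem_chunks.size(); ++i)
    elem_chunks[i] = rows * col_chunks[i];  // doubles sent to each process

  Eigen::MatrixXd data;
  if (world.rank() == 0)
    data = Eigen::MatrixXd::Random(rows, total_cols);

  // each process receives its block of whole columns
  Eigen::MatrixXd local(rows, col_chunks[world.rank()]);
  if (world.rank() == 0)
    boost::mpi::scatterv(world, data.data(), elem_chunks, local.data(), 0);
  else
    boost::mpi::scatterv(world, local.data(), elem_chunks[world.rank()], 0);

  return 0;
}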
 
mpi_parallel_call(const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
Initiates a parallel MPI call on the root.
 
result_t reduce_combine()
Once all data is distributed and cached, reduce_combine evaluates the locally assigned jobs, gathers the output sizes and results of all jobs on the root, and combines them into the final result.
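
As a sketch of the call pattern on the root (modeled on how map_rect drives this class; the wrapper name is illustrative, and ReduceF/CombineF stand for the reduce and combine functors supplied by the caller):

// Hypothetical wrapper: constructing on the root obtains the cluster lock,
// broadcasts the work command and distributes the data; reduce_combine()
// then runs the local jobs and assembles the final result on the root.
template <int call_id, typename ReduceF, typename CombineF,
          typename T_shared_param, typename T_job_param>
typename CombineF::result_t run_parallel_call(
    const T_shared_param& shared_params,
    const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>& job_params,
    const std::vector<std::vector<double>>& x_r,
    const std::vector<std::vector<int>>& x_i) {
  stan::math::mpi_parallel_call<call_id, ReduceF, CombineF> job_chunk(
      shared_params, job_params, x_r, x_i);
  return job_chunk.reduce_combine();
}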
 
void setup_call(const vector_d &shared_params, const matrix_d &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
 
matrix_d local_job_params_dbl_
 
T_cache::cache_t & broadcast_array_1d_cached(typename T_cache::cache_t &data)
Performs a cached broadcast of a 1D array (std::vector).
 
static int num_outputs_per_job_
 
static void distributed_apply()
Entry point on the workers for the mpi_parallel_call.
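
A plausible sketch of what this entry point amounts to on a worker, given the default constructor and reduce_combine described on this page (an illustration, not the verbatim body):

// Worker side: the default constructor participates in the data scatter
// (the problem sizes come from the root), and reduce_combine() evaluates
// the local chunk and sends the results back; the returned value only
// carries the combined result on the root.
static void distributed_apply() {
  mpi_parallel_call<call_id, ReduceF, CombineF> job_chunk;
  job_chunk.reduce_combine();
}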
 
vector_d local_shared_params_dbl_
 
boost::mpi::communicator world_
 
std::unique_lock< std::mutex > cluster_lock_
 
T_cache::cache_t & scatter_array_2d_cached(typename T_cache::cache_t &data)
Performs a cached scatter of a 2D array (nested std::vector).
 
The MPI parallel call class manages the distributed evaluation of a collection of tasks following a map-reduce-combine pattern: the data is scattered and cached across the cluster, each node evaluates its assigned jobs with ReduceF, and the root aggregates all results with CombineF.
 
 
auto to_array_1d(T_x &&x)
Returns the input flattened into a one-dimensional array.
 
std::vector< int > dims(const T_x &x)
Determine the dimensions of the specified argument and return them as a vector of sizes.
 
 
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
 
T value_of(const fvar< T > &v)
Return the value of the specified variable.
 
void check_matching_sizes(const char *function, const char *name1, const T_y1 &y1, const char *name2, const T_y2 &y2)
Check whether two structures have the same size.
 
std::vector< int > mpi_map_chunks(std::size_t num_jobs, std::size_t chunk_size=1)
Maps jobs of a given chunk size to the workers and returns a vector with the number of elements assigned to each worker.
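
For intuition, a small worked example, assuming the scheduling policy that hands the remainder jobs to the workers so that the root keeps the smallest share (the concrete numbers are illustrative):

// 10 jobs on a 4-process world: floor(10 / 4) = 2 jobs each, and the two
// remainder jobs go to workers 1 and 2 (assumed policy), giving {2, 3, 3, 2}.
std::vector<int> job_chunks = stan::math::mpi_map_chunks(10);      // e.g. {2, 3, 3, 2}
// With chunk_size = 3 (say, 3 data rows per job) each count is scaled:
std::vector<int> data_chunks = stan::math::mpi_map_chunks(10, 3);  // e.g. {6, 9, 9, 6}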
 
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic >::Index size_type
Type for sizes and indexes in an Eigen matrix with double elements.
 
auto sum(const std::vector< T > &m)
Return the sum of the entries of the specified standard vector.
 
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
 
std::unique_lock< std::mutex > mpi_broadcast_command()
Broadcasts default constructible commands to the cluster.
 
void check_size_match(const char *function, const char *name_i, T_size1 i, const char *name_j, T_size2 j)
Check if the provided sizes match.
 
 
MPI command template which calls the static method distributed_apply of the given class F.
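
Putting the last pieces together, the root wakes the listening workers by broadcasting an mpi_distributed_apply command for the concrete mpi_parallel_call instantiation and holds the returned lock for the duration of the call. A hedged sketch of that step (the exact call site inside the constructor may differ):

// Sketch: issue the distributed_apply command for this instantiation to the
// cluster; the workers then enter parallel_call_t::distributed_apply() while
// the root keeps the cluster lock until the parallel call has finished.
using parallel_call_t
    = stan::math::mpi_parallel_call<call_id, ReduceF, CombineF>;
std::unique_lock<std::mutex> cluster_lock
    = stan::math::mpi_broadcast_command<
        stan::math::mpi_distributed_apply<parallel_call_t>>();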