3#ifndef STAN_MATH_PRIM_FUNCTOR_MPI_CLUSTER_HPP
4#define STAN_MATH_PRIM_FUNCTOR_MPI_CLUSTER_HPP
8#include <boost/mpi/allocator.hpp>
9#include <boost/mpi/collectives.hpp>
10#include <boost/mpi/communicator.hpp>
11#include <boost/mpi/datatype.hpp>
12#include <boost/mpi/environment.hpp>
13#include <boost/mpi/nonblocking.hpp>
14#include <boost/mpi/operations.hpp>
16#include <boost/serialization/access.hpp>
17#include <boost/serialization/base_object.hpp>
18#include <boost/serialization/export.hpp>
19#include <boost/serialization/shared_ptr.hpp>
33 virtual const char*
what()
const throw() {
34 return "Stopping MPI listening mode.";
/**
 * Returns the constant message string "MPI resource is in use.".
 * Declared non-throwing through the empty throw() specification.
 *
 * NOTE(review): the leading "42" looks like a line number left over from
 * the source-listing extraction rather than code - confirm against the
 * upstream header.
 *
 * @return pointer to a string literal holding the message
 */
42 virtual const char*
what()
const throw() {
return "MPI resource is in use."; }
51 template <
class Archive>
52 void serialize(Archive& ar,
const unsigned int version) {
53 ar& BOOST_SERIALIZATION_BASE_OBJECT_NVP(
mpi_command);
56 boost::mpi::communicator world;
85 std::size_t chunk_size = 1) {
86 boost::mpi::communicator world;
87 const std::size_t world_size = world.size();
89 std::vector<int> chunks(world_size, num_jobs / world_size);
91 const std::size_t delta_r = chunks[0] == 0 ? 0 : 1;
93 for (std::size_t r = 0; r != num_jobs % world_size; ++r)
94 ++chunks[r + delta_r];
96 for (std::size_t i = 0; i != world_size; ++i)
97 chunks[i] *= chunk_size;
127 boost::mpi::environment
env;
159 std::unique_lock<std::mutex> worker_lock(
in_use());
161 std::shared_ptr<mpi_command> work;
163 boost::mpi::broadcast(
world_, work, 0);
178 mpi_broadcast_command<mpi_stop_worker>();
195 static std::mutex in_use_mutex;
210 std::shared_ptr<mpi_command>& command) {
211 boost::mpi::communicator world;
213 if (world.rank() != 0)
214 throw std::runtime_error(
"only root may broadcast commands.");
217 throw std::runtime_error(
"cluster is not listening to commands.");
222 if (!cluster_lock.owns_lock())
225 boost::mpi::broadcast(world, command, 0);
239 std::shared_ptr<mpi_command> command(
new T);
virtual const char * what() const
Exception thrown whenever the MPI resource is busy.
virtual const char * what() const
Exception used to stop worker nodes from further listening to commands sent from the root.
static constexpr double e()
Return the base of the natural logarithm.
std::vector< int > mpi_map_chunks(std::size_t num_jobs, std::size_t chunk_size=1)
Maps jobs of given chunk size to workers and returns a vector of counts.
std::unique_lock< std::mutex > mpi_broadcast_command()
Broadcasts default constructible commands to the cluster.
The lgamma implementation in stan-math is based on either the reentrant safe lgamma_r implementation ...
void stop_listen()
Stops the listening state of the cluster.
static bool & listening_status()
Returns the current listening state of the cluster.
boost::mpi::communicator world_
static std::mutex & in_use()
Returns a reference to the global in use mutex.
boost::mpi::environment env
void listen()
Switches cluster into listening mode.
MPI cluster holds MPI resources and must be initialized only once in any MPI program.
An MPI command object is used to execute code on worker nodes.
void serialize(Archive &ar, const unsigned int version)
friend class boost::serialization::access
MPI command used to stop child nodes from listening for further commands.