Automatic Differentiation
 
Loading...
Searching...
No Matches
mpi_parallel_call.hpp
Go to the documentation of this file.
1#ifdef STAN_MPI
2
3#ifndef STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
4#define STAN_MATH_PRIM_FUNCTOR_MPI_PARALLEL_CALL_HPP
5
11
12#include <mutex>
13#include <algorithm>
14#include <vector>
15#include <type_traits>
16#include <functional>
17
18namespace stan {
19namespace math {
20
21namespace internal {
22
40template <int call_id, int member, typename T>
42 static T local_;
43 static bool is_valid_;
44
45 public:
46 using cache_t = const T;
47
50 = delete;
53 = delete;
54
59 static bool is_valid() { return is_valid_; }
60
66 static void store(const T& data) {
67 if (is_valid_)
68 throw std::runtime_error("Cache can only store a single data item.");
69 local_ = data;
70 is_valid_ = true;
71 }
72
78 static cache_t& data() {
79 if (unlikely(!is_valid_))
80 throw std::runtime_error("Cache not yet valid.");
81 return local_;
82 }
83};
84
85template <int call_id, int member, typename T>
87
88template <int call_id, int member, typename T>
90
91} // namespace internal
92
160template <int call_id, typename ReduceF, typename CombineF>
162 boost::mpi::communicator world_;
163 const std::size_t rank_ = world_.rank();
164 const std::size_t world_size_ = world_.size();
165 std::unique_lock<std::mutex> cluster_lock_;
166
167 using result_t = typename CombineF::result_t;
168
169 // local caches which hold local slices of data
172 std::vector<std::vector<double>>>;
175 std::vector<std::vector<int>>>;
180
181 // # of outputs for given call_id+ReduceF+CombineF case
183
184 CombineF combine_;
185
188
189 public:
203 template <typename T_shared_param, typename T_job_param>
205 const T_shared_param& shared_params,
206 const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
207 job_params,
208 const std::vector<std::vector<double>>& x_r,
209 const std::vector<std::vector<int>>& x_i)
210 : combine_(shared_params, job_params) {
211 if (rank_ != 0)
212 throw std::runtime_error(
213 "problem sizes may only be defined on the root.");
214
215 check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
216 "continuous data", x_r);
217 check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
218 "integer data", x_i);
219
221 int cached_num_jobs = sum(cache_chunks::data());
222 check_size_match("mpi_parallel_call", "cached number of jobs",
223 cached_num_jobs, "number of jobs", job_params.size());
224 }
225
226 // make children aware of upcoming job & obtain cluster lock
229
230 const std::vector<int> job_dims = dims(job_params);
231
232 const size_type num_jobs = job_dims[0];
233 const size_type num_job_params = num_jobs == 0 ? 0 : job_dims[1];
234
235 const vector_d shared_params_dbl = value_of(shared_params);
236 matrix_d job_params_dbl(num_job_params, num_jobs);
237
238 for (int j = 0; j < num_jobs; ++j)
239 job_params_dbl.col(j) = value_of(job_params[j]);
240
241 setup_call(shared_params_dbl, job_params_dbl, x_r, x_i);
242 }
243
244 // called on remote sites
246 if (rank_ == 0)
247 throw std::runtime_error("problem sizes must be defined on the root.");
248
249 setup_call(vector_d(), matrix_d(), std::vector<std::vector<double>>(),
250 std::vector<std::vector<int>>());
251 }
252
256 static void distributed_apply() {
257 // call constructor for the remotes
259
260 job_chunk.reduce_combine();
261 }
262
270 const std::vector<int>& job_chunks = cache_chunks::data();
271 const int num_jobs = sum(job_chunks);
272
273 const int first_job
274 = std::accumulate(job_chunks.begin(), job_chunks.begin() + rank_, 0);
275 const int num_local_jobs = local_job_params_dbl_.cols();
276 int local_outputs_per_job = num_local_jobs == 0 ? 0 : num_outputs_per_job_;
277 matrix_d local_output(
278 local_outputs_per_job == -1 ? 0 : local_outputs_per_job,
279 num_local_jobs);
280 std::vector<int> local_f_out(num_local_jobs, -1);
281
282 typename cache_x_r::cache_t& local_x_r = cache_x_r::data();
283 typename cache_x_i::cache_t& local_x_i = cache_x_i::data();
284
285 // check if we know already output sizes
286 if (cache_f_out::is_valid()) {
287 typename cache_f_out::cache_t& f_out = cache_f_out::data();
288 const int num_outputs
289 = std::accumulate(f_out.begin() + first_job,
290 f_out.begin() + first_job + num_local_jobs, 0);
291 local_output.resize(Eigen::NoChange, num_outputs);
292 }
293
294 int local_ok = 1;
295 try {
296 for (int i = 0, offset = 0; i < num_local_jobs;
297 offset += local_f_out[i], ++i) {
298 const matrix_d job_output
300 local_x_r[i], local_x_i[i], 0);
301 local_f_out[i] = job_output.cols();
302
303 if (local_outputs_per_job == -1) {
304 local_outputs_per_job = job_output.rows();
305 // changing rows, but leave column size as is
306 local_output.conservativeResize(local_outputs_per_job,
307 Eigen::NoChange);
308 }
309
310 if (local_output.cols() < offset + local_f_out[i])
311 // leave row size, but change columns size
312 local_output.conservativeResize(Eigen::NoChange,
313 2 * (offset + local_f_out[i]));
314
315 local_output.block(0, offset, local_output.rows(), local_f_out[i])
316 = job_output;
317 }
318 } catch (const std::exception& e) {
319 // see note 1 above for an explanation why we do not rethrow
320 // here, but mereley flag it to keep the cluster synchronized
321 local_ok = 0;
322 }
323
324 // during first execution we distribute the output sizes from
325 // local jobs to the root. This needs to be done for the number of
326 // outputs of each result and the number of outputs per job.
327 if (num_outputs_per_job_ == -1) {
328 // before we can cache the sizes locally we must ensure
329 // that no exception has been fired from any node. Hence,
330 // share the info about the current status across all nodes.
331 int cluster_status = 0;
332 boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
333 bool all_ok = cluster_status == static_cast<int>(world_size_);
334 boost::mpi::broadcast(world_, all_ok, 0);
335 if (!all_ok) {
336 // err out on the root
337 if (rank_ == 0) {
338 throw std::domain_error("MPI error on first evaluation.");
339 }
340 // and ensure on the workers that they return into their
341 // listening state
342 return result_t();
343 }
344
345 // execution was ok everywhere such that we can now distribute the
346 // outputs per job as recorded on the root on the first execution
347 boost::mpi::broadcast(world_, local_outputs_per_job, 0);
348 num_outputs_per_job_ = local_outputs_per_job;
349
350 // the f_out cache may not yet be synchronized which we can do
351 // now given that this first execution was just fine
352 if (!cache_f_out::is_valid()) {
353 std::vector<int> world_f_out(num_jobs, 0);
354 boost::mpi::gatherv(world_, local_f_out.data(), num_local_jobs,
355 world_f_out.data(), job_chunks, 0);
356 // on the root we now have all sizes from all children. Copy
357 // over the local sizes to the world vector on each local node
358 // in order to cache this information locally.
359 std::copy(local_f_out.begin(), local_f_out.end(),
360 world_f_out.begin() + first_job);
361 cache_f_out::store(world_f_out);
362 }
363 }
364
365 typename cache_f_out::cache_t& world_f_out = cache_f_out::data();
366
367 // check that cached sizes are the same as just collected from
368 // this evaluation
369 for (int i = 0; i < num_local_jobs; ++i) {
370 if (world_f_out[first_job + i] != local_f_out[i]) {
371 // in case of a mismatch we mark so, but flag the error only
372 // on the root, see note 1 above.
373 local_ok = 0;
374 break;
375 }
376 }
377
378 const std::size_t size_world_f_out = sum(world_f_out);
379 matrix_d world_result(num_outputs_per_job_, size_world_f_out);
380
381 std::vector<int> chunks_result(world_size_, 0);
382 for (std::size_t i = 0, k = 0; i != world_size_; ++i)
383 for (int j = 0; j != job_chunks[i]; ++j, ++k)
384 chunks_result[i] += world_f_out[k] * num_outputs_per_job_;
385
386 // collect results on root
387 boost::mpi::gatherv(world_, local_output.data(), chunks_result[rank_],
388 world_result.data(), chunks_result, 0);
389
390 // let root know if all went fine everywhere
391 int cluster_status = 0;
392 boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
393 bool all_ok = cluster_status == static_cast<int>(world_size_);
394
395 // on the workers all is done now.
396 if (rank_ != 0)
397 return result_t();
398
399 // in case something went wrong we throw on the root
400 if (!all_ok)
401 throw std::domain_error("Error during MPI evaluation.");
402
403 return combine_(world_result, world_f_out);
404 }
405
406 private:
422 template <typename T_cache>
423 typename T_cache::cache_t& scatter_array_2d_cached(
424 typename T_cache::cache_t& data) {
425 // distribute data only if not in cache yet
426 if (T_cache::is_valid()) {
427 return T_cache::data();
428 }
429
430 // number of elements of each data item must be transferred to
431 // the workers
432 std::vector<int> data_dims = dims(data);
433 data_dims.resize(2);
434
435 boost::mpi::broadcast(world_, data_dims.data(), 2, 0);
436
437 const std::vector<int> job_chunks = mpi_map_chunks(data_dims[0], 1);
438 const std::vector<int> data_chunks
439 = mpi_map_chunks(data_dims[0], data_dims[1]);
440
441 auto flat_data = to_array_1d(data);
442 decltype(flat_data) local_flat_data(data_chunks[rank_]);
443
444 if (data_dims[0] * data_dims[1] > 0) {
445 if (rank_ == 0) {
446 boost::mpi::scatterv(world_, flat_data.data(), data_chunks,
447 local_flat_data.data(), 0);
448 } else {
449 boost::mpi::scatterv(world_, local_flat_data.data(), data_chunks[rank_],
450 0);
451 }
452 }
453
454 std::vector<decltype(flat_data)> local_data;
455 auto local_iter = local_flat_data.begin();
456 for (int i = 0; i != job_chunks[rank_]; ++i) {
457 typename T_cache::cache_t::value_type const data_elem(
458 local_iter, local_iter + data_dims[1]);
459 local_data.push_back(data_elem);
460 local_iter += data_dims[1];
461 }
462
463 // finally we cache it locally
464 T_cache::store(local_data);
465 return T_cache::data();
466 }
467
482 template <typename T_cache>
483 typename T_cache::cache_t& broadcast_array_1d_cached(
484 typename T_cache::cache_t& data) {
485 if (T_cache::is_valid()) {
486 return T_cache::data();
487 }
488
489 std::size_t data_size = data.size();
490 boost::mpi::broadcast(world_, data_size, 0);
491
492 std::vector<typename T_cache::cache_t::value_type> local_data = data;
493 local_data.resize(data_size);
494
495 boost::mpi::broadcast(world_, local_data.data(), data_size, 0);
496 T_cache::store(local_data);
497 return T_cache::data();
498 }
499
512 template <int meta_cache_id>
514 using meta_cache
515 = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
516 std::vector<size_type>>;
517 const std::vector<size_type>& data_size
518 = broadcast_array_1d_cached<meta_cache>({data.size()});
519
520 vector_d local_data = data;
521 local_data.resize(data_size[0]);
522
523 boost::mpi::broadcast(world_, local_data.data(), data_size[0], 0);
524
525 return local_data;
526 }
527
540 template <int meta_cache_id>
542 using meta_cache
543 = internal::mpi_parallel_call_cache<call_id, meta_cache_id,
544 std::vector<size_type>>;
545 const std::vector<size_type>& dims
546 = broadcast_array_1d_cached<meta_cache>({data.rows(), data.cols()});
547 const size_type rows = dims[0];
548 const size_type total_cols = dims[1];
549
550 const std::vector<int> job_chunks = mpi_map_chunks(total_cols, 1);
551 const std::vector<int> data_chunks = mpi_map_chunks(total_cols, rows);
552 matrix_d local_data(rows, job_chunks[rank_]);
553 if (rows * total_cols > 0) {
554 if (rank_ == 0) {
555 boost::mpi::scatterv(world_, data.data(), data_chunks,
556 local_data.data(), 0);
557 } else {
558 boost::mpi::scatterv(world_, local_data.data(), data_chunks[rank_], 0);
559 }
560 }
561
562 return local_data;
563 }
564
565 void setup_call(const vector_d& shared_params, const matrix_d& job_params,
566 const std::vector<std::vector<double>>& x_r,
567 const std::vector<std::vector<int>>& x_i) {
568 std::vector<int> job_chunks = mpi_map_chunks(job_params.cols(), 1);
569 broadcast_array_1d_cached<cache_chunks>(job_chunks);
570
571 local_shared_params_dbl_ = broadcast_vector<-1>(shared_params);
572 local_job_params_dbl_ = scatter_matrix<-2>(job_params);
573
574 // distribute const data if not yet cached
575 scatter_array_2d_cached<cache_x_r>(x_r);
576 scatter_array_2d_cached<cache_x_i>(x_i);
577 }
578};
579
580template <int call_id, typename ReduceF, typename CombineF>
582
583} // namespace math
584} // namespace stan
585
586#endif
587
588#endif
mpi_parallel_call_cache & operator=(const mpi_parallel_call_cache< call_id, member, T > &)=delete
mpi_parallel_call_cache(const mpi_parallel_call_cache< call_id, member, T > &)=delete
static void store(const T &data)
Store data to be cached locally.
static bool is_valid()
Query if cache is in valid which it is once data has been stored.
static cache_t & data()
Obtain const reference to locally cached data if cache is valid (throws otherwise).
Container for locally cached data which is essentially implemented as singleton.
typename CombineF::result_t result_t
vector_d broadcast_vector(const vector_d &data)
Broadcasts an Eigen vector to the cluster.
matrix_d scatter_matrix(const matrix_d &data)
Scatters an Eigen matrix column wise over the cluster.
mpi_parallel_call(const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
Initiates a parallel MPI call on the root.
result_t reduce_combine()
Once all data is distributed and cached the reduce_combine evaluates all assigned function evaluation...
void setup_call(const vector_d &shared_params, const matrix_d &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i)
T_cache::cache_t & broadcast_array_1d_cached(typename T_cache::cache_t &data)
Performs a cached broadcast of a 1D array (std::vector).
static void distributed_apply()
Entry point on the workers for the mpi_parallel_call.
boost::mpi::communicator world_
std::unique_lock< std::mutex > cluster_lock_
T_cache::cache_t & scatter_array_2d_cached(typename T_cache::cache_t &data)
Performs a cached scatter of a 2D array (nested std::vector).
The MPI parallel call class manages the distributed evaluation of a collection of tasks following the...
#define unlikely(x)
int64_t rows(const T_x &x)
Returns the number of rows in the specified kernel generator expression.
Definition rows.hpp:22
auto to_array_1d(T_x &&x)
Returns input matrix reshaped into a vector.
void dims(const T_x &x, std::vector< int > &result)
matrix_cl overload of the dims helper function in prim/fun/dims.hpp.
Definition dims.hpp:21
static constexpr double e()
Return the base of the natural logarithm.
Definition constants.hpp:20
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
Definition typedefs.hpp:24
T value_of(const fvar< T > &v)
Return the value of the specified variable.
Definition value_of.hpp:18
void check_matching_sizes(const char *function, const char *name1, const T_y1 &y1, const char *name2, const T_y2 &y2)
Check if two structures at the same size.
std::vector< int > mpi_map_chunks(std::size_t num_jobs, std::size_t chunk_size=1)
Maps jobs of given chunk size to workers and returning a vector of counts.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic >::Index size_type
Type for sizes and indexes in an Eigen matrix with double elements.
Definition typedefs.hpp:11
auto sum(const std::vector< T > &m)
Return the sum of the entries of the specified standard vector.
Definition sum.hpp:23
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
Definition typedefs.hpp:19
std::unique_lock< std::mutex > mpi_broadcast_command()
Broadcasts default constructible commands to the cluster.
void check_size_match(const char *function, const char *name_i, T_size1 i, const char *name_j, T_size2 j)
Check if the provided sizes match.
The lgamma implementation in stan-math is based on either the reentrant safe lgamma_r implementation ...
MPI command template which calls the static method distributed_apply of the given class F.