Automatic Differentiation
 
Loading...
Searching...
No Matches
map_rect_concurrent.hpp
Go to the documentation of this file.
1#ifndef STAN_MATH_REV_FUNCTOR_MAP_RECT_CONCURRENT_HPP
2#define STAN_MATH_REV_FUNCTOR_MAP_RECT_CONCURRENT_HPP
3
10
11#include <tbb/parallel_for.h>
12#include <tbb/blocked_range.h>
13
14#include <algorithm>
15#include <vector>
16
17namespace stan {
18namespace math {
19namespace internal {
20
/**
 * Evaluates all `map_rect` jobs and combines their outputs.
 *
 * For each job `i` in `[0, job_params.size())`, a default-constructed
 * `ReduceF` is called with the double-valued shared parameters
 * (`value_of(shared_params)` stored as a `vector_d`), `value_of(job_params[i])`,
 * `x_r[i]`, `x_i[i]` and `msgs`; the returned `matrix_d` is stored and its
 * column count recorded. When `STAN_THREADS` is defined the jobs are run
 * through `tbb::parallel_for` inside `tbb::this_task_arena::isolate`;
 * otherwise they run sequentially on the calling thread. The per-job
 * matrices are then concatenated column-wise in job order, and the result,
 * together with the per-job column counts, is passed to a `CombineF`
 * constructed from the original `shared_params` and `job_params`. Whatever
 * that combiner returns is the result of this function.
 *
 * NOTE(review): this listing omits source line 24 (the function name line)
 * and source lines 31-32 (the `ReduceF` / `CombineF` alias definitions).
 * The declaration shown in the trailing tooltip gives `msgs` a default of
 * `nullptr`.
 *
 * @tparam call_id not referenced anywhere in the visible body
 * @tparam F user functor type; presumably consumed by the `ReduceF` /
 *   `CombineF` aliases on the missing lines - confirm
 * @tparam T_shared_param type of the shared parameters, constrained by
 *   `require_eigen_col_vector_t` (presumably an Eigen column vector)
 * @tparam T_job_param scalar type of each per-job parameter vector
 * @param shared_params parameters passed (as doubles) to every job
 * @param job_params one parameter vector per job; its size is the number
 *   of jobs
 * @param x_r real data, indexed by job
 * @param x_i integer data, indexed by job
 * @param msgs stream pointer forwarded to every `ReduceF` call
 * @return the value produced by `CombineF::operator()`, as an Eigen column
 *   vector of `return_type_t<T_shared_param, T_job_param>`
 *
 * NOTE(review): `x_r` and `x_i` are indexed up to `num_jobs - 1` without a
 *   size check in this function - presumably validated by the caller.
 */
21template <int call_id, typename F, typename T_shared_param,
22 typename T_job_param, require_eigen_col_vector_t<T_shared_param>*>
23Eigen::Matrix<return_type_t<T_shared_param, T_job_param>, Eigen::Dynamic, 1>
25 const T_shared_param& shared_params,
26 const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
27 job_params,
28 const std::vector<std::vector<double>>& x_r,
29 const std::vector<std::vector<int>>& x_i, std::ostream* msgs) {
30 using ReduceF
33
34 const int num_jobs = job_params.size();
35 const vector_d shared_params_dbl = value_of(shared_params);
// One result matrix and one column count per job, indexed like job_params.
36 std::vector<matrix_d> job_output(num_jobs);
37 std::vector<int> world_f_out(num_jobs, 0);
38
// Evaluates jobs [start, end). Iteration i writes only job_output[i] and
// world_f_out[i], so chunks over disjoint index ranges write disjoint
// elements.
// NOTE(review): every chunk receives the same `msgs` pointer; whether
// concurrent writes through it are safe depends on ReduceF / F - confirm.
39 auto execute_chunk = [&](std::size_t start, std::size_t end) -> void {
40 for (std::size_t i = start; i != end; ++i) {
41 job_output[i] = ReduceF()(shared_params_dbl, value_of(job_params[i]),
42 x_r[i], x_i[i], msgs);
43 world_f_out[i] = job_output[i].cols();
44 }
45 };
46
47#ifdef STAN_THREADS
48 // we must use task isolation as described here:
49 // https://software.intel.com/content/www/us/en/develop/documentation/tbb-documentation/top/intel-threading-building-blocks-developer-guide/task-isolation.html
50 // this is to ensure that the thread local AD tape resource is
51 // not modified from a different task, which may happen
52 // whenever this function is itself used in a parallel
53 // context (like running multiple chains for Stan).
// NOTE(review): tbb::this_task_arena::isolate is declared in
// <tbb/task_arena.h>, which is not among the includes visible in this
// listing; consider including it directly instead of relying on a
// transitive include.
54 tbb::this_task_arena::isolate([&] {
55 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, num_jobs),
56 [&](const tbb::blocked_range<size_t>& r) {
57 execute_chunk(r.begin(), r.end());
58 });
59 });
60#else
// Without STAN_THREADS all jobs run in order on the calling thread.
61 execute_chunk(0, num_jobs);
62#endif
63
64 // collect results: concatenate the per-job matrices column-wise, in job order
// NOTE(review): std::accumulate is declared in <numeric>, which is not
// among the standard headers visible in this listing; include it
// explicitly rather than relying on a transitive include.
65 const int num_world_output
66 = std::accumulate(world_f_out.begin(), world_f_out.end(), 0);
// The row count comes from the first job; the block assignment below
// assumes every job produced exactly that many rows.
// NOTE(review): job_output[0] is out of bounds when job_params is empty;
// confirm callers never pass zero jobs.
67 matrix_d world_output(job_output[0].rows(), num_world_output);
68
69 int offset = 0;
70 for (const auto& job : job_output) {
71 const int num_job_outputs = job.cols();
72
// Copy this job's columns into world_output columns
// [offset, offset + num_job_outputs).
73 world_output.block(0, offset, world_output.rows(), num_job_outputs) = job;
74
75 offset += num_job_outputs;
76 }
// CombineF is built from the original parameters (not their value_of
// copies) and receives the concatenated double outputs plus the per-job
// column counts; its result is returned.
77 CombineF combine(shared_params, job_params);
78 return combine(world_output, world_f_out);
79}
80
81} // namespace internal
82} // namespace math
83} // namespace stan
84
85#endif
int64_t rows(const T_x &x)
Returns the number of rows in the specified kernel generator expression.
Definition rows.hpp:22
Eigen::Matrix< return_type_t< T_shared_param, T_job_param >, Eigen::Dynamic, 1 > map_rect_concurrent(const T_shared_param &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 > > &job_params, const std::vector< std::vector< double > > &x_r, const std::vector< std::vector< int > > &x_i, std::ostream *msgs=nullptr)
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
Definition typedefs.hpp:24
T value_of(const fvar< T > &v)
Return the value of the specified variable.
Definition value_of.hpp:18
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
Definition typedefs.hpp:19
The lgamma implementation in stan-math is based on either the reentrant safe lgamma_r implementation ...