TECA
The Toolkit for Extreme Climate Analysis
teca_cpu_thread_pool.h
1 #ifndef teca_cpu_thread_pool_h
2 #define teca_cpu_thread_pool_h
3 
4 #include "teca_config.h"
5 #include "teca_common.h"
6 #include "teca_algorithm.h"
7 #include "teca_thread_util.h"
8 #include "teca_threadsafe_queue.h"
9 #include "teca_owned_future.h"
10 #include "teca_mpi.h"
11 
12 #include <vector>
13 #include <thread>
14 #include <atomic>
15 #include <mutex>
16 #include <future>
17 #include <chrono>
18 #include <algorithm>
19 #if defined(_GNU_SOURCE)
20 #include <pthread.h>
21 #include <sched.h>
22 #include <deque>
23 #endif
24 
/// forward declaration, needed by the shared pointer alias below.
template <typename task_t, typename data_t>
class teca_cpu_thread_pool;

/// A shared pointer to a teca_cpu_thread_pool.
template <typename task_t, typename data_t>
using p_teca_cpu_thread_pool = std::shared_ptr<teca_cpu_thread_pool<task_t, data_t>>;
30 
31 /// A class to manage a fixed size pool of threads that dispatch work.
32 template <typename task_t, typename data_t>
34 {
35 public:
36  teca_cpu_thread_pool() = delete;
37 
38  /** construct/destruct the thread pool.
39  * arguments:
40  * @param[in] comm communicator over which to map threads. Use
41  * MPI_COMM_SELF for local mapping.
42  *
43  * @param[in] n number of threads to create for the pool. -1 will
44  * create 1 thread per physical CPU core. all MPI
45  * ranks running on the same node are taken into
46  * account, resulting in 1 thread per core node wide.
47  *
48  * @param[in] bind bind each thread to a specific core.
49  *
50  * @param[in] verbose print a report of the thread to core bindings
51  */
52  teca_cpu_thread_pool(MPI_Comm comm, int n, bool bind, bool verbose);
53  ~teca_cpu_thread_pool() noexcept;
54 
55  // get rid of copy and asignment
56  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cpu_thread_pool)
57 
58  /** add a data request task to the queue, returns a future from which the
59  * generated dataset can be accessed.
60  */
61  void push_task(task_t &task);
62 
63  /** wait for all of the requests to execute and transfer datasets in the
64  * order that corresponding requests were added to the queue.
65  */
66  template <template <typename ... > class container_t, typename ... args>
67  void wait_all(container_t<data_t, args ...> &data);
68 
69  /** wait for some of the requests to execute. datasets will be returned as
70  * they become ready. n_to_wait specifies how many datasets to gather but
71  * there are three cases when the number of datasets returned differs from
72  * n_to_wait. when n_to_wait is larger than the number of tasks remaining,
73  * datasets from all of the remaining tasks is returned. when n_to_wait is
74  * smaller than the number of datasets ready, all of the currenttly ready
75  * data are returned. finally, when n_to_wait is < 1 the call blocks until
76  * all of the tasks complete and all of the data is returned.
77  */
78  template <template <typename ... > class container_t, typename ... args>
79  int wait_some(long n_to_wait, long long poll_interval,
80  container_t<data_t, args ...> &data);
81 
82  /// get the number of threads
83  unsigned int size() const noexcept
84  { return m_threads.size(); }
85 
86 private:
87  /// create n threads for the pool
88  void create_threads(MPI_Comm comm, int n_threads, bool bind, bool verbose);
89 
90 private:
91  long m_num_futures;
92  std::mutex m_mutex;
93  std::atomic<bool> m_live;
95  std::vector<owned_future<data_t>> m_futures;
96  std::vector<std::thread> m_threads;
97 };
98 
99 // --------------------------------------------------------------------------
100 template <typename task_t, typename data_t>
102  bool bind, bool verbose) : m_live(true)
103 {
104  this->create_threads(comm, n, bind, verbose);
105 }
106 
107 // --------------------------------------------------------------------------
108 template <typename task_t, typename data_t>
110  int n_requested, bool bind, bool verbose)
111 {
112 #if defined(TECA_HAS_MPI)
113  // this rank is excluded from computations
114  if (comm == MPI_COMM_NULL)
115  return;
116 #endif
117 
118  m_num_futures = 0;
119 
120  int n_threads = n_requested;
121 
122  std::deque<int> core_ids;
123  std::vector<int> device_ids;
124 
126  n_requested, -1, -1, bind, verbose, n_threads, core_ids,
127  device_ids))
128  {
129  TECA_WARNING("Failed to detetermine thread parameters."
130  " Falling back to 1 thread, affinity disabled.")
131 
132  n_threads = 1;
133  bind = false;
134  }
135 
136  // allocate the CPU processing threads
137  for (int i = 0; i < n_threads; ++i)
138  {
139  m_threads.push_back(std::thread([this]()
140  {
141  // "main" for each thread in the pool
142  while (m_live.load())
143  {
144  task_t task;
145  if (m_queue.try_pop(task))
146  task(-1);
147  else
148  std::this_thread::yield();
149  }
150  }));
151 #if defined(_GNU_SOURCE)
152  // bind each to a hyperthread
153  if (bind)
154  {
155  int core_id = core_ids.front();
156  core_ids.pop_front();
157 
158  cpu_set_t core_mask;
159  CPU_ZERO(&core_mask);
160  CPU_SET(core_id, &core_mask);
161 
162  if (pthread_setaffinity_np(m_threads[i].native_handle(),
163  sizeof(cpu_set_t), &core_mask))
164  {
165  TECA_WARNING("Failed to set thread affinity.")
166  }
167  }
168 #endif
169  }
170 }
171 
172 // --------------------------------------------------------------------------
173 template <typename task_t, typename data_t>
175 {
176  m_live = false;
177  std::for_each(m_threads.begin(), m_threads.end(),
178  [](std::thread &t) { t.join(); });
179 }
180 
181 // --------------------------------------------------------------------------
182 template <typename task_t, typename data_t>
184 {
185  std::lock_guard<std::mutex> lock(m_mutex);
186 
187  ++m_num_futures;
188 
189  m_futures.emplace_back(owned_future(task.get_future()));
190  m_queue.push(std::move(task));
191 }
192 
193 // --------------------------------------------------------------------------
194 template <typename task_t, typename data_t>
195 template <template <typename ... > class container_t, typename ... args>
196 int teca_cpu_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
197  long long poll_interval, container_t<data_t, args ...> &data)
198 {
199  // wait for all
200  if (n_to_wait < 1)
201  {
202  this->wait_all(data);
203  return 0;
204  }
205 
206  // gather the requested number of datasets
207  size_t thread_valid = 1;
208  while (thread_valid)
209  {
210  {
211  // scan the tasks once. capture any data that is ready
212  thread_valid = 0;
213  std::lock_guard<std::mutex> lock(m_mutex);
214  for (auto it = m_futures.begin(); it != m_futures.end(); ++it)
215  {
216  if (it->owner() && it->m_future.valid())
217  {
218  if ((it->m_future.wait_for(std::chrono::seconds::zero())
219  == std::future_status::ready))
220  {
221  data.push_back(it->m_future.get());
222  --m_num_futures;
223  }
224  else
225  {
226  ++thread_valid;
227  }
228  }
229  }
230  }
231 
232  // we have the requested number of datasets
233  if (data.size() >= static_cast<unsigned int>(n_to_wait))
234  break;
235 
236  // wait for the user supplied duration before re-scanning
237  if (thread_valid)
238  std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
239  }
240 
241  // last one finished clears the futures
242  if (!m_num_futures)
243  {
244  std::lock_guard<std::mutex> lock(m_mutex);
245  if (!m_num_futures)
246  m_futures.clear();
247  }
248 
249  // return the number of tasks remaining
250  return thread_valid;
251 }
252 
253 // --------------------------------------------------------------------------
254 template <typename task_t, typename data_t>
255 template <template <typename ... > class container_t, typename ... args>
256 void teca_cpu_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
257 {
258  {
259  std::lock_guard<std::mutex> lock(m_mutex);
260  // wait on all pending requests and gather the generated datasets
261  std::for_each(m_futures.begin(), m_futures.end(),
262  [&data,this] (owned_future<data_t> &f)
263  {
264  if (f.owner())
265  {
266  data.push_back(f.m_future.get());
267  --m_num_futures;
268  }
269  });
270  }
271 
272  // last one finished clears the futures
273  if (!m_num_futures)
274  {
275  std::lock_guard<std::mutex> lock(m_mutex);
276  if (!m_num_futures)
277  m_futures.clear();
278  }
279 }
280 
281 #endif
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cpu_thread_pool.h:34
unsigned int size() const noexcept
get the number of threads
Definition: teca_cpu_thread_pool.h:83
void push_task(task_t &task)
Definition: teca_cpu_thread_pool.h:183
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int n_threads_per_device, int n_ranks_per_device, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity, std::vector< int > &device_ids)
auto data(V &&... args)
Definition: teca_variant_array_util.h:255
a future that is owned by a single thread
Definition: teca_owned_future.h:10
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:164