TECA
The Toolkit for Extreme Climate Analysis
teca_cuda_thread_pool.h
1 #ifndef teca_cuda_thread_pool_h
2 #define teca_cuda_thread_pool_h
3 
4 #include "teca_config.h"
5 #include "teca_common.h"
6 #include "teca_algorithm.h"
7 #include "teca_thread_util.h"
8 #include "teca_threadsafe_queue.h"
9 #include "teca_owned_future.h"
10 #include "teca_mpi.h"
11 
12 #include <sstream>
13 #include <vector>
14 #include <thread>
15 #include <atomic>
16 #include <mutex>
17 #include <future>
18 #include <chrono>
19 #include <algorithm>
20 #if defined(_GNU_SOURCE)
21 #include <pthread.h>
22 #include <sched.h>
23 #include <deque>
24 #endif
25 
26 
27 template <typename task_t, typename data_t>
29 
30 /// a shared pointer managing a teca_cuda_thread_pool instance.
31 template <typename task_t, typename data_t>
32 using p_teca_cuda_thread_pool = std::shared_ptr<teca_cuda_thread_pool<task_t, data_t>>;
33 
34 
35 /// A class to manage a fixed size pool of threads that dispatch work.
36 /** Each thread in the pool services a specific CUDA device or CPU core. During
37  * execution each thread assigns work via the device_id request key to the CUDA
38  * device or CPU which it services. The default number of threads per CUDA
39  * device is 8. This can be overridden via the threads_per_device parameter or
40  * the TECA_THREADS_PER_CUDA_DEVICE environment variable. Once a CUDA device
41  * reaches the maximum specified number of threads per device, no more threads
42  * will assign work to it. Once all available CUDA devices reach the maximum
43  * specified number of threads per device, all remaining threads in the pool
44  * will assign work to the CPU cores to which they are bound.
45  *
46  * Upstream algorithms must examine the device_id field in the request to
47  * determine which CUDA device or CPU they should use for calculations. The
48  * algorithm should allocate memory and invoke computations only on the
49  * assigned device. Algorithms that do not support calculation on CUDA GPU
50  * will ignore the assignment and make use of the CPU.
51  */
52 template <typename task_t, typename data_t>
54 {
55 public:
56  teca_cuda_thread_pool() = delete;
57  ~teca_cuda_thread_pool() noexcept;
58 
59  /** construct/destruct the thread pool.
60  *
61  * @param[in] comm communicator over which to map threads. Use
62  * MPI_COMM_SELF for local mapping and MPI_COMM_NULL
63  * to exclude this process from execution.
64  *
65  * @param[in] n_threads number of threads to create for the pool. -1 will
66  * create 1 thread per physical CPU core. all MPI
67  * ranks running on the same node are taken into
68  * account, resulting in 1 thread per core node wide.
69  *
70  * @param[in] threads_per_device number of threads to assign to each
71  * GPU/device. If 0 only CPUs will be used.
72  * If -1 the default of 8 threads per device
73  * will be used.
74  *
75  * @param[in] ranks_per_device the number of MPI ranks to allow access to
76  * each device/GPU.
77  *
78  * @param[in] bind bind each thread to a specific core.
79  *
80  * @param[in] verbose print a report of the thread to core bindings
81  */
82  teca_cuda_thread_pool(MPI_Comm comm, int n_threads,
83  int threads_per_device, int ranks_per_device, bool bind,
84  bool verbose);
85 
86  // get rid of copy and assignment
87  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool)
88 
89  /** add a data request task to the queue. the future for the generated
90  * dataset is stored by the pool and is harvested by wait_some or wait_all.
91  */
92  void push_task(task_t &task);
93 
94  /** wait for all of the requests to execute and transfer datasets in the
95  * order that corresponding requests were added to the queue.
96  */
97  template <template <typename ... > class container_t, typename ... args>
98  void wait_all(container_t<data_t, args ...> &data);
99 
100  /** wait for some of the requests to execute. datasets will be returned as
101  * they become ready. n_to_wait specifies how many datasets to gather but
102  * there are three cases when the number of datasets returned differs from
103  * n_to_wait. when n_to_wait is larger than the number of tasks remaining,
104  * datasets from all of the remaining tasks are returned. when n_to_wait is
105  * smaller than the number of datasets ready, all of the currently ready
106  * data are returned. finally, when n_to_wait is < 1 the call blocks until
107  * all of the tasks complete and all of the data are returned.
108  */
109  template <template <typename ... > class container_t, typename ... args>
110  int wait_some(long n_to_wait, long long poll_interval,
111  container_t<data_t, args ...> &data);
112 
113  /// get the number of threads
114  unsigned int size() const noexcept
115  { return m_threads.size(); }
116 
117 private:
118  /// create n threads for the pool
119  void create_threads(MPI_Comm comm, int n_threads,
120  int threads_per_device, int ranks_per_device, bool bind,
121  bool verbose);
122 
123 private:
124  long m_num_futures;
125  std::mutex m_mutex;
126  std::atomic<bool> m_live;
128  std::vector<owned_future<data_t>> m_futures;
129  std::vector<std::thread> m_threads;
130 };
131 
132 // --------------------------------------------------------------------------
133 template <typename task_t, typename data_t>
135  int n_threads, int threads_per_device, int ranks_per_device, bool bind,
136  bool verbose) : m_live(true)
137 {
138  this->create_threads(comm, n_threads, threads_per_device,
139  ranks_per_device, bind, verbose);
140 }
141 
142 // --------------------------------------------------------------------------
143 template <typename task_t, typename data_t>
145  int n_requested, int threads_per_device, int ranks_per_device, bool bind,
146  bool verbose)
147 {
148  m_num_futures = 0;
149 
150  int n_threads = n_requested;
151 
152  // determine available CPU cores
153  std::deque<int> core_ids;
154  std::vector<int> device_ids;
155 
156  if (teca_thread_util::thread_parameters(comm, -1, n_requested,
157  threads_per_device, ranks_per_device, bind, verbose, n_threads,
158  core_ids, device_ids))
159  {
160  TECA_WARNING("Failed to detetermine thread parameters."
161  " Falling back to 1 thread, affinity disabled.")
162 
163  n_threads = 1;
164  bind = false;
165  }
166 
167  for (int i = 0; i < n_threads; ++i)
168  {
169  int device_id = device_ids[i];
170  m_threads.push_back(std::thread([this, device_id]()
171  {
172  // "main" for each thread in the pool
173  while (m_live.load())
174  {
175  task_t task;
176  if (m_queue.try_pop(task))
177  task(device_id);
178  else
179  std::this_thread::yield();
180  }
181  }));
182 #if defined(_GNU_SOURCE)
183  // bind each to a hyperthread
184  if (bind)
185  {
186  int core_id = core_ids[i];
187 
188  cpu_set_t core_mask;
189  CPU_ZERO(&core_mask);
190  CPU_SET(core_id, &core_mask);
191 
192  if (pthread_setaffinity_np(m_threads[i].native_handle(),
193  sizeof(cpu_set_t), &core_mask))
194  {
195  TECA_WARNING("Failed to set thread affinity.")
196  }
197  }
198 #endif
199  }
200 }
201 
202 // --------------------------------------------------------------------------
203 template <typename task_t, typename data_t>
205 {
206  m_live = false;
207  std::for_each(m_threads.begin(), m_threads.end(),
208  [](std::thread &t) { t.join(); });
209 }
210 
211 // --------------------------------------------------------------------------
212 template <typename task_t, typename data_t>
214 {
215  std::lock_guard<std::mutex> lock(m_mutex);
216 
217  ++m_num_futures;
218 
219  m_futures.push_back(task.get_future());
220  m_queue.push(std::move(task));
221 }
222 
223 // --------------------------------------------------------------------------
224 template <typename task_t, typename data_t>
225 template <template <typename ... > class container_t, typename ... args>
226 int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
227  long long poll_interval, container_t<data_t, args ...> &data)
228 {
229  // wait for all
230  if (n_to_wait < 1)
231  {
232  this->wait_all(data);
233  return 0;
234  }
235 
236  // gather the requested number of datasets
237  size_t thread_valid = 1;
238  while (thread_valid && ((data.size() < static_cast<unsigned int>(n_to_wait))))
239  {
240  {
241  thread_valid = 0;
242  std::lock_guard<std::mutex> lock(m_mutex);
243  // scan the tasks once. capture any data that is ready
244  for (auto it = m_futures.begin(); it != m_futures.end(); ++it)
245  {
246  if (it->owner() && it->m_future.valid())
247  {
248  if ((it->m_future.wait_for(std::chrono::seconds::zero())
249  == std::future_status::ready))
250  {
251  data.push_back(it->m_future.get());
252  --m_num_futures;
253  }
254  else
255  {
256  ++thread_valid;
257  }
258  }
259  }
260  }
261 
262  // we have the requested number of datasets
263  if (data.size() >= static_cast<unsigned int>(n_to_wait))
264  break;
265 
266  // wait for the user supplied duration before re-scanning
267  if (thread_valid)
268  std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
269  }
270 
271  // last one finished clears the futures
272  if (!m_num_futures)
273  {
274  std::lock_guard<std::mutex> lock(m_mutex);
275  if (!m_num_futures)
276  m_futures.clear();
277  }
278 
279  // return the number of tasks remaining
280  return thread_valid;
281 }
282 
283 // --------------------------------------------------------------------------
284 template <typename task_t, typename data_t>
285 template <template <typename ... > class container_t, typename ... args>
286 void teca_cuda_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
287 {
288  {
289  std::lock_guard<std::mutex> lock(m_mutex);
290  // wait on all pending requests and gather the generated datasets
291  std::for_each(m_futures.begin(), m_futures.end(),
292  [&data,this] (owned_future<data_t> &f)
293  {
294  if (f.owner())
295  {
296  data.push_back(f.m_future.get());
297  --m_num_futures;
298  }
299  });
300  }
301 
302  // last one finished clears the futures
303  if (!m_num_futures)
304  {
305  std::lock_guard<std::mutex> lock(m_mutex);
306  if (!m_num_futures)
307  m_futures.clear();
308  }
309 }
310 
311 #endif
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cuda_thread_pool.h:54
void push_task(task_t &task)
Definition: teca_cuda_thread_pool.h:213
unsigned int size() const noexcept
get the number of threads
Definition: teca_cuda_thread_pool.h:114
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int n_threads_per_device, int n_ranks_per_device, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity, std::vector< int > &device_ids)
auto data(V &&... args)
Definition: teca_variant_array_util.h:255
a future that is owned by a single thread
Definition: teca_owned_future.h:10
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:164