TECA
The Toolkit for Extreme Climate Analysis
teca_cuda_thread_pool.h
1 #ifndef teca_cuda_thread_pool_h
2 #define teca_cuda_thread_pool_h
3 
#include "teca_config.h"
#include "teca_common.h"
#include "teca_algorithm.h"
#include "teca_thread_util.h"
#include "teca_threadsafe_queue.h"
#include "teca_mpi.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
#if defined(_GNU_SOURCE)
#include <pthread.h>
#include <sched.h>
#endif
24 
25 
// forward declaration, needed so the shared pointer alias below can name the
// class before its definition
template <typename task_t, typename data_t>
class teca_cuda_thread_pool;

/// a shared pointer managing a teca_cuda_thread_pool instance.
template <typename task_t, typename data_t>
using p_teca_cuda_thread_pool = std::shared_ptr<teca_cuda_thread_pool<task_t, data_t>>;
32 
33 
34 /// A class to manage a fixed size pool of threads that dispatch work.
35 /** Each thread in the pool services a specific CUDA device or CPU core. During
36  * execution each thread assigns work via the device_id request key to the CUDA
37  * device or CPU which it services. The default number of threads per CUDA
38  * device is 8. This can be overriden via the n_threads_per_device parameter or
39  * the TECA_THREADS_PER_CUDA_DEVICE environment variable. Once a CUDA device
40  * reaches the maximum specified number of threads per device, no more threads
41  * will assign work to it. Once all available CUDA devices reach the maximum
42  * specified number of threads per device, all remaining threads in the pool
43  * will assign work to the CPU cores to which they are bound.
44  *
45  * Upstream algorithms must examine the device_id field in the request to
46  * determine which CUDA device or CPU they should use for calculations. The
47  * algorithm should allocate memory and invoke computations only on the
48  * assigned device. Algorithms that do not support calculation on CUDA GPU
49  * will ignore the assignment and make use of the CPU.
50  */
51 template <typename task_t, typename data_t>
53 {
54 public:
55  teca_cuda_thread_pool() = delete;
56  ~teca_cuda_thread_pool() noexcept;
57 
58  /** construct/destruct the thread pool.
59  *
60  * @param[in] comm communicator over which to map threads. Use
61  * MPI_COMM_SELF for local mapping and MPI_COMM_NULL
62  * to exclude this process from execution.
63  *
64  * @param[in] n_threads number of threads to create for the pool. -1 will
65  * create 1 thread per physical CPU core. all MPI
66  * ranks running on the same node are taken into
67  * account, resulting in 1 thread per core node wide.
68  *
69  * @param[in[ n_threads_per_device number of threads to assign to each CUDA
70  * device. -1 for all threads assigned.
71  *
72  * @param[in] bind bind each thread to a specific core.
73  *
74  * @param[in] verbose print a report of the thread to core bindings
75  */
76  teca_cuda_thread_pool(MPI_Comm comm, int n_threads,
77  int n_threads_per_device, bool bind, bool verbose);
78 
79  // get rid of copy and asignment
80  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool)
81 
82  /** add a data request task to the queue, returns a future from which the
83  * generated dataset can be accessed.
84  */
85  void push_task(task_t &task);
86 
87  /** wait for all of the requests to execute and transfer datasets in the
88  * order that corresponding requests were added to the queue.
89  */
90  template <template <typename ... > class container_t, typename ... args>
91  void wait_all(container_t<data_t, args ...> &data);
92 
93  /** wait for some of the requests to execute. datasets will be retruned as
94  * they become ready. n_to_wait specifies how many datasets to gather but
95  * there are three cases when the number of datasets returned differs from
96  * n_to_wait. when n_to_wait is larger than the number of tasks remaining,
97  * datasets from all of the remaining tasks is returned. when n_to_wait is
98  * smaller than the number of datasets ready, all of the currenttly ready
99  * data are returned. finally, when n_to_wait is < 1 the call blocks until
100  * all of the tasks complete and all of the data is returned.
101  */
102  template <template <typename ... > class container_t, typename ... args>
103  int wait_some(long n_to_wait, long long poll_interval,
104  container_t<data_t, args ...> &data);
105 
106  /// get the number of threads
107  unsigned int size() const noexcept
108  { return m_threads.size(); }
109 
110 private:
111  /// create n threads for the pool
112  void create_threads(MPI_Comm comm, int n_threads,
113  int n_threads_per_device, bool bind, bool verbose);
114 
115 private:
116  std::atomic<bool> m_live;
118  std::vector<std::future<data_t>> m_futures;
119  std::vector<std::thread> m_threads;
120 };
121 
122 // --------------------------------------------------------------------------
123 template <typename task_t, typename data_t>
125  int n_threads, int n_threads_per_device, bool bind, bool verbose) : m_live(true)
126 {
127  this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose);
128 }
129 
130 // --------------------------------------------------------------------------
131 template <typename task_t, typename data_t>
133  int n_requested, int n_per_device, bool bind, bool verbose)
134 {
135  int n_threads = n_requested;
136 
137  // determine available CPU cores
138  std::deque<int> core_ids;
139  std::vector<int> device_ids;
140 
142  n_requested, n_per_device, bind, verbose, n_threads,
143  core_ids, device_ids))
144  {
145  TECA_WARNING("Failed to detetermine thread parameters."
146  " Falling back to 1 thread, affinity disabled.")
147 
148  n_threads = 1;
149  bind = false;
150  }
151 
152  for (int i = 0; i < n_threads; ++i)
153  {
154  int device_id = device_ids[i];
155  m_threads.push_back(std::thread([this, device_id]()
156  {
157  // "main" for each thread in the pool
158  while (m_live.load())
159  {
160  task_t task;
161  if (m_queue.try_pop(task))
162  task(device_id);
163  else
164  std::this_thread::yield();
165  }
166  }));
167 #if defined(_GNU_SOURCE)
168  // bind each to a hyperthread
169  if (bind)
170  {
171  int core_id = core_ids[i];
172 
173  cpu_set_t core_mask;
174  CPU_ZERO(&core_mask);
175  CPU_SET(core_id, &core_mask);
176 
177  if (pthread_setaffinity_np(m_threads[i].native_handle(),
178  sizeof(cpu_set_t), &core_mask))
179  {
180  TECA_WARNING("Failed to set thread affinity.")
181  }
182  }
183 #endif
184  }
185 }
186 
187 // --------------------------------------------------------------------------
188 template <typename task_t, typename data_t>
190 {
191  m_live = false;
192  std::for_each(m_threads.begin(), m_threads.end(),
193  [](std::thread &t) { t.join(); });
194 }
195 
196 // --------------------------------------------------------------------------
197 template <typename task_t, typename data_t>
199 {
200  m_futures.push_back(task.get_future());
201  m_queue.push(std::move(task));
202 }
203 
204 // --------------------------------------------------------------------------
205 template <typename task_t, typename data_t>
206 template <template <typename ... > class container_t, typename ... args>
207 int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
208  long long poll_interval, container_t<data_t, args ...> &data)
209 {
210  long n_tasks = m_futures.size();
211 
212  // wait for all
213  if (n_to_wait < 1)
214  {
215  this->wait_all(data);
216  return 0;
217  }
218  // wait for at most the number of queued tasks
219  else if (n_to_wait > n_tasks)
220  n_to_wait = n_tasks;
221 
222 
223  // gather the requested number of datasets
224  while (1)
225  {
226  // scan the tasks once. capture any data that is ready
227  auto it = m_futures.begin();
228  while (it != m_futures.end())
229  {
230  std::future_status stat = it->wait_for(std::chrono::seconds::zero());
231  if (stat == std::future_status::ready)
232  {
233  data.push_back(it->get());
234  it = m_futures.erase(it);
235  }
236  else
237  {
238  ++it;
239  }
240  }
241 
242  // if we have not accumulated the requested number of datasets
243  // wait for the user supplied duration before re-scanning
244  if (data.size() < static_cast<unsigned int>(n_to_wait))
245  std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
246  else
247  break;
248  }
249 
250  // return the number of tasks remaining
251  return m_futures.size();
252 }
253 
254 // --------------------------------------------------------------------------
255 template <typename task_t, typename data_t>
256 template <template <typename ... > class container_t, typename ... args>
257 void teca_cuda_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
258 {
259  // wait on all pending requests and gather the generated
260  // datasets
261  std::for_each(m_futures.begin(), m_futures.end(),
262  [&data] (std::future<data_t> &f)
263  {
264  data.push_back(f.get());
265  });
266  m_futures.clear();
267 }
268 
269 #endif
teca_thread_util.h
teca_common.h
teca_threadsafe_queue< task_t >
teca_thread_util::thread_parameters
TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int n_threads_per_device, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity, std::vector< int > &device_ids)
teca_cuda_thread_pool::push_task
void push_task(task_t &task)
Definition: teca_cuda_thread_pool.h:198
teca_cuda_thread_pool::size
unsigned int size() const noexcept
get the number of threads
Definition: teca_cuda_thread_pool.h:107
teca_cuda_thread_pool
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cuda_thread_pool.h:27
TECA_WARNING
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:149
teca_error::TECA_EXPORT
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.