TECA
The Toolkit for Extreme Climate Analysis
teca_cpu_thread_pool.h
1 #ifndef teca_cpu_thread_pool_h
2 #define teca_cpu_thread_pool_h
3 
4 #include "teca_config.h"
5 #include "teca_common.h"
6 #include "teca_algorithm.h"
7 #include "teca_thread_util.h"
8 #include "teca_threadsafe_queue.h"
9 #include "teca_mpi.h"
10 
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#if defined(_GNU_SOURCE)
#include <pthread.h>
#include <sched.h>
#endif
23 
24 template <typename task_t, typename data_t>
26 
27 template <typename task_t, typename data_t>
28 using p_teca_cpu_thread_pool = std::shared_ptr<teca_cpu_thread_pool<task_t, data_t>>;
29 
30 /// A class to manage a fixed size pool of threads that dispatch work.
31 template <typename task_t, typename data_t>
33 {
34 public:
35  teca_cpu_thread_pool() = delete;
36 
37  /** construct/destruct the thread pool.
38  * arguments:
39  * @param[in] comm communicator over which to map threads. Use
40  * MPI_COMM_SELF for local mapping.
41  *
42  * @param[in] n number of threads to create for the pool. -1 will
43  * create 1 thread per physical CPU core. all MPI
44  * ranks running on the same node are taken into
45  * account, resulting in 1 thread per core node wide.
46  *
47  * @param[in] bind bind each thread to a specific core.
48  *
49  * @param[in] verbose print a report of the thread to core bindings
50  */
51  teca_cpu_thread_pool(MPI_Comm comm, int n, bool bind, bool verbose);
52  ~teca_cpu_thread_pool() noexcept;
53 
54  // get rid of copy and assignment
55  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cpu_thread_pool)
56 
57  /** add a data request task to the queue. the task's future is kept
58  * internally; use wait_all or wait_some to collect the generated datasets.
59  */
60  void push_task(task_t &task);
61 
62  /** wait for all of the requests to execute and transfer datasets in the
63  * order that corresponding requests were added to the queue.
64  */
65  template <template <typename ... > class container_t, typename ... args>
66  void wait_all(container_t<data_t, args ...> &data);
67 
68  /** wait for some of the requests to execute. datasets are returned as
69  * they become ready, polling every poll_interval nanoseconds. n_to_wait
70  * specifies how many datasets to gather, but the number returned differs
71  * from n_to_wait in three cases. when n_to_wait is larger than the number
72  * of tasks remaining, data from all of the remaining tasks is returned.
73  * when n_to_wait is smaller than the number of datasets ready, all of the
74  * currently ready data are returned. when n_to_wait is < 1 the call blocks
75  * until all tasks complete. returns the number of tasks still pending.
76  */
77  template <template <typename ... > class container_t, typename ... args>
78  int wait_some(long n_to_wait, long long poll_interval,
79  container_t<data_t, args ...> &data);
80 
81  /// get the number of threads
82  unsigned int size() const noexcept
83  { return m_threads.size(); }
84 
85 private:
86  /// create n threads for the pool
87  void create_threads(MPI_Comm comm, int n_threads, bool bind, bool verbose);
88 
89 private:
90  std::atomic<bool> m_live;
92  std::vector<std::future<data_t>> m_futures;
93  std::vector<std::thread> m_threads;
94 };
95 
96 // --------------------------------------------------------------------------
97 template <typename task_t, typename data_t>
99  bool bind, bool verbose) : m_live(true)
100 {
101  this->create_threads(comm, n, bind, verbose);
102 }
103 
104 // --------------------------------------------------------------------------
105 template <typename task_t, typename data_t>
107  int n_requested, bool bind, bool verbose)
108 {
109 #if defined(TECA_HAS_MPI)
110  // this rank is excluded from computations
111  if (comm == MPI_COMM_NULL)
112  return;
113 #endif
114 
115  int n_threads = n_requested;
116 
117  std::deque<int> core_ids;
118  std::vector<int> device_ids;
119 
121  n_requested, bind, -1, verbose, n_threads, core_ids,
122  device_ids))
123  {
124  TECA_WARNING("Failed to detetermine thread parameters."
125  " Falling back to 1 thread, affinity disabled.")
126 
127  n_threads = 1;
128  bind = false;
129  }
130 
131  // allocate the CPU processing threads
132  for (int i = 0; i < n_threads; ++i)
133  {
134  m_threads.push_back(std::thread([this]()
135  {
136  // "main" for each thread in the pool
137  while (m_live.load())
138  {
139  task_t task;
140  if (m_queue.try_pop(task))
141  task(-1);
142  else
143  std::this_thread::yield();
144  }
145  }));
146 #if defined(_GNU_SOURCE)
147  // bind each to a hyperthread
148  if (bind)
149  {
150  int core_id = core_ids.front();
151  core_ids.pop_front();
152 
153  cpu_set_t core_mask;
154  CPU_ZERO(&core_mask);
155  CPU_SET(core_id, &core_mask);
156 
157  if (pthread_setaffinity_np(m_threads[i].native_handle(),
158  sizeof(cpu_set_t), &core_mask))
159  {
160  TECA_WARNING("Failed to set thread affinity.")
161  }
162  }
163 #endif
164  }
165 }
166 
167 // --------------------------------------------------------------------------
168 template <typename task_t, typename data_t>
170 {
171  m_live = false;
172  std::for_each(m_threads.begin(), m_threads.end(),
173  [](std::thread &t) { t.join(); });
174 }
175 
176 // --------------------------------------------------------------------------
177 template <typename task_t, typename data_t>
179 {
180  m_futures.push_back(task.get_future());
181  m_queue.push(std::move(task));
182 }
183 
184 // --------------------------------------------------------------------------
185 template <typename task_t, typename data_t>
186 template <template <typename ... > class container_t, typename ... args>
187 int teca_cpu_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
188  long long poll_interval, container_t<data_t, args ...> &data)
189 {
190  long n_tasks = m_futures.size();
191 
192  // wait for all
193  if (n_to_wait < 1)
194  {
195  this->wait_all(data);
196  return 0;
197  }
198  // wait for at most the number of queued tasks
199  else if (n_to_wait > n_tasks)
200  n_to_wait = n_tasks;
201 
202 
203  // gather the requested number of datasets
204  while (1)
205  {
206  // scan the tasks once. capture any data that is ready
207  auto it = m_futures.begin();
208  while (it != m_futures.end())
209  {
210  std::future_status stat = it->wait_for(std::chrono::seconds::zero());
211  if (stat == std::future_status::ready)
212  {
213  data.push_back(it->get());
214  it = m_futures.erase(it);
215  }
216  else
217  {
218  ++it;
219  }
220  }
221 
222  // if we have not accumulated the requested number of datasets
223  // wait for the user supplied duration before re-scanning
224  if (data.size() < static_cast<unsigned int>(n_to_wait))
225  std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
226  else
227  break;
228  }
229 
230  // return the number of tasks remaining
231  return m_futures.size();
232 }
233 
234 // --------------------------------------------------------------------------
235 template <typename task_t, typename data_t>
236 template <template <typename ... > class container_t, typename ... args>
237 void teca_cpu_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
238 {
239  // wait on all pending requests and gather the generated
240  // datasets
241  std::for_each(m_futures.begin(), m_futures.end(),
242  [&data] (std::future<data_t> &f)
243  {
244  data.push_back(f.get());
245  });
246  m_futures.clear();
247 }
248 
249 #endif
teca_cpu_thread_pool::size
unsigned int size() const noexcept
get the number of threads
Definition: teca_cpu_thread_pool.h:82
teca_cpu_thread_pool::push_task
void push_task(task_t &task)
Definition: teca_cpu_thread_pool.h:178
teca_thread_util.h
teca_common.h
teca_threadsafe_queue< task_t >
teca_thread_util::thread_parameters
TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int n_threads_per_device, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity, std::vector< int > &device_ids)
teca_cpu_thread_pool
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cpu_thread_pool.h:25
TECA_WARNING
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:149
teca_error::TECA_EXPORT
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.