TECA
The Toolkit for Extreme Climate Analysis
teca_thread_pool.h
1 #ifndef teca_thread_pool_h
2 #define teca_thread_pool_h
3 
4 #include "teca_common.h"
5 #include "teca_algorithm.h"
6 #include "teca_thread_util.h"
7 #include "teca_threadsafe_queue.h"
8 #include "teca_mpi.h"
9 
10 #include <vector>
11 #include <thread>
12 #include <atomic>
13 #include <mutex>
14 #include <future>
15 #include <chrono>
16 #include <algorithm>
17 #if defined(_GNU_SOURCE)
18 #include <pthread.h>
19 #include <sched.h>
20 #include <deque>
21 #endif
22 
// forward declaration, needed so that the shared pointer alias below can be
// declared ahead of the class definition.
template <typename task_t, typename data_t>
class teca_thread_pool;

/// A shared pointer to a teca_thread_pool instance.
template <typename task_t, typename data_t>
using p_teca_thread_pool = std::shared_ptr<teca_thread_pool<task_t, data_t>>;
28 
29 /// A class to manage a fixed size pool of threads that dispatch I/O work.
30 template <typename task_t, typename data_t>
31 class teca_thread_pool
32 {
33 public:
34  teca_thread_pool() = delete;
35 
36  // construct/destruct the thread pool.
37  // arguments:
38  // comm communicator over which to map threads. Use MPI_COMM_SELF
39  // for local mapping.
40  //
41  // n number of threads to create for the pool. -1 will
42  // create 1 thread per physical CPU core. all MPI ranks running
43  // on the same node are taken into account, resulting in 1
44  // thread per core node wide.
45  //
46  // bind bind each thread to a specific core.
47  //
48  // verbose print a report of the thread to core bindings
49  teca_thread_pool(MPI_Comm comm, int n, bool bind, bool verbose);
50  ~teca_thread_pool() noexcept;
51 
52  // get rid of copy and asignment
53  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_thread_pool)
54 
55  // add a data request task to the queue, returns a future
56  // from which the generated dataset can be accessed.
57  void push_task(task_t &task);
58 
59  // wait for all of the requests to execute and transfer
60  // datasets in the order that corresponding requests
61  // were added to the queue.
62  template <template <typename ... > class container_t, typename ... args>
63  void wait_all(container_t<data_t, args ...> &data);
64 
65  // wait for some of the requests to execute. datasets will be retruned as
66  // they become ready. n_to_wait specifies how many datasets to gather but
67  // there are three cases when the number of datasets returned differs from
68  // n_to_wait. when n_to_wait is larger than the number of tasks remaining,
69  // datasets from all of the remaining tasks is returned. when n_to_wait is
70  // smaller than the number of datasets ready, all of the currenttly ready
71  // data are returned. finally, when n_to_wait is < 1 the call blocks until
72  // all of the tasks complete and all of the data is returned.
73  template <template <typename ... > class container_t, typename ... args>
74  int wait_some(long n_to_wait, long long poll_interval,
75  container_t<data_t, args ...> &data);
76 
77  // get the number of threads
78  unsigned int size() const noexcept
79  { return m_threads.size(); }
80 
81 private:
82  // create n threads for the pool
83  void create_threads(MPI_Comm comm, int n_threads, bool bind, bool verbose);
84 
85 private:
86  std::atomic<bool> m_live;
88 
89  std::vector<std::future<data_t>>
90  m_futures;
91 
92  std::vector<std::thread> m_threads;
93 };
94 
95 // --------------------------------------------------------------------------
96 template <typename task_t, typename data_t>
98  bool bind, bool verbose) : m_live(true)
99 {
100  this->create_threads(comm, n, bind, verbose);
101 }
102 
103 // --------------------------------------------------------------------------
104 template <typename task_t, typename data_t>
106  int n_requested, bool bind, bool verbose)
107 {
108  // this rank is excluded from computations
109  if (comm == MPI_COMM_NULL)
110  return;
111 
112  int n_threads = n_requested;
113 
114  std::deque<int> core_ids;
115 
117  n_requested, bind, verbose, n_threads, core_ids))
118  {
119  TECA_WARNING("Failed to detetermine thread parameters."
120  " Falling back to 1 thread, affinity disabled.")
121 
122  n_threads = 1;
123  bind = false;
124  }
125 
126  // allocate the threads
127  for (int i = 0; i < n_threads; ++i)
128  {
129  m_threads.push_back(std::thread([this]()
130  {
131  // "main" for each thread in the pool
132  while (m_live.load())
133  {
134  task_t task;
135  if (m_queue.try_pop(task))
136  task();
137  else
138  std::this_thread::yield();
139  }
140  }));
141 #if defined(_GNU_SOURCE)
142  // bind each to a hyperthread
143  if (bind)
144  {
145  int core_id = core_ids.front();
146  core_ids.pop_front();
147 
148  cpu_set_t core_mask;
149  CPU_ZERO(&core_mask);
150  CPU_SET(core_id, &core_mask);
151 
152  if (pthread_setaffinity_np(m_threads[i].native_handle(),
153  sizeof(cpu_set_t), &core_mask))
154  {
155  TECA_WARNING("Failed to set thread affinity.")
156  }
157  }
158 #endif
159  }
160 }
161 
162 // --------------------------------------------------------------------------
163 template <typename task_t, typename data_t>
165 {
166  m_live = false;
167  std::for_each(m_threads.begin(), m_threads.end(),
168  [](std::thread &t) { t.join(); });
169 }
170 
171 // --------------------------------------------------------------------------
172 template <typename task_t, typename data_t>
174 {
175  m_futures.push_back(task.get_future());
176  m_queue.push(std::move(task));
177 }
178 
179 // --------------------------------------------------------------------------
180 template <typename task_t, typename data_t>
181 template <template <typename ... > class container_t, typename ... args>
182 int teca_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
183  long long poll_interval, container_t<data_t, args ...> &data)
184 {
185  long n_tasks = m_futures.size();
186 
187  // wait for all
188  if (n_to_wait < 1)
189  {
190  this->wait_all(data);
191  return 0;
192  }
193  // wait for at most the number of queued tasks
194  else if (n_to_wait > n_tasks)
195  n_to_wait = n_tasks;
196 
197 
198  // gather the requested number of datasets
199  while (1)
200  {
201  // scan the tasks once. capture any data that is ready
202  auto it = m_futures.begin();
203  while (it != m_futures.end())
204  {
205  std::future_status stat = it->wait_for(std::chrono::seconds::zero());
206  if (stat == std::future_status::ready)
207  {
208  data.push_back(it->get());
209  it = m_futures.erase(it);
210  }
211  else
212  {
213  ++it;
214  }
215  }
216 
217  // if we have not accumulated the requested number of datasets
218  // wait for the user supplied duration before re-scanning
219  if (data.size() < static_cast<unsigned int>(n_to_wait))
220  std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
221  else
222  break;
223  }
224 
225  // return the number of tasks remaining
226  return m_futures.size();
227 }
228 
229 // --------------------------------------------------------------------------
230 template <typename task_t, typename data_t>
231 template <template <typename ... > class container_t, typename ... args>
232 void teca_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
233 {
234  // wait on all pending requests and gather the generated
235  // datasets
236  std::for_each(m_futures.begin(), m_futures.end(),
237  [&data] (std::future<data_t> &f)
238  {
239  data.push_back(f.get());
240  });
241  m_futures.clear();
242 }
243 
244 #endif
teca_thread_pool
A class to manage a fixed size pool of threads that dispatch I/O work.
Definition: teca_thread_pool.h:24
teca_thread_util.h
teca_common.h
teca_threadsafe_queue< task_t >
teca_thread_util::thread_parameters
int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity)
TECA_WARNING
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:141