1 #ifndef teca_cuda_thread_pool_h
2 #define teca_cuda_thread_pool_h
4 #include "teca_config.h"
6 #include "teca_algorithm.h"
8 #include "teca_threadsafe_queue.h"
19 #if defined(_GNU_SOURCE)
26 template <
typename task_t,
typename data_t>
30 template <
typename task_t,
typename data_t>
31 using p_teca_cuda_thread_pool = std::shared_ptr<teca_cuda_thread_pool<task_t, data_t>>;
51 template <
typename task_t,
typename data_t>
77 int n_threads_per_device,
bool bind,
bool verbose);
85 void push_task(task_t &task);
90 template <template <typename ... > class container_t, typename ... args>
91 void wait_all(container_t<data_t, args ...> &data);
102 template <template <typename ... > class container_t, typename ... args>
103 int wait_some(
long n_to_wait,
long long poll_interval,
104 container_t<data_t, args ...> &data);
107 unsigned int size() const noexcept
108 {
return m_threads.
size(); }
112 void create_threads(MPI_Comm comm,
int n_threads,
113 int n_threads_per_device,
bool bind,
bool verbose);
116 std::atomic<bool> m_live;
118 std::vector<std::future<data_t>> m_futures;
119 std::vector<std::thread> m_threads;
123 template <
typename task_t,
typename data_t>
125 int n_threads,
int n_threads_per_device,
bool bind,
bool verbose) : m_live(true)
127 this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose);
131 template <
typename task_t,
typename data_t>
133 int n_requested,
int n_per_device,
bool bind,
bool verbose)
135 int n_threads = n_requested;
138 std::deque<int> core_ids;
139 std::vector<int> device_ids;
142 n_requested, n_per_device, bind, verbose, n_threads,
143 core_ids, device_ids))
146 " Falling back to 1 thread, affinity disabled.")
152 for (
int i = 0; i < n_threads; ++i)
154 int device_id = device_ids[i];
155 m_threads.push_back(std::thread([
this, device_id]()
158 while (m_live.load())
161 if (m_queue.try_pop(task))
164 std::this_thread::yield();
167 #if defined(_GNU_SOURCE)
171 int core_id = core_ids[i];
174 CPU_ZERO(&core_mask);
175 CPU_SET(core_id, &core_mask);
177 if (pthread_setaffinity_np(m_threads[i].native_handle(),
178 sizeof(cpu_set_t), &core_mask))
188 template <
typename task_t,
typename data_t>
192 std::for_each(m_threads.begin(), m_threads.end(),
193 [](std::thread &t) { t.join(); });
197 template <
typename task_t,
typename data_t>
200 m_futures.push_back(task.get_future());
201 m_queue.push(std::move(task));
205 template <
typename task_t,
typename data_t>
206 template <
template <
typename ... >
class container_t, typename ... args>
208 long long poll_interval, container_t<data_t, args ...> &data)
210 long n_tasks = m_futures.
size();
215 this->wait_all(data);
219 else if (n_to_wait > n_tasks)
227 auto it = m_futures.begin();
228 while (it != m_futures.end())
230 std::future_status stat = it->wait_for(std::chrono::seconds::zero());
231 if (stat == std::future_status::ready)
233 data.push_back(it->get());
234 it = m_futures.erase(it);
244 if (data.size() <
static_cast<unsigned int>(n_to_wait))
245 std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
251 return m_futures.size();
255 template <
typename task_t,
typename data_t>
256 template <
template <
typename ... >
class container_t, typename ... args>
261 std::for_each(m_futures.begin(), m_futures.end(),
262 [&data] (std::future<data_t> &f)
264 data.push_back(f.get());