1 #ifndef teca_cuda_thread_pool_h
2 #define teca_cuda_thread_pool_h
4 #include "teca_config.h"
6 #include "teca_algorithm.h"
8 #include "teca_threadsafe_queue.h"
9 #include "teca_owned_future.h"
20 #if defined(_GNU_SOURCE)
27 template <
typename task_t,
typename data_t>
/// Shared-ownership handle (std::shared_ptr) to a teca_cuda_thread_pool
/// instantiated for the given task type (task_t) and result type (data_t).
31 template <
typename task_t,
typename data_t>
32 using p_teca_cuda_thread_pool = std::shared_ptr<teca_cuda_thread_pool<task_t, data_t>>;
52 template <
typename task_t,
typename data_t>
83 int threads_per_device,
int ranks_per_device,
bool bind,
/// Hand a task to the pool: its future is recorded and the task is placed
/// in the queue that the worker threads pop from.
/// @param task the task to queue.
/// NOTE(review): the definition calls task.get_future() and then pushes
/// std::move(task), so the caller's object appears to be left in a
/// moved-from state after this call -- confirm against the full definition.
92 void push_task(task_t &task);
/// Gather the results of the queued tasks into `data`.
/// The visible part of the definition takes the pool's mutex and appends
/// the value of each stored future with data.push_back(...).
/// @param data a container of data_t that receives the results.
/// NOTE(review): presumably blocks until every queued task has finished,
/// since it calls get() on each future -- confirm against the full definition.
97 template <
template <
typename ... >
class container_t, typename ... args>
98 void wait_all(container_t<data_t, args ...> &
data);
/// Gather results from tasks that have finished, appending them to `data`.
/// @param n_to_wait     the definition keeps polling while data.size() is
///                      below this count (subject to a second loop condition
///                      tracked inside the definition).
/// @param poll_interval time to sleep between polls; the definition passes
///                      it to std::chrono::nanoseconds.
/// @param data          a container of data_t that receives the results.
/// @returns an int whose meaning is not visible in this listing -- TODO confirm.
109 template <
template <
typename ... >
class container_t, typename ... args>
110 int wait_some(long n_to_wait, long long poll_interval,
111 container_t<data_t, args ...> &
data);
/// Get the number of threads held by the pool (the size of m_threads).
/// The container's size is implicitly converted to unsigned int on return.
114 unsigned int size() const noexcept
115 {
return m_threads.size(); }
119 void create_threads(MPI_Comm comm,
int n_threads,
120 int threads_per_device,
int ranks_per_device,
bool bind,
// run flag: set to true by the constructor and polled by each worker
// thread's loop (while (m_live.load()))
126 std::atomic<bool> m_live;
// futures obtained from the tasks passed to push_task; read by the wait
// functions to collect results
128 std::vector<owned_future<data_t>> m_futures;
// the pool's worker threads, created in create_threads
129 std::vector<std::thread> m_threads;
133 template <
typename task_t,
typename data_t>
135 int n_threads,
int threads_per_device,
int ranks_per_device,
bool bind,
136 bool verbose) : m_live(true)
138 this->create_threads(comm, n_threads, threads_per_device,
139 ranks_per_device, bind, verbose);
143 template <
typename task_t,
typename data_t>
145 int n_requested,
int threads_per_device,
int ranks_per_device,
bool bind,
150 int n_threads = n_requested;
153 std::deque<int> core_ids;
154 std::vector<int> device_ids;
157 threads_per_device, ranks_per_device, bind, verbose, n_threads,
158 core_ids, device_ids))
161 " Falling back to 1 thread, affinity disabled.")
167 for (
int i = 0; i < n_threads; ++i)
169 int device_id = device_ids[i];
170 m_threads.push_back(std::thread([
this, device_id]()
173 while (m_live.load())
176 if (m_queue.try_pop(task))
179 std::this_thread::yield();
182 #if defined(_GNU_SOURCE)
186 int core_id = core_ids[i];
189 CPU_ZERO(&core_mask);
190 CPU_SET(core_id, &core_mask);
192 if (pthread_setaffinity_np(m_threads[i].native_handle(),
193 sizeof(cpu_set_t), &core_mask))
203 template <
typename task_t,
typename data_t>
207 std::for_each(m_threads.begin(), m_threads.end(),
208 [](std::thread &t) { t.join(); });
212 template <
typename task_t,
typename data_t>
215 std::lock_guard<std::mutex> lock(m_mutex);
219 m_futures.push_back(task.get_future());
220 m_queue.push(std::move(task));
224 template <
typename task_t,
typename data_t>
225 template <
template <
typename ... >
class container_t, typename ... args>
227 long long poll_interval, container_t<data_t, args ...> &data)
232 this->wait_all(
data);
237 size_t thread_valid = 1;
238 while (thread_valid && ((
data.size() <
static_cast<unsigned int>(n_to_wait))))
242 std::lock_guard<std::mutex> lock(m_mutex);
244 for (
auto it = m_futures.begin(); it != m_futures.end(); ++it)
246 if (it->owner() && it->m_future.valid())
248 if ((it->m_future.wait_for(std::chrono::seconds::zero())
249 == std::future_status::ready))
251 data.push_back(it->m_future.get());
263 if (
data.size() >=
static_cast<unsigned int>(n_to_wait))
268 std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
274 std::lock_guard<std::mutex> lock(m_mutex);
284 template <
typename task_t,
typename data_t>
285 template <
template <
typename ... >
class container_t, typename ... args>
289 std::lock_guard<std::mutex> lock(m_mutex);
291 std::for_each(m_futures.begin(), m_futures.end(),
296 data.push_back(f.m_future.get());
305 std::lock_guard<std::mutex> lock(m_mutex);
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cuda_thread_pool.h:54
void push_task(task_t &task)
Definition: teca_cuda_thread_pool.h:213
unsigned int size() const noexcept
Get the number of threads.
Definition: teca_cuda_thread_pool.h:114
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int n_threads_per_device, int n_ranks_per_device, bool bind, bool verbose, int &n_threads, std::deque< int > &affinity, std::vector< int > &device_ids)
auto data(V &&... args)
Definition: teca_variant_array_util.h:255
A future that is owned by a single thread.
Definition: teca_owned_future.h:10
#define TECA_WARNING(_msg)
Constructs a warning message and sends it to the stderr stream.
Definition: teca_common.h:164