TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_algorithm.h
1 #ifndef teca_threaded_algorithm_h
2 #define teca_threaded_algorithm_h
3 
4 #include "teca_config.h"
5 #include "teca_algorithm.h"
6 #include "teca_dataset.h"
7 #include "teca_shared_object.h"
8 
9 #include <thread>
10 #include <future>
11 
12 #if defined(TECA_HAS_CUDA)
13 template <typename task_t, typename data_t>
15 #else
16 template <typename task_t, typename data_t>
18 #endif
19 
20 class teca_metadata;
21 class teca_threaded_algorithm_internals;
22 
23 TECA_SHARED_OBJECT_FORWARD_DECL(teca_threaded_algorithm)
24 
25 /// Task type for tasks returning a pointer to teca_dataset
26 using teca_data_request_task = std::packaged_task<const_p_teca_dataset(int)>;
27 
28 class teca_data_request;
29 
30 /// A thread pool for processing teca_data_request_task
31 #if defined(TECA_HAS_CUDA)
34 #else
37 #endif
38 
39 /// A pointer to teca_data_request_queue
40 using p_teca_data_request_queue = std::shared_ptr<teca_data_request_queue>;
41 
42 /** Allocate and initialize a new thread pool.
43  * @param[in] comm The communicator to allocate threads across
44  * @param[in] n_threads The number of threads to create per MPI rank. Use -1 to
45  * map one thread per physical core on each node.
46  * @param[in] n_threads_per_device The number of threads to assign to servicing
47  * each CUDA device. -1 for all threads.
48  * @param[in] bind If set then each thread will be bound to a specific core.
49  * @param[in] verbose If set then the mapping is sent to the stderr
50  */
52 p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm,
53  int n_threads, int n_threads_per_device, bool bind, bool verbose);
54 
55 /// This is the base class defining a threaded algorithm.
56 /**
57  * The strategy employed is to parallelize over upstream
58  * data requests using a thread pool.
59  */
61 {
62 public:
63  TECA_ALGORITHM_STATIC_NEW(teca_threaded_algorithm)
64  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_algorithm)
65  TECA_ALGORITHM_CLASS_NAME(teca_threaded_algorithm)
66  virtual ~teca_threaded_algorithm() noexcept;
67 
68  // report/initialize to/from Boost program options
69  // objects.
70  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
71  TECA_SET_ALGORITHM_PROPERTIES()
72 
73  /** Set the number of threads in the pool. Setting to -1 results in a
74  * thread per core factoring in all MPI ranks running on the node.
75  */
76  void set_thread_pool_size(int n_threads);
77 
78  /// Get the number of threads in the pool.
79  unsigned int get_thread_pool_size() const noexcept;
80 
81  /** @name verbose
82  * set/get the verbosity level.
83  */
84  ///@{
85  TECA_ALGORITHM_PROPERTY(int, verbose)
86  ///@}
87 
88  /** @name bind_threads
89  * set/get thread affinity mode. When 0, threads are not bound to CPU cores,
90  * allowing for migration among all cores. This will likely degrade
91  * performance. Default is 1.
92  */
93  ///@{
94  TECA_ALGORITHM_PROPERTY(int, bind_threads)
95  ///@}
96 
97  /** @name stream_size
98  * set the smallest number of datasets to gather per call to execute. The
99  * default (-1) results in all datasets being gathered. In practice more
100  * datasets will be returned if ready.
101  */
102  ///@{
103  TECA_ALGORITHM_PROPERTY(int, stream_size)
104  ///@}
105 
106  /** @name poll_interval
107  * set the duration in nanoseconds to wait between checking for completed
108  * tasks
109  */
110  ///@{
111  TECA_ALGORITHM_PROPERTY(long long, poll_interval)
112  ///@}
113 
114  /** @name threads_per_cuda_device
115  * set the number of threads to service each CUDA device.
116  */
117  ///@{
118  TECA_ALGORITHM_PROPERTY(int, threads_per_cuda_device)
119  ///@}
120 
121  /// explicitly set the thread pool to submit requests to
122  void set_data_request_queue(const p_teca_data_request_queue &queue);
123 
124 protected:
126 
127  // streaming execute. streaming flag will be set when there is more
128  // data to process. it is not safe to use MPI when the streaming flag
129  // is set. on the last call streaming flag will not be set, at that
130  // point MPI may be used.
131  virtual
132  const_p_teca_dataset execute(unsigned int port,
133  const std::vector<const_p_teca_dataset> &input_data,
134  const teca_metadata &request, int streaming);
135 
136  // forward to streaming execute
137  const_p_teca_dataset execute(unsigned int port,
138  const std::vector<const_p_teca_dataset> &input_data,
139  const teca_metadata &request) override;
140 
141  // driver function that manages execution of the given
142  // request on the named port. each upstream request issued
143  // will be executed by the thread pool.
144  const_p_teca_dataset request_data(teca_algorithm_output_port &port,
145  const teca_metadata &request) override;
146 
147 private:
148  int bind_threads;
149  int stream_size;
150  long long poll_interval;
151  int threads_per_cuda_device;
152 
153  teca_threaded_algorithm_internals *internals;
154 };
155 
156 #endif
teca_metadata
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:21
teca_cpu_thread_pool
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cpu_thread_pool.h:25
teca_shared_object.h
teca_cuda_thread_pool
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cuda_thread_pool.h:27
teca_error::TECA_EXPORT
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
teca_threaded_algorithm
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:60
teca_algorithm
The interface to TECA pipeline architecture.
Definition: teca_algorithm.h:237