TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_algorithm.h
1 #ifndef teca_threaded_algorithm_h
2 #define teca_threaded_algorithm_h
3 
4 #include "teca_config.h"
5 #include "teca_algorithm.h"
6 #include "teca_dataset.h"
7 #include "teca_shared_object.h"
8 
9 #include <thread>
10 #include <future>
11 
12 #if defined(TECA_HAS_CUDA)
13 template <typename task_t, typename data_t>
15 #else
16 template <typename task_t, typename data_t>
18 #endif
19 
20 class teca_metadata;
21 class teca_threaded_algorithm_internals;
22 
23 TECA_SHARED_OBJECT_FORWARD_DECL(teca_threaded_algorithm)
24 
25 /// Task type for tasks returning a pointer to teca_dataset. Each task takes one int argument.
26 using teca_data_request_task = std::packaged_task<const_p_teca_dataset(int)>;
27 
28 class teca_data_request;
29 
30 /// A thread pool for processing teca_data_request_task
31 #if defined(TECA_HAS_CUDA)
34 #else
37 #endif
38 
39 /// A pointer to teca_data_request_queue
40 using p_teca_data_request_queue = std::shared_ptr<teca_data_request_queue>;
41 
42 /** Allocate and initialize a new thread pool.
43  *
44  * @param[in] comm The communicator to allocate threads across.
45  * @param[in] n_threads The number of threads to create per MPI rank. Use -1 to
46  * map one thread per physical core on each node.
47  * @param[in] threads_per_device The number of threads to assign to servicing
48  * each GPU/device.
49  * @param[in] ranks_per_device The number of ranks allowed to access each GPU/device.
50  * @param[in] bind If set, each thread will be bound to a specific core.
51  * @param[in] verbose If set, the mapping is sent to stderr.
52  */
54 p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm,
55  int n_threads, int threads_per_device, int ranks_per_device, bool bind,
56  bool verbose);
57 
58 /// This is the base class defining a threaded algorithm.
59 /** The strategy employed is to parallelize over upstream data requests using a
60  * thread pool. Implementations override teca_algorithm::get_output_metadata,
61  * teca_algorithm::get_upstream_request, and teca_algorithm::execute. Pipeline
62  * execution is parallelized over the set of requests returned from the
63  * teca_algorithm::get_upstream_request override. The generated data is then
64  * fed incrementally to the teca_algorithm::execute override as it arrives in
65  * at least stream_size increments. Alternatively the generated data can be
66  * collected and fed to the execute override in one call. However, processing
67  * the data in one call is both slower and has a higher memory footprint making
68  * it prohibitive in many situations.
69  */
71 {
72 public:
73  TECA_ALGORITHM_STATIC_NEW(teca_threaded_algorithm)
74  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_algorithm)
75  TECA_ALGORITHM_CLASS_NAME(teca_threaded_algorithm)
76  virtual ~teca_threaded_algorithm() noexcept;
77 
78  // report/initialize to/from Boost program options
79  // objects.
80  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
81  TECA_SET_ALGORITHM_PROPERTIES()
82 
83  /** Set the number of threads in the pool. Setting to -1 results in a
84  * thread per core, factoring in all MPI ranks running on the node.
85  */
86  void set_thread_pool_size(int n_threads);
87 
88  /// Get the number of threads in the pool.
89  unsigned int get_thread_pool_size() const noexcept;
90 
91  /** @name bind_threads
92  * Set/get the thread affinity mode. When 0, threads are not bound to CPU cores,
93  * allowing for migration among all cores. This will likely degrade
94  * performance. Default is 1.
95  */
96  ///@{
97  TECA_ALGORITHM_PROPERTY(int, bind_threads)
98  ///@}
99 
100  /** @name stream_size
101  * Set the smallest number of datasets to gather per call to execute. The
102  * default (-1) results in all datasets being gathered. In practice, more
103  * datasets will be returned if they are ready.
104  */
105  ///@{
106  TECA_ALGORITHM_PROPERTY(int, stream_size)
107  ///@}
108 
109  /** @name poll_interval
110  * Set the duration, in nanoseconds, to wait between checks for completed
111  * tasks.
112  */
113  ///@{
114  TECA_ALGORITHM_PROPERTY(long long, poll_interval)
115  ///@}
116 
117  /** @name threads_per_device
118  * Set the number of threads to service each GPU/device. Other threads will
119  * use the CPU.
120  */
121  ///@{
122  TECA_ALGORITHM_PROPERTY(int, threads_per_device)
123  ///@}
124 
125  /** @name ranks_per_device
126  * Set the number of ranks that have access to each GPU/device. Other ranks
127  * will use the CPU.
128  */
129  ///@{
130  TECA_ALGORITHM_PROPERTY(int, ranks_per_device)
131  ///@}
132 
133  /// Explicitly set the thread pool that requests are submitted to.
134  void set_data_request_queue(const p_teca_data_request_queue &queue);
135 
136  /** @name propagate_device_assignment
137  * When set, the device assignment is taken from the downstream request.
138  * Otherwise, the thread executing the pipeline will provide the assignment.
139  */
140  ///@{
141  TECA_ALGORITHM_PROPERTY(int, propagate_device_assignment)
142  ///@}
143 
144 protected:
146 
147  // Streaming execute. The streaming flag will be set when there is more
148  // data to process. It is not safe to use MPI when the streaming flag
149  // is set. On the last call the streaming flag will not be set; at that
150  // point MPI may be used.
151  virtual
152  const_p_teca_dataset execute(unsigned int port,
153  const std::vector<const_p_teca_dataset> &input_data,
154  const teca_metadata &request, int streaming);
155 
156  // Forwards to the streaming execute overload.
157  const_p_teca_dataset execute(unsigned int port,
158  const std::vector<const_p_teca_dataset> &input_data,
159  const teca_metadata &request) override;
160 
161  // Driver function that manages execution of the given
162  // request on the named port. Each upstream request issued
163  // will be executed by the thread pool.
164  const_p_teca_dataset request_data(teca_algorithm_output_port &port,
165  const teca_metadata &request) override;
166 
167 private:
168  int bind_threads;
169  int stream_size;
170  long long poll_interval;
171  int threads_per_device;
172  int ranks_per_device;
173  int propagate_device_assignment;
174 
175  teca_threaded_algorithm_internals *internals;
176 };
177 
178 #endif
The interface to TECA pipeline architecture.
Definition: teca_algorithm.h:244
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cpu_thread_pool.h:34
A class to manage a fixed size pool of threads that dispatch work.
Definition: teca_cuda_thread_pool.h:54
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:22
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:71
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.