TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_algorithm.h
1 #ifndef teca_threaded_algorithm_h
2 #define teca_threaded_algorithm_h
3 
4 #include "teca_algorithm.h"
5 #include "teca_dataset.h"
6 #include "teca_shared_object.h"
7 
8 #include <thread>
9 #include <future>
10 
11 template <typename task_t, typename data_t>
12 class teca_thread_pool;
13 
14 class teca_metadata;
15 class teca_threaded_algorithm_internals;
16 
17 TECA_SHARED_OBJECT_FORWARD_DECL(teca_threaded_algorithm)
18 
19 /// Task type for tasks returning a pointer to teca_dataset
20 using teca_data_request_task = std::packaged_task<const_p_teca_dataset()>;
21 
22 class teca_data_request;
23 
24 /// A thread pool for processing teca_data_request_task
27 
28 /// A shared pointer to teca_data_request_queue
29 using p_teca_data_request_queue = std::shared_ptr<teca_data_request_queue>;
30 
31 /** Allocate and initialize a new thread pool, returned as a shared pointer.
32  * @param[in] comm The communicator to allocate threads across
33  * @param[in] n The number of threads to create per MPI rank. Use -1 to
34  * map one thread per physical core on each node.
35  * @param[in] bind If set then each thread will be bound to a specific core.
36  * @param[in] verbose If set then the thread-to-core mapping is sent to stderr.
37  */
38 p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm,
39  int n, bool bind, bool verbose);
40 
41 /// This is the base class defining a threaded algorithm.
42 /**
43  * The strategy employed is to parallelize over upstream
44  * data requests using a thread pool.
45  */
47 {
48 public:
49  TECA_ALGORITHM_STATIC_NEW(teca_threaded_algorithm)
50  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_algorithm)
51  TECA_ALGORITHM_CLASS_NAME(teca_threaded_algorithm)
52  virtual ~teca_threaded_algorithm() noexcept;
53 
54  // Report/initialize the algorithm's properties to/from Boost program
55  // options objects.
56  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
57  TECA_SET_ALGORITHM_PROPERTIES()
58 
59  /** Set the number of threads in the pool. Setting to -1 results in a
60  * thread per core, factoring in all MPI ranks running on the node.
61  */
62  void set_thread_pool_size(int n_threads);
63 
64  /// Get the number of threads in the pool.
65  unsigned int get_thread_pool_size() const noexcept;
66 
67  /** @name verbose
68  * Set/get the verbosity level.
69  */
70  ///@{
71  TECA_ALGORITHM_PROPERTY(int, verbose)
72  ///@}
73 
74  /** @name bind_threads
75  * Set/get thread affinity mode. When 0, threads are not bound to CPU cores,
76  * allowing for migration among all cores. This will likely degrade
77  * performance. Default is 1.
78  */
79  ///@{
80  TECA_ALGORITHM_PROPERTY(int, bind_threads)
81  ///@}
82 
83  /** @name stream_size
84  * Set the smallest number of datasets to gather per call to execute. The
85  * default (-1) results in all datasets being gathered. In practice, more
86  * datasets will be returned if they are ready.
87  */
88  ///@{
89  TECA_ALGORITHM_PROPERTY(int, stream_size)
90  ///@}
91 
92  /** @name poll_interval
93  * Set the duration, in nanoseconds, to wait between checks for completed
94  * tasks.
95  */
96  ///@{
97  TECA_ALGORITHM_PROPERTY(long long, poll_interval)
98  ///@}
99 
100  /// Explicitly set the thread pool (data request queue) to submit requests to.
101  void set_data_request_queue(const p_teca_data_request_queue &queue);
102 
103 protected:
105 
106  // Streaming execute. The streaming flag will be set when there is more
107  // data to process. It is not safe to use MPI when the streaming flag
108  // is set. On the last call the streaming flag will not be set; at that
109  // point MPI may be used.
110  virtual
111  const_p_teca_dataset execute(unsigned int port,
112  const std::vector<const_p_teca_dataset> &input_data,
113  const teca_metadata &request, int streaming);
114 
115  // Non-streaming overload; forwards to the streaming execute above.
116  const_p_teca_dataset execute(unsigned int port,
117  const std::vector<const_p_teca_dataset> &input_data,
118  const teca_metadata &request) override;
119 
120  // Driver function that manages execution of the given
121  // request on the named port. Each upstream request issued
122  // will be executed by the thread pool.
123  const_p_teca_dataset request_data(teca_algorithm_output_port &port,
124  const teca_metadata &request) override;
125 
126 private:
127  int bind_threads;
128  int stream_size;
129  long long poll_interval;
130 
131  teca_threaded_algorithm_internals *internals; // NOTE(review): raw pointer; its allocation/release is not visible in this header -- confirm it is freed in the destructor
132 };
133 
134 #endif
teca_metadata
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:18
teca_thread_pool
A class to manage a fixed-size pool of threads that dispatch I/O work.
Definition: teca_thread_pool.h:24
teca_threaded_algorithm::get_thread_pool_size
unsigned int get_thread_pool_size() const noexcept
Get the number of threads in the pool.
teca_shared_object.h
teca_threaded_algorithm::set_thread_pool_size
void set_thread_pool_size(int n_threads)
teca_threaded_algorithm
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:46
teca_algorithm
The interface to TECA pipeline architecture.
Definition: teca_algorithm.h:237