TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_programmable_algorithm.h
1 #ifndef teca_threaded_programmable_algorithm_h
2 #define teca_threaded_programmable_algorithm_h
3 
4 #include "teca_metadata.h"
5 #include "teca_dataset.h"
6 #include "teca_threaded_algorithm.h"
7 #include "teca_programmable_algorithm.h"
8 
9 /// A threaded algorithm implemented with user provided callbacks.
10 /**
11  * This version of the teca_programmable_algorithm is threaded. A thread pool
12  * (call set_thread_pool_size to initialize) executes the upstream pipeline
13  * asynchronously for each request made. Hence, this version of the
14  * programmable algorithm is most useful when there are multiple requests to
15  * be processed. Data from the set of requests can be processed incrementally
16  * when streaming (see set_stream_size to initialize). If one doesn't need
17  * these features it is better to use the teca_programmable_algorithm instead.
18  * See teca_threaded_algorithm for more details about threaded execution.
19  *
20  * The user can provide a callback for each of the three phases
21  * of pipeline execution. The number of input and output ports
22  * can also be set for filters (1 or more inputs, 1 or more outputs),
23  * sources (no inputs, 1 or more outputs), or sinks (1 or more
24  * inputs, no outputs).
25  *
26  * 1) report phase. the report callback returns metadata
27  * describing data that can be produced. The report callback
28  * is optional. It's only needed if the algorithm will produce
29  * new data or transform metadata.
30  *
31  * the report callback must be callable with signature:
32  * teca_metadata(unsigned int, const std::vector<teca_metadata> &)
33  *
34  * 2) request phase. the request callback generates a vector of
35  * requests(metadata objects) that inform the upstream of
36  * what data to generate. The request callback is optional.
37  * It's only needed if the algorithm needs data from the
38  * upstream or transforms metadata.
39  *
40  * the request callback must be callable with the signature:
41  * std::vector<teca_metadata>(
42  * unsigned int,
43  * const std::vector<teca_metadata> &,
44  * const teca_metadata &)
45  *
46  * 3) execute phase. the execute callback is used to do useful
47  * work on incoming or outgoing data. Examples include
48  * generating new datasets, processing datasets, reading
49  * and writing data to/from disk, and so on. The execute
50  * callback is optional.
51  *
52  * the execute callback must be callable with the signature:
53  * const_p_teca_dataset(
54  * unsigned int, const std::vector<const_p_teca_dataset> &,
55  * const teca_metadata &, int)
56  *
57  * see also:
58  *
59  * set_number_of_input_connections
60  * set_number_of_output_ports
61  * set_report_callback
62  * set_request_callback
63  * set_execute_callback
64  */
66 {
67 public:
68  TECA_ALGORITHM_STATIC_NEW(teca_threaded_programmable_algorithm)
69  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_programmable_algorithm)
71 
72  // set/get the class name.
73  virtual int set_name(const std::string &name);
74 
75  const char *get_class_name() const override
76  { return this->class_name; }
77 
78  // set the number of inputs and outputs
81 
82  // set the number of threads. The default is -1.
84 
85  // set the stream size. the default, -1, disables streaming.
86  // you should set this (2 is a good choice) if your algorithm
87  // can stream data.
89 
90  // install the default implementation
91  void use_default_report_action();
92  void use_default_request_action();
93  void use_default_execute_action();
94 
95  // set callback that responds to reporting stage
96  // of pipeline execution. the report callback must
97  // be callable with signature:
98  //
99  // teca_metadata (unsigned int,
100  // const std::vector<teca_metadata> &)
101  //
102  // the default implementation forwards downstream
103  TECA_ALGORITHM_CALLBACK_PROPERTY(
104  report_callback_t, report_callback)
105 
106  // set the callback that responds to the requesting
107  // stage of pipeline execution. the request callback
108  // must be callable with the signature:
109  //
110  // std::vector<teca_metadata> (
111  // unsigned int,
112  // const std::vector<teca_metadata> &,
113  // const teca_metadata &)
114  //
115  // the default implementation forwards upstream
116  TECA_ALGORITHM_CALLBACK_PROPERTY(
117  request_callback_t, request_callback)
118 
119  // set the callback that responds to the execution stage
120  // of pipeline execution. the execute callback must be
121  // callable with the signature:
122  //
123  // const_p_teca_dataset (
124  // unsigned int, const std::vector<const_p_teca_dataset> &,
125  // const teca_metadata &, int)
126  //
127  // the default implementation returns a nullptr
128  TECA_ALGORITHM_CALLBACK_PROPERTY(
129  threaded_execute_callback_t, execute_callback)
130 
131 protected:
133 
134 private:
135  teca_metadata get_output_metadata(unsigned int port,
136  const std::vector<teca_metadata> &input_md) override;
137 
138  std::vector<teca_metadata> get_upstream_request(
139  unsigned int port,
140  const std::vector<teca_metadata> &input_md,
141  const teca_metadata &request) override;
142 
143  const_p_teca_dataset execute(unsigned int port,
144  const std::vector<const_p_teca_dataset> &input_data,
145  const teca_metadata &request, int streaming) override;
146 
147 protected:
148  report_callback_t report_callback;
149  request_callback_t request_callback;
150  threaded_execute_callback_t execute_callback;
151  char class_name[64];
152 };
153 
154 #endif
teca_metadata
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:18
teca_threaded_programmable_algorithm::get_class_name
const char * get_class_name() const override
Definition: teca_threaded_programmable_algorithm.h:75
teca_threaded_programmable_algorithm
A threaded algorithm implemented with user provided callbacks.
Definition: teca_threaded_programmable_algorithm.h:65
teca_algorithm::set_number_of_input_connections
void set_number_of_input_connections(unsigned int n)
teca_threaded_algorithm::set_stream_size
void set_stream_size(const int &v)
Definition: teca_threaded_algorithm.h:89
teca_threaded_algorithm::set_thread_pool_size
void set_thread_pool_size(int n_threads)
teca_threaded_algorithm
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:46
teca_algorithm::set_number_of_output_ports
void set_number_of_output_ports(unsigned int n)