TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_programmable_algorithm.h
1 #ifndef teca_threaded_programmable_algorithm_h
2 #define teca_threaded_programmable_algorithm_h
3 
4 #include "teca_config.h"
5 #include "teca_metadata.h"
6 #include "teca_dataset.h"
7 #include "teca_threaded_algorithm.h"
8 #include "teca_programmable_algorithm.h"
9 
10 /// A threaded algorithm implemented with user-provided callbacks.
11 /**
12  * This version of the teca_programmable_algorithm is threaded. A thread pool
13  * (call set_thread_pool_size to initialize) executes the upstream pipeline
14  * asynchronously for each request made. Hence, this version of the
15  * programmable algorithm is most useful when there are multiple requests to
16  * be processed. Data from the set of requests can be processed incrementally
17  * when streaming (see set_stream_size to initialize). If one doesn't need
18  * these features it is better to use the teca_programmable_algorithm instead.
19  * See teca_threaded_algorithm for more details about threaded execution.
20  *
21  * The user can provide a callback for each of the three phases
22  * of pipeline execution. The number of input and output ports
23  * can also be set for filters (1 or more inputs, 1 or more outputs),
24  * sources (no inputs, 1 or more outputs), or sinks (1 or more
25  * inputs, no outputs).
26  *
27  * 1) report phase. the report callback returns metadata
28  * describing data that can be produced. The report callback
29  * is optional. It's only needed if the algorithm will produce
30  * new data or transform metadata.
31  *
32  * the report callback must be callable with signature:
33  * teca_metadata(unsigned int, const std::vector<teca_metadata> &)
34  *
35  * 2) request phase. the request callback generates a vector of
36  * requests(metadata objects) that inform the upstream of
37  * what data to generate. The request callback is optional.
38  * It's only needed if the algorithm needs data from the
39  * upstream or transforms metadata.
40  *
41  * the request callback must be callable with the signature:
42  * std::vector<teca_metadata>(
43  * unsigned int,
44  * const std::vector<teca_metadata> &,
45  * const teca_metadata &)
46  *
47  * 3) execute phase. the execute callback is used to do useful
48  * work on incoming or outgoing data. Examples include
49  * generating new datasets, processing datasets, reading
50  * and writing data to/from disk, and so on. The execute
51  * callback is optional.
52  *
53  * the execute callback must be callable with the signature:
54  * const_p_teca_dataset(
55  * unsigned int, const std::vector<const_p_teca_dataset> &,
56  * const teca_metadata &, int)
57  *
58  * see also:
59  *
60  * set_number_of_input_connections
61  * set_number_of_output_ports
62  * set_report_callback
63  * set_request_callback
64  * set_execute_callback
65  */
67 {
68 public:
69  TECA_ALGORITHM_STATIC_NEW(teca_threaded_programmable_algorithm)
70  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_programmable_algorithm)
72 
73  // set/get the class name.
74  virtual int set_name(const std::string &name);
75 
76  const char *get_class_name() const override
77  { return this->class_name; }
78 
79  // set the number of inputs and outputs
82 
83  // set the number of threads. The default is -1.
85 
86  // set the stream size. the default, -1, disables streaming.
87  // you should set this (2 is a good choice) if your algorithm
88  // can stream data.
90 
91  // install the default implementation
92  void use_default_report_action();
93  void use_default_request_action();
94  void use_default_execute_action();
95 
96  // set callback that responds to reporting stage
97  // of pipeline execution. the report callback must
98  // be callable with signature:
99  //
100  // teca_metadata (unsigned int,
101  // const std::vector<teca_metadata> &)
102  //
103  // the default implementation forwards downstream
104  TECA_ALGORITHM_CALLBACK_PROPERTY(
105  report_callback_t, report_callback)
106 
107  // set the callback that responds to the requesting
108  // stage of pipeline execution. the request callback
109  // must be callable with the signature:
110  //
111  // std::vector<teca_metadata> (
112  // unsigned int,
113  // const std::vector<teca_metadata> &,
114  // const teca_metadata &)
115  //
116  // the default implementation forwards upstream
117  TECA_ALGORITHM_CALLBACK_PROPERTY(
118  request_callback_t, request_callback)
119 
120  // set the callback that responds to the execution stage
121  // of pipeline execution. the execute callback must be
122  // callable with the signature:
123  //
124  // const_p_teca_dataset (
125  // unsigned int, const std::vector<const_p_teca_dataset> &,
126  // const teca_metadata &, int)
127  //
128  // the default implementation returns a nullptr
129  TECA_ALGORITHM_CALLBACK_PROPERTY(
130  threaded_execute_callback_t, execute_callback)
131 
132 protected:
134 
135 private:
137  using teca_threaded_algorithm::execute;
138 
139  teca_metadata get_output_metadata(unsigned int port,
140  const std::vector<teca_metadata> &input_md) override;
141 
142  std::vector<teca_metadata> get_upstream_request(
143  unsigned int port,
144  const std::vector<teca_metadata> &input_md,
145  const teca_metadata &request) override;
146 
147  const_p_teca_dataset execute(unsigned int port,
148  const std::vector<const_p_teca_dataset> &input_data,
149  const teca_metadata &request, int streaming) override;
150 
151 protected:
152  report_callback_t report_callback;
153  request_callback_t request_callback;
154  threaded_execute_callback_t execute_callback;
155  char class_name[96];
156 };
157 
158 #endif
virtual std::vector< teca_metadata > get_upstream_request(unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request)
void set_number_of_output_ports(unsigned int n)
virtual teca_metadata get_output_metadata(unsigned int port, const std::vector< teca_metadata > &input_md)
void set_number_of_input_connections(unsigned int n)
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:22
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:71
void set_stream_size(const int &v)
Definition: teca_threaded_algorithm.h:106
void set_thread_pool_size(int n_threads)
A threaded algorithm implemented with user-provided callbacks.
Definition: teca_threaded_programmable_algorithm.h:67
const char * get_class_name() const override
Definition: teca_threaded_programmable_algorithm.h:76
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.