TECA
The Toolkit for Extreme Climate Analysis
teca_algorithm.h
1 #ifndef teca_algorithm_h
2 #define teca_algorithm_h
3 
4 #include "teca_config.h"
5 #include "teca_shared_object.h"
6 #include "teca_dataset.h"
7 #include "teca_algorithm_executive.h"
8 #include "teca_metadata.h"
9 #include "teca_program_options.h"
10 #include "teca_mpi.h"
11 
12 #include <vector>
13 #include <utility>
14 #include <iosfwd>
15 #include <initializer_list>
16 #include <functional>
17 
18 class teca_algorithm_internals;
19 
20 TECA_SHARED_OBJECT_FORWARD_DECL(teca_algorithm)
21 
22 /// An output port packages an algorithm and a port number
23 using teca_algorithm_output_port
24  = std::pair<p_teca_algorithm, unsigned int>;
25 
26 /// get the algorithm from the output port
27 inline
28 p_teca_algorithm &get_algorithm(teca_algorithm_output_port &op)
29 { return op.first; }
30 
31 /// get port number from the output port
32 inline
33 unsigned int &get_port(teca_algorithm_output_port &op)
34 { return op.second; }
35 
/* Convenience macro declaring the static New method that is used to construct
 * new objects held in shared_ptr's, plus typed shared_from_this overloads.
 * This manages the details of interoperability with the std C++11 shared
 * pointer. The pointer alias p_T must be visible where the macro is expanded.
 */
#define TECA_ALGORITHM_STATIC_NEW(T) \
 \
/** Allocates a new T and returns it wrapped in a p_T */ \
static p_##T New() \
{ \
    p_##T instance(new T); \
    return instance; \
} \
 \
/** Returns a shared pointer to this object, typed as T */ \
std::shared_ptr<T> shared_from_this() \
{ \
    auto base_ptr = teca_algorithm::shared_from_this(); \
    return std::static_pointer_cast<T>(base_ptr); \
} \
 \
/** Returns a shared pointer to this object, typed as T const */ \
std::shared_ptr<T const> shared_from_this() const \
{ \
    auto base_ptr = teca_algorithm::shared_from_this(); \
    return std::static_pointer_cast<T const>(base_ptr); \
}
61 
/* Convenience macro that overrides get_class_name so that it reports the
 * stringized macro argument T.
 */
#define TECA_ALGORITHM_CLASS_NAME(T) \
/** Returns the class name as a C string */ \
const char *get_class_name() const override \
{ \
    const char *class_name = #T; \
    return class_name; \
}
68 
/** Convenience macro that deletes the copy and move constructors and the
 * copy and move assignment operators of T. These generally should not be
 * defined for reference counted types.
 */
#define TECA_ALGORITHM_DELETE_COPY_ASSIGN(T) \
 \
    T(const T &) = delete; \
    T &operator=(const T &) = delete; \
 \
    T(T &&) = delete; \
    T &operator=(T &&) = delete;
79 
/** Convenience macro declaring the standard set_NAME/get_NAME pair for the
 * class member called NAME, of type T. The setter manages the algorithm's
 * modified state for the user: set_modified is called only when the new
 * value compares unequal (operator!=) to the stored one.
 */
#define TECA_ALGORITHM_PROPERTY(T, NAME) \
 \
/** Set the value of the NAME algorithm property */ \
void set_##NAME(const T &v) \
{ \
    const bool differs = static_cast<bool>(this->NAME != v); \
    if (differs) \
    { \
        this->NAME = v; \
        this->set_modified(); \
    } \
} \
 \
/** Get the value of the NAME algorithm property */ \
const T &get_##NAME() const \
{ \
    const T &current = this->NAME; \
    return current; \
}
101 
/** Similar to TECA_ALGORITHM_PROPERTY, but prior to setting NAME the setter
 * calls the member function int validate_NAME(T v). If the value v is valid
 * the function should return 0. If the value is not valid the function
 * should invoke TECA_ERROR with a descriptive message and return non-zero,
 * in which case the property is left untouched.
 */
#define TECA_ALGORITHM_PROPERTY_V(T, NAME) \
 \
/** Set the value of the NAME algorithm property, if it validates */ \
void set_##NAME(const T &v) \
{ \
    if (this->validate_##NAME(v)) \
    { \
        /* rejected by the validator, nothing is changed */ \
    } \
    else if (this->NAME != v) \
    { \
        this->NAME = v; \
        this->set_modified(); \
    } \
} \
 \
/** Get the value of the NAME algorithm property */ \
const T &get_##NAME() const \
{ \
    const T &current = this->NAME; \
    return current; \
}
128 
/** Convenience macro declaring the standard accessors for a vector valued
 * property stored in the class member NAMEs (a std::vector<T>): get the
 * size, append, set a single value / the i-th value / the whole vector, get
 * the i-th value / the whole vector, and clear. The mutators manage the
 * algorithm's modified state for the user: set_modified is called whenever
 * the stored vector actually changes.
 *
 * The indexed accessors use unchecked operator[]; the caller must pass
 * i < get_number_of_NAMEs().
 */
#define TECA_ALGORITHM_VECTOR_PROPERTY(T, NAME) \
 \
/** get the size of the NAME algorithm vector property */ \
size_t get_number_of_##NAME##s () const \
{ \
    return this->NAME##s.size(); \
} \
 \
/** append to the NAME algorithm vector property */ \
void append_##NAME(const T &v) \
{ \
    this->NAME##s.push_back(v); \
    this->set_modified(); \
} \
 \
/** set the NAME algorithm vector property to a single value */ \
void set_##NAME(const T &v) \
{ \
    this->set_##NAME##s({v}); \
} \
 \
/** set the i-th element of the NAME algorithm vector property. \
 * i must be a valid index, it is not range checked. */ \
void set_##NAME(size_t i, const T &v) \
{ \
    if (this->NAME##s[i] != v) \
    { \
        this->NAME##s[i] = v; \
        this->set_modified(); \
    } \
} \
 \
/** set the NAME algorithm vector property */ \
void set_##NAME##s(const std::vector<T> &v) \
{ \
    if (this->NAME##s != v) \
    { \
        this->NAME##s = v; \
        this->set_modified(); \
    } \
} \
 \
/** set the NAME algorithm vector property from a braced list */ \
void set_##NAME##s(const std::initializer_list<T> &&l) \
{ \
    /* reuse the vector overload so the change detection lives in one place */ \
    this->set_##NAME##s(std::vector<T>(l)); \
} \
 \
/** get the i-th element of the NAME algorithm vector property. \
 * i must be a valid index, it is not range checked. */ \
const T &get_##NAME(size_t i) const \
{ \
    return this->NAME##s[i]; \
} \
 \
/** get the NAME algorithm vector property */ \
const std::vector<T> &get_##NAME##s() const \
{ \
    return this->NAME##s; \
} \
 \
/** clear the NAME algorithm vector property. the algorithm is marked \
 * modified when this removes at least one element. */ \
void clear_##NAME##s() \
{ \
    if (!this->NAME##s.empty()) \
    { \
        this->NAME##s.clear(); \
        this->set_modified(); \
    } \
}
202 
/// helper that allows us to use std::function as a TECA_ALGORITHM_PROPERTY
/**
 * std::function objects cannot be compared by value, so this compares object
 * identity: the result is false only when both arguments refer to the very
 * same std::function object. Two distinct objects always compare unequal,
 * even when one is a copy of the other.
 */
template<typename T>
bool operator!=(const std::function<T> &lhs, const std::function<T> &rhs)
{
    const bool same_object = (&lhs == &rhs);
    return !same_object;
}
209 
/** Declares set_NAME/get_NAME for a callback valued class member NAME of
 * type T. Unlike TECA_ALGORITHM_PROPERTY the setter does not compare the new
 * value with the stored one; it always assigns and always marks the
 * algorithm modified.
 *
 * This is a work around for older versions of Apple clang
 * Apple LLVM version 4.2 (clang-425.0.28) (based on LLVM 3.2svn)
 * Target: x86_64-apple-darwin12.6.0
 */
#define TECA_ALGORITHM_CALLBACK_PROPERTY(T, NAME) \
 \
/** Set the NAME algorithm property, unconditionally marking modified */ \
void set_##NAME(const T &v) \
{ \
    this->NAME = v; \
    this->set_modified(); \
} \
 \
/** Get the NAME algorithm property (mutable access) */ \
T &get_##NAME() \
{ \
    return this->NAME; \
} \
 \
/** Get the NAME algorithm property (read only access) */ \
const T &get_##NAME() const \
{ \
    return this->NAME; \
}
237 
238 
239 /// The interface to TECA pipeline architecture.
240 /**
241  * All sources/readers filters, sinks/writers will implement this interface.
242  */
243 class TECA_EXPORT teca_algorithm : public std::enable_shared_from_this<teca_algorithm>
244 {
245 public:
246  // construct/destruct
247  virtual ~teca_algorithm() noexcept;
248 
249  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_algorithm)
250 
251  /// return the name of the class.
252  virtual const char *get_class_name() const = 0;
253 
254  /** set the communicator to use at this stage of the pipeline this has
255  * no influence on other stages. We duplicate the passed in communicator
256  * providing an isolated communication space for subsequent operations. By
257  * default the communicator is initialized to MPI_COMM_WORLD, here it is not
258  * duplicated. Thus to put an algorithm into a unique communication space
259  * one should explicitly set a communicator. When an algorithm should not
260  * use MPI, for instance when it is in a nested pipeline, one may set the
261  * communicator to MPI_COMM_SELF.
262  */
263  void set_communicator(MPI_Comm comm);
264 
265  /// get the active communicator
266  MPI_Comm get_communicator();
267 
268 #if defined(TECA_HAS_BOOST)
269  /** initialize the given options description with algorithm's properties
270  * implementors should call the base implementation when overriding.
271  * this should be called after the override adds its options.
272  */
273  virtual void get_properties_description(const std::string &prefix,
274  options_description &opts);
275 
276  /** initialize the algorithm from the given options variable map.
277  * implementors should call the base implementation when overriding.
278  * this should be called before the override sets its properties.
279  */
280  virtual void set_properties(const std::string &prefix,
281  variables_map &opts);
282 #endif
283 
284  /** @name verbose
285  * if set to a non-zero value, rank 0 will send status information to the
286  * terminal. The default setting of zero results in no output.
287  */
288  ///@{
289  TECA_ALGORITHM_PROPERTY(int, verbose)
290  ///@}
291 
292  /** get an output port from the algorithm. to be used during pipeline
293  * building
294  */
295  virtual
296  teca_algorithm_output_port get_output_port(unsigned int port = 0);
297 
298  /// set an input to this algorithm
299  void set_input_connection(const teca_algorithm_output_port &port)
300  { this->set_input_connection(0, port); }
301 
302  /// set an input to this algorithm
303  virtual
304  void set_input_connection(unsigned int id,
305  const teca_algorithm_output_port &port);
306 
307  /// remove input connections
308  virtual
309  void remove_input_connection(unsigned int id);
310 
311  /// remove all input connections
313 
314  /** access the cached data produced by this algorithm. when no request is
315  * specified the dataset on the top(most recent) of the cache is returned.
316  * When a request is specified it may optionally be filtered by the
317  * implementations cache key filter. see also get_cache_key (threadsafe)
318  */
319  const_p_teca_dataset get_output_data(unsigned int port = 0);
320 
321  /** remove a dataset from the top/bottom of the cache. the top of the cache
322  * has the most recently created dataset. top or bottom is selected via
323  * the boolean argument. (threadsafe)
324  */
325  void pop_cache(unsigned int port = 0, int top = 0);
326 
327  /// set the cache size. the default is 1. (threadsafe)
328  void set_cache_size(unsigned int n);
329 
330  /// execute the pipeline from this instance up.
331  virtual int update();
332 
333  /// execute the pipeline from this instance up.
334  virtual int update(unsigned int port);
335 
336  /// get meta data considering this instance up.
337  virtual teca_metadata update_metadata(unsigned int port = 0);
338 
339  /// set the executive
340  void set_executive(p_teca_algorithm_executive exe);
341 
342  /// get the executive
343  p_teca_algorithm_executive get_executive();
344 
345  /** serialize the configuration to a stream. this should store the public
346  * user modifiable properties so that runtime configuration may be saved
347  * and restored.
348  */
349  virtual void to_stream(std::ostream &s) const;
350 
351  /// deserialize from the stream.
352  virtual void from_stream(std::istream &s);
353 
354 protected:
355  teca_algorithm();
356 
357  /** Set the number of input connections. implementations should call this
358  * from their constructors to setup the internal caches and data structures
359  * required for execution.
360  */
361  void set_number_of_input_connections(unsigned int n);
362 
363  /** Set the number of output ports. implementations should call this from
364  * their constructors to setup the internal caches and data structures
365  * required for execution.
366  */
367  void set_number_of_output_ports(unsigned int n);
368 
369  /** set the modified flag on the given output port's cache. should be
370  * called when user modifies properties on the object that require the
371  * output to be regenerated.
372  */
373  virtual void set_modified();
374 
375  /// an overload to set_modified by port
376  void set_modified(unsigned int port);
377 
378 protected:
379 // this section contains methods that developers
380 // typically need to override when implementing
381 // teca_algorithm's such as reader, filters, and
382 // writers.
383 
384  /** implementations must override this method to provide information to
385  * downstream consumers about what data will be produced on each output
386  * port. The port to provide information about is named in the first
387  * argument the second argument contains a list of the metadata describing
388  * data on all of the inputs.
389  */
390  virtual
392  const std::vector<teca_metadata> &input_md);
393 
394  /** implementations must override this method and generate a set of
395  * requests describing the data required on the inputs to produce data for
396  * the named output port, given the upstream meta data and request. If no
397  * data is needed on an input then the list should contain a null request.
398  */
399  virtual
400  std::vector<teca_metadata> get_upstream_request(
401  unsigned int port, const std::vector<teca_metadata> &input_md,
402  const teca_metadata &request);
403 
404  /** implementations must override this method and produce the output dataset
405  * for the port named in the first argument. The second argument is a list
406  * of all of the input datasets. See also get_request. The third argument
407  * contains a request from the consumer which can specify information such
408  * as arrays, subset region, timestep etc. The implementation is free to
409  * handle the request as it sees fit.
410  */
411  virtual
412  const_p_teca_dataset execute(unsigned int port,
413  const std::vector<const_p_teca_dataset> &input_data,
414  const teca_metadata &request);
415 
416  /** implementations may choose to override this method to gain control of
417  * keys used in the cache. By default the passed in request is used as the
418  * key. This override gives implementor the chance to filter the passed in
419  * request.
420  */
421  virtual
422  teca_metadata get_cache_key(unsigned int port,
423  const teca_metadata &request) const;
424 
425 protected:
426 // this section contains methods that control the
427 // pipeline's behavior. these would typically only
428 // need to be overridden when designing a new class
429 // of algorithms.
430 
431  /** driver function that manage meta data reporting phase of pipeline
432  * execution.
433  */
434  virtual
436  teca_algorithm_output_port &current);
437 
438  /* driver function that manages execution of the given request on the named
439  * port
440  */
441  virtual
442  const_p_teca_dataset request_data(
443  teca_algorithm_output_port &port,
444  const teca_metadata &request);
445 
446  /** driver function that clears the output data cache where modified flag
447  * has been set from the current port upstream.
448  */
449  virtual
450  int validate_cache(teca_algorithm_output_port &current);
451 
452  /** driver function that clears the modified flag on the named port and all
453  * of it's upstream connections.
454  */
455  virtual
456  void clear_modified(teca_algorithm_output_port current);
457 
458 protected:
459 // api exposing internals for use in driver methods
460 
461  /** search the given port's cache for the dataset associated
462  * with the given request. see also get_cache_key. (threadsafe)
463  */
464  const_p_teca_dataset get_output_data(unsigned int port,
465  const teca_metadata &request);
466 
467  /** add or update the given request , dataset pair in the cache. see also
468  * get_cache_key. (threadsafe)
469  */
470  int cache_output_data(unsigned int port,
471  const teca_metadata &request, const_p_teca_dataset &data);
472 
473  /// clear the cache on the given output port
474  void clear_cache(unsigned int port);
475 
476  /// get the number of input connections
478 
479  /** get the output port associated with this algorithm's i'th input
480  * connection.
481  */
482  teca_algorithm_output_port &get_input_connection(unsigned int i);
483 
484  /// clear the modified flag on the i'th output
485  void clear_modified(unsigned int port);
486 
487  /// return the output port's modified flag value
488  int get_modified(unsigned int port) const;
489 
490 protected:
491  int verbose;
492 
493 private:
494  teca_algorithm_internals *internals;
495 
496  friend class teca_threaded_algorithm;
497  friend class teca_data_request;
498 };
499 
500 #endif
The interface to TECA pipeline architecture.
Definition: teca_algorithm.h:244
void set_input_connection(const teca_algorithm_output_port &port)
set an input to this algorithm
Definition: teca_algorithm.h:299
void clear_cache(unsigned int port)
clear the cache on the given output port
virtual std::vector< teca_metadata > get_upstream_request(unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request)
unsigned int get_number_of_input_connections()
get the number of input connections
virtual teca_metadata update_metadata(unsigned int port=0)
get meta data considering this instance up.
void set_number_of_output_ports(unsigned int n)
void clear_modified(unsigned int port)
clear the modified flag on the i'th output
int cache_output_data(unsigned int port, const teca_metadata &request, const_p_teca_dataset &data)
virtual teca_metadata get_output_metadata(teca_algorithm_output_port &current)
virtual void set_modified()
p_teca_algorithm_executive get_executive()
get the executive
virtual void set_input_connection(unsigned int id, const teca_algorithm_output_port &port)
set an input to this algorithm
const_p_teca_dataset get_output_data(unsigned int port, const teca_metadata &request)
virtual void to_stream(std::ostream &s) const
void set_cache_size(unsigned int n)
set the cache size. the default is 1. (threadsafe)
int get_modified(unsigned int port) const
return the output port's modified flag value
void clear_input_connections()
remove all input connections
virtual int validate_cache(teca_algorithm_output_port &current)
void pop_cache(unsigned int port=0, int top=0)
teca_algorithm_output_port & get_input_connection(unsigned int i)
void set_executive(p_teca_algorithm_executive exe)
set the executive
virtual void clear_modified(teca_algorithm_output_port current)
virtual void from_stream(std::istream &s)
deserialize from the stream.
const_p_teca_dataset get_output_data(unsigned int port=0)
void set_modified(unsigned int port)
an overload to set_modified by port
virtual const_p_teca_dataset execute(unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request)
virtual int update(unsigned int port)
execute the pipeline from this instance up.
virtual teca_metadata get_cache_key(unsigned int port, const teca_metadata &request) const
virtual teca_metadata get_output_metadata(unsigned int port, const std::vector< teca_metadata > &input_md)
virtual int update()
execute the pipeline from this instance up.
virtual void remove_input_connection(unsigned int id)
remove input connections
virtual teca_algorithm_output_port get_output_port(unsigned int port=0)
void set_number_of_input_connections(unsigned int n)
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:22
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:71
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
auto data(V &&... args)
Definition: teca_variant_array_util.h:255
p_teca_algorithm & get_algorithm(teca_algorithm_output_port &op)
get the algorithm from the output port
Definition: teca_algorithm_output_port.h:12
unsigned int & get_port(teca_algorithm_output_port &op)
get port number from the output port
Definition: teca_algorithm_output_port.h:17