|
TECA
The Toolkit for Extreme Climate Analysis
|
A threaded algorithm implemented with user-provided callbacks. More...
#include <teca_threaded_programmable_algorithm.h>
Public Member Functions | |
| std::shared_ptr< teca_threaded_programmable_algorithm > | shared_from_this () |
| std::shared_ptr< teca_threaded_programmable_algorithm const > | shared_from_this () const |
| teca_threaded_programmable_algorithm (const teca_threaded_programmable_algorithm &src)=delete | |
| teca_threaded_programmable_algorithm (teca_threaded_programmable_algorithm &&src)=delete | |
| teca_threaded_programmable_algorithm & | operator= (const teca_threaded_programmable_algorithm &src)=delete |
| teca_threaded_programmable_algorithm & | operator= (teca_threaded_programmable_algorithm &&src)=delete |
| virtual int | set_name (const std::string &name) |
| const char * | get_class_name () const override |
| void | use_default_report_action () |
| void | use_default_request_action () |
| void | use_default_execute_action () |
| void | set_report_callback (const report_callback_t &v) |
| const report_callback_t & | get_report_callback () const |
| report_callback_t & | get_report_callback () |
| void | set_request_callback (const request_callback_t &v) |
| const request_callback_t & | get_request_callback () const |
| request_callback_t & | get_request_callback () |
| void | set_execute_callback (const threaded_execute_callback_t &v) |
| const threaded_execute_callback_t & | get_execute_callback () const |
| threaded_execute_callback_t & | get_execute_callback () |
| void | set_number_of_input_connections (unsigned int n) |
| void | set_number_of_output_ports (unsigned int n) |
| void | set_thread_pool_size (int n_threads) |
| void | set_stream_size (const int &v) |
Public Member Functions inherited from teca_threaded_algorithm | |
| std::shared_ptr< teca_threaded_algorithm > | shared_from_this () |
| std::shared_ptr< teca_threaded_algorithm const > | shared_from_this () const |
| teca_threaded_algorithm (const teca_threaded_algorithm &src)=delete | |
| teca_threaded_algorithm (teca_threaded_algorithm &&src)=delete | |
| teca_threaded_algorithm & | operator= (const teca_threaded_algorithm &src)=delete |
| teca_threaded_algorithm & | operator= (teca_threaded_algorithm &&src)=delete |
| void | set_thread_pool_size (int n_threads) |
| unsigned int | get_thread_pool_size () const noexcept |
| Get the number of threads in the pool. More... | |
| void | set_data_request_queue (const p_teca_data_request_queue &queue) |
| explicitly set the thread pool to submit requests to More... | |
| void | set_bind_threads (const int &v) |
| const int & | get_bind_threads () const |
| void | set_stream_size (const int &v) |
| const int & | get_stream_size () const |
| void | set_poll_interval (const long long &v) |
| const long long & | get_poll_interval () const |
| void | set_threads_per_device (const int &v) |
| const int & | get_threads_per_device () const |
| void | set_ranks_per_device (const int &v) |
| const int & | get_ranks_per_device () const |
| void | set_propagate_device_assignment (const int &v) |
| const int & | get_propagate_device_assignment () const |
Public Member Functions inherited from teca_algorithm | |
| teca_algorithm (const teca_algorithm &src)=delete | |
| teca_algorithm (teca_algorithm &&src)=delete | |
| teca_algorithm & | operator= (const teca_algorithm &src)=delete |
| teca_algorithm & | operator= (teca_algorithm &&src)=delete |
| void | set_communicator (MPI_Comm comm) |
| MPI_Comm | get_communicator () |
| get the active communicator More... | |
| virtual teca_algorithm_output_port | get_output_port (unsigned int port=0) |
| void | set_input_connection (const teca_algorithm_output_port &port) |
| set an input to this algorithm More... | |
| virtual void | set_input_connection (unsigned int id, const teca_algorithm_output_port &port) |
| set an input to this algorithm More... | |
| virtual void | remove_input_connection (unsigned int id) |
| remove input connections More... | |
| void | clear_input_connections () |
| remove all input connections More... | |
| const_p_teca_dataset | get_output_data (unsigned int port=0) |
| void | pop_cache (unsigned int port=0, int top=0) |
| void | set_cache_size (unsigned int n) |
| set the cache size. the default is 1. (threadsafe) More... | |
| virtual int | update () |
| execute the pipeline from this instance up. More... | |
| virtual int | update (unsigned int port) |
| execute the pipeline from this instance up. More... | |
| virtual teca_metadata | update_metadata (unsigned int port=0) |
| get meta data considering this instance up. More... | |
| void | set_executive (p_teca_algorithm_executive exe) |
| set the executive More... | |
| p_teca_algorithm_executive | get_executive () |
| get the executive More... | |
| virtual void | to_stream (std::ostream &s) const |
| virtual void | from_stream (std::istream &s) |
| deserialize from the stream. More... | |
| void | set_verbose (const int &v) |
| const int & | get_verbose () const |
Static Public Member Functions | |
| static p_teca_threaded_programmable_algorithm | New () |
Static Public Member Functions inherited from teca_threaded_algorithm | |
| static p_teca_threaded_algorithm | New () |
Protected Attributes | |
| report_callback_t | report_callback |
| request_callback_t | request_callback |
| threaded_execute_callback_t | execute_callback |
| char | class_name [96] |
Protected Attributes inherited from teca_algorithm | |
| int | verbose |
Additional Inherited Members | |
Protected Member Functions inherited from teca_threaded_algorithm | |
| const_p_teca_dataset | execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request) override |
| const_p_teca_dataset | request_data (teca_algorithm_output_port &port, const teca_metadata &request) override |
Protected Member Functions inherited from teca_algorithm | |
| void | set_number_of_input_connections (unsigned int n) |
| void | set_number_of_output_ports (unsigned int n) |
| virtual void | set_modified () |
| void | set_modified (unsigned int port) |
| an overload to set_modified by port More... | |
| virtual teca_metadata | get_cache_key (unsigned int port, const teca_metadata &request) const |
| virtual teca_metadata | get_output_metadata (teca_algorithm_output_port &current) |
| virtual int | validate_cache (teca_algorithm_output_port &current) |
| virtual void | clear_modified (teca_algorithm_output_port current) |
| const_p_teca_dataset | get_output_data (unsigned int port, const teca_metadata &request) |
| int | cache_output_data (unsigned int port, const teca_metadata &request, const_p_teca_dataset &data) |
| void | clear_cache (unsigned int port) |
| clear the cache on the given output port More... | |
| unsigned int | get_number_of_input_connections () |
| get the number of input connections More... | |
| teca_algorithm_output_port & | get_input_connection (unsigned int i) |
| void | clear_modified (unsigned int port) |
| clear the modified flag on the i'th output More... | |
| int | get_modified (unsigned int port) const |
| return the output port's modified flag value More... | |
A threaded algorithm implemented with user-provided callbacks.
This version of the teca_programmable_algorithm is threaded. A thread pool (call set_thread_pool_size to initialize) executes the upstream pipeline asynchronously for each request made. Hence, this version of the programmable algorithm is most useful when there are multiple requests to be processed. Data from the set of requests can be processed incrementally when streaming (see set_stream_size to initialize). If one doesn't need these features it is better to use the teca_programmable_algorithm instead. See teca_threaded_algorithm for more details about threaded execution.
The user can provide a callback for each of the three phases of pipeline execution. The number of input and output ports can also be set for filters (1 or more inputs, 1 or more outputs), sources (no inputs, 1 or more outputs), or sinks (1 or more inputs, no outputs).
1) report phase. the report callback returns metadata describing data that can be produced. The report callback is optional. It's only needed if the algorithm will produce new data or transform metadata.
the report callback must be callable with signature: teca_metadata(unsigned int)
2) request phase. the request callback generates a vector of requests (metadata objects) that inform the upstream of what data to generate. The request callback is optional. It's only needed if the algorithm needs data from the upstream or transforms metadata.
the request callback must be callable with the signature: std::vector<teca_metadata>( unsigned int, const std::vector<teca_metadata> &, const teca_metadata &)
3) execute phase. the execute callback is used to do useful work on incoming or outgoing data. Examples include generating new datasets, processing datasets, reading and writing data to/from disk, and so on. The execute callback is optional.
the execute callback must be callable with the signature: const_p_teca_dataset( unsigned int, const std::vector<const_p_teca_dataset> &, const teca_metadata &, int)
see also:
set_number_of_input_connections set_number_of_output_ports set_report_callback set_request_callback set_execute_callback
|
inlineoverridevirtual |
returns the name of the class
Reimplemented from teca_threaded_algorithm.
|
inline |
Get the execute_callback algorithm property
|
inline |
Get the execute_callback algorithm property
|
inline |
Get the report_callback algorithm property
|
inline |
Get the report_callback algorithm property
|
inline |
Get the request_callback algorithm property
|
inline |
Get the request_callback algorithm property
|
inlinestatic |
Returns an instance of teca_threaded_programmable_algorithm
|
inline |
Set the execute_callback algorithm property
| void teca_algorithm::set_number_of_input_connections |
Set the number of input connections. implementations should call this from their constructors to setup the internal caches and data structures required for execution.
| void teca_algorithm::set_number_of_output_ports |
Set the number of output ports. implementations should call this from their constructors to setup the internal caches and data structures required for execution.
|
inline |
Set the report_callback algorithm property
|
inline |
Set the request_callback algorithm property
|
inline |
Set the value of the stream_size algorithm property
| void teca_threaded_algorithm::set_thread_pool_size |
Set the number of threads in the pool. setting to -1 results in a thread per core factoring in all MPI ranks running on the node.
|
inline |
Enables the static constructor
|
inline |
Enables the static constructor