TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_programmable_algorithm Class Reference

A threaded algorithm implemented with user-provided callbacks. More...

#include <teca_threaded_programmable_algorithm.h>

Inheritance diagram for teca_threaded_programmable_algorithm:
Collaboration diagram for teca_threaded_programmable_algorithm:

Public Member Functions

std::shared_ptr< teca_threaded_programmable_algorithm > shared_from_this ()
 
std::shared_ptr< teca_threaded_programmable_algorithm const > shared_from_this () const
 
 teca_threaded_programmable_algorithm (const teca_threaded_programmable_algorithm &src)=delete
 
 teca_threaded_programmable_algorithm (teca_threaded_programmable_algorithm &&src)=delete
 
teca_threaded_programmable_algorithm & operator= (const teca_threaded_programmable_algorithm &src)=delete
 
teca_threaded_programmable_algorithm & operator= (teca_threaded_programmable_algorithm &&src)=delete
 
virtual int set_name (const std::string &name)
 
const char * get_class_name () const override
 
void use_default_report_action ()
 
void use_default_request_action ()
 
void use_default_execute_action ()
 
void set_report_callback (const report_callback_t &v)
 
const report_callback_t & get_report_callback () const
 
report_callback_t & get_report_callback ()
 
void set_request_callback (const request_callback_t &v)
 
const request_callback_t & get_request_callback () const
 
request_callback_t & get_request_callback ()
 
void set_execute_callback (const threaded_execute_callback_t &v)
 
const threaded_execute_callback_t & get_execute_callback () const
 
threaded_execute_callback_t & get_execute_callback ()
 
void set_number_of_input_connections (unsigned int n)
 
void set_number_of_output_ports (unsigned int n)
 
void set_thread_pool_size (int n_threads)
 
void set_stream_size (const int &v)
 
- Public Member Functions inherited from teca_threaded_algorithm
std::shared_ptr< teca_threaded_algorithm > shared_from_this ()
 
std::shared_ptr< teca_threaded_algorithm const > shared_from_this () const
 
 teca_threaded_algorithm (const teca_threaded_algorithm &src)=delete
 
 teca_threaded_algorithm (teca_threaded_algorithm &&src)=delete
 
teca_threaded_algorithm & operator= (const teca_threaded_algorithm &src)=delete
 
teca_threaded_algorithm & operator= (teca_threaded_algorithm &&src)=delete
 
void set_thread_pool_size (int n_threads)
 
unsigned int get_thread_pool_size () const noexcept
 Get the number of threads in the pool. More...
 
void set_data_request_queue (const p_teca_data_request_queue &queue)
 
void set_verbose (const int &v)
 
const int & get_verbose () const
 
void set_bind_threads (const int &v)
 
const int & get_bind_threads () const
 
void set_stream_size (const int &v)
 
const int & get_stream_size () const
 
void set_poll_interval (const long long &v)
 
const long long & get_poll_interval () const
 
- Public Member Functions inherited from teca_algorithm
 teca_algorithm (const teca_algorithm &src)=delete
 
 teca_algorithm (teca_algorithm &&src)=delete
 
teca_algorithm & operator= (const teca_algorithm &src)=delete
 
teca_algorithm & operator= (teca_algorithm &&src)=delete
 
void set_communicator (MPI_Comm comm)
 
MPI_Comm get_communicator ()
 get the active communicator More...
 
virtual teca_algorithm_output_port get_output_port (unsigned int port=0)
 
void set_input_connection (const teca_algorithm_output_port &port)
 set an input to this algorithm More...
 
virtual void set_input_connection (unsigned int id, const teca_algorithm_output_port &port)
 set an input to this algorithm More...
 
virtual void remove_input_connection (unsigned int id)
 remove input connections More...
 
void clear_input_connections ()
 remove all input connections More...
 
const_p_teca_dataset get_output_data (unsigned int port=0)
 
void pop_cache (unsigned int port=0, int top=0)
 
void set_cache_size (unsigned int n)
 set the cache size. the default is 1. (threadsafe) More...
 
virtual int update ()
 execute the pipeline from this instance up. More...
 
virtual int update (unsigned int port)
 execute the pipeline from this instance up. More...
 
virtual teca_metadata update_metadata (unsigned int port=0)
 get meta data considering this instance up. More...
 
void set_executive (p_teca_algorithm_executive exe)
 set the executive More...
 
p_teca_algorithm_executive get_executive ()
 get the executive More...
 
virtual void to_stream (std::ostream &s) const
 
virtual void from_stream (std::istream &s)
 deserialize from the stream. More...
 
void set_verbose (const int &v)
 
const int & get_verbose () const
 

Static Public Member Functions

static p_teca_threaded_programmable_algorithm New ()
 
- Static Public Member Functions inherited from teca_threaded_algorithm
static p_teca_threaded_algorithm New ()
 

Protected Attributes

report_callback_t report_callback
 
request_callback_t request_callback
 
threaded_execute_callback_t execute_callback
 
char class_name [64]
 
- Protected Attributes inherited from teca_algorithm
int verbose
 

Additional Inherited Members

- Protected Member Functions inherited from teca_threaded_algorithm
const_p_teca_dataset execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request) override
 
const_p_teca_dataset request_data (teca_algorithm_output_port &port, const teca_metadata &request) override
 
- Protected Member Functions inherited from teca_algorithm
void set_number_of_input_connections (unsigned int n)
 
void set_number_of_output_ports (unsigned int n)
 
virtual void set_modified ()
 
void set_modified (unsigned int port)
 an overload to set_modified by port More...
 
virtual teca_metadata get_cache_key (unsigned int port, const teca_metadata &request) const
 
virtual teca_metadata get_output_metadata (teca_algorithm_output_port &current)
 
virtual int validate_cache (teca_algorithm_output_port &current)
 
virtual void clear_modified (teca_algorithm_output_port current)
 
const_p_teca_dataset get_output_data (unsigned int port, const teca_metadata &request)
 
int cache_output_data (unsigned int port, const teca_metadata &request, const_p_teca_dataset &data)
 
void clear_cache (unsigned int port)
 clear the cache on the given output port More...
 
unsigned int get_number_of_input_connections ()
 get the number of input connections More...
 
teca_algorithm_output_port & get_input_connection (unsigned int i)
 
void clear_modified (unsigned int port)
 clear the modified flag on the i'th output More...
 
int get_modified (unsigned int port) const
 return the output port's modified flag value More...
 

Detailed Description

A threaded algorithm implemented with user-provided callbacks.

This version of the teca_programmable_algorithm is threaded. A thread pool (call set_thread_pool_size to initialize) executes the upstream pipeline asynchronously for each request made. Hence, this version of the programmable algorithm is most useful when there are multiple requests to be processed. Data from the set of requests can be processed incrementally when streaming (see set_stream_size to initialize). If one doesn't need these features it is better to use the teca_programmable_algorithm instead. See teca_threaded_algorithm for more details about threaded execution.

The user can provide a callback for each of the three phases of pipeline execution. The number of input and output ports can also be set for filters (1 or more inputs, 1 or more outputs), sources (no inputs, 1 or more outputs), or sinks (1 or more inputs, no outputs).

1) Report phase. The report callback returns metadata describing data that can be produced. The report callback is optional. It's only needed if the algorithm will produce new data or transform metadata.

the report callback must be callable with signature: teca_metadata(unsigned int)

2) Request phase. The request callback generates a vector of requests (metadata objects) that inform the upstream of what data to generate. The request callback is optional. It's only needed if the algorithm needs data from the upstream or transforms metadata.

the request callback must be callable with the signature: std::vector<teca_metadata>( unsigned int, const std::vector<teca_metadata> &, const teca_metadata &)

3) Execute phase. The execute callback is used to do useful work on incoming or outgoing data. Examples include generating new datasets, processing datasets, reading and writing data to/from disk, and so on. The execute callback is optional.

the execute callback must be callable with the signature: const_p_teca_dataset( unsigned int, const std::vector<const_p_teca_dataset> &, const teca_metadata &, int)

see also:

set_number_of_input_connections set_number_of_output_ports set_report_callback set_request_callback set_execute_callback

Member Function Documentation

◆ get_class_name()

const char* teca_threaded_programmable_algorithm::get_class_name ( ) const
inline override virtual

returns the name of the class

Reimplemented from teca_threaded_algorithm.

◆ get_execute_callback() [1/2]

threaded_execute_callback_t& teca_threaded_programmable_algorithm::get_execute_callback ( )
inline

Get the execute_callback algorithm property

◆ get_execute_callback() [2/2]

const threaded_execute_callback_t& teca_threaded_programmable_algorithm::get_execute_callback ( ) const
inline

Get the execute_callback algorithm property

◆ get_report_callback() [1/2]

report_callback_t& teca_threaded_programmable_algorithm::get_report_callback ( )
inline

Get the report_callback algorithm property

◆ get_report_callback() [2/2]

const report_callback_t& teca_threaded_programmable_algorithm::get_report_callback ( ) const
inline

Get the report_callback algorithm property

◆ get_request_callback() [1/2]

request_callback_t& teca_threaded_programmable_algorithm::get_request_callback ( )
inline

Get the request_callback algorithm property

◆ get_request_callback() [2/2]

const request_callback_t& teca_threaded_programmable_algorithm::get_request_callback ( ) const
inline

Get the request_callback algorithm property

◆ New()

static p_teca_threaded_programmable_algorithm teca_threaded_programmable_algorithm::New ( )
inline static

Returns an instance of teca_threaded_programmable_algorithm

◆ set_execute_callback()

void teca_threaded_programmable_algorithm::set_execute_callback ( const threaded_execute_callback_t &  v)
inline

Set the execute_callback algorithm property

◆ set_number_of_input_connections()

void teca_algorithm::set_number_of_input_connections

Set the number of input connections. Implementations should call this from their constructors to set up the internal caches and data structures required for execution.

◆ set_number_of_output_ports()

void teca_algorithm::set_number_of_output_ports

Set the number of output ports. Implementations should call this from their constructors to set up the internal caches and data structures required for execution.

◆ set_report_callback()

void teca_threaded_programmable_algorithm::set_report_callback ( const report_callback_t &  v)
inline

Set the report_callback algorithm property

◆ set_request_callback()

void teca_threaded_programmable_algorithm::set_request_callback ( const request_callback_t &  v)
inline

Set the request_callback algorithm property

◆ set_stream_size()

void teca_threaded_algorithm::set_stream_size
inline

Set the value of the stream_size algorithm property

◆ set_thread_pool_size()

void teca_threaded_algorithm::set_thread_pool_size

Set the number of threads in the pool. Setting to -1 results in a thread per core, factoring in all MPI ranks running on the node.

◆ shared_from_this() [1/2]

std::shared_ptr< teca_threaded_programmable_algorithm > teca_threaded_programmable_algorithm::shared_from_this ( )
inline

Enables the static constructor

◆ shared_from_this() [2/2]

std::shared_ptr< teca_threaded_programmable_algorithm const> teca_threaded_programmable_algorithm::shared_from_this ( ) const
inline

Enables the static constructor


The documentation for this class was generated from the following file: