|
TECA
The Toolkit for Extreme Climate Analysis
|
This is the base class defining a threaded algorithm. More...
#include <teca_threaded_algorithm.h>
Public Member Functions | |
| std::shared_ptr< teca_threaded_algorithm > | shared_from_this () |
| std::shared_ptr< teca_threaded_algorithm const > | shared_from_this () const |
| teca_threaded_algorithm (const teca_threaded_algorithm &src)=delete | |
| teca_threaded_algorithm (teca_threaded_algorithm &&src)=delete | |
| teca_threaded_algorithm & | operator= (const teca_threaded_algorithm &src)=delete |
| teca_threaded_algorithm & | operator= (teca_threaded_algorithm &&src)=delete |
| const char * | get_class_name () const override |
| void | set_thread_pool_size (int n_threads) |
| unsigned int | get_thread_pool_size () const noexcept |
| Get the number of threads in the pool. More... | |
| void | set_data_request_queue (const p_teca_data_request_queue &queue) |
| explicitly set the thread pool to submit requests to More... | |
bind_threads | |
Set/get the thread affinity mode. When 0, threads are not bound to CPU cores, allowing them to migrate among all cores; this will likely degrade performance. The default is 1. | |
| void | set_bind_threads (const int &v) |
| const int & | get_bind_threads () const |
stream_size | |
Set the smallest number of datasets to gather per call to execute. The default (-1) results in all datasets being gathered. In practice, more datasets will be returned if they are ready. | |
| void | set_stream_size (const int &v) |
| const int & | get_stream_size () const |
poll_interval | |
Set the duration, in nanoseconds, to wait between checks for completed tasks. | |
| void | set_poll_interval (const long long &v) |
| const long long & | get_poll_interval () const |
threads_per_device | |
Set the number of threads to service each GPU/device. Other threads will use the CPU. | |
| void | set_threads_per_device (const int &v) |
| const int & | get_threads_per_device () const |
ranks_per_device | |
Set the number of ranks that have access to each GPU/device. Other ranks will use the CPU. | |
| void | set_ranks_per_device (const int &v) |
| const int & | get_ranks_per_device () const |
propagate_device_assignment | |
When set, the device assignment is taken from the downstream request. Otherwise, the thread executing the pipeline provides the assignment. | |
| void | set_propagate_device_assignment (const int &v) |
| const int & | get_propagate_device_assignment () const |
Public Member Functions inherited from teca_algorithm | |
| teca_algorithm (const teca_algorithm &src)=delete | |
| teca_algorithm (teca_algorithm &&src)=delete | |
| teca_algorithm & | operator= (const teca_algorithm &src)=delete |
| teca_algorithm & | operator= (teca_algorithm &&src)=delete |
| void | set_communicator (MPI_Comm comm) |
| MPI_Comm | get_communicator () |
| get the active communicator More... | |
| virtual teca_algorithm_output_port | get_output_port (unsigned int port=0) |
| void | set_input_connection (const teca_algorithm_output_port &port) |
| set an input to this algorithm More... | |
| virtual void | set_input_connection (unsigned int id, const teca_algorithm_output_port &port) |
| set an input to this algorithm More... | |
| virtual void | remove_input_connection (unsigned int id) |
| remove input connections More... | |
| void | clear_input_connections () |
| remove all input connections More... | |
| const_p_teca_dataset | get_output_data (unsigned int port=0) |
| void | pop_cache (unsigned int port=0, int top=0) |
| void | set_cache_size (unsigned int n) |
| set the cache size. the default is 1. (threadsafe) More... | |
| virtual int | update () |
| execute the pipeline from this instance up. More... | |
| virtual int | update (unsigned int port) |
| execute the pipeline from this instance up. More... | |
| virtual teca_metadata | update_metadata (unsigned int port=0) |
| get meta data considering this instance up. More... | |
| void | set_executive (p_teca_algorithm_executive exe) |
| set the executive More... | |
| p_teca_algorithm_executive | get_executive () |
| get the executive More... | |
| virtual void | to_stream (std::ostream &s) const |
| virtual void | from_stream (std::istream &s) |
| deserialize from the stream. More... | |
| void | set_verbose (const int &v) |
| const int & | get_verbose () const |
Static Public Member Functions | |
| static p_teca_threaded_algorithm | New () |
Protected Member Functions | |
| virtual const_p_teca_dataset | execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request, int streaming) |
| const_p_teca_dataset | execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request) override |
| const_p_teca_dataset | request_data (teca_algorithm_output_port &port, const teca_metadata &request) override |
Protected Member Functions inherited from teca_algorithm | |
| void | set_number_of_input_connections (unsigned int n) |
| void | set_number_of_output_ports (unsigned int n) |
| virtual void | set_modified () |
| void | set_modified (unsigned int port) |
| an overload to set_modified by port More... | |
| virtual teca_metadata | get_output_metadata (unsigned int port, const std::vector< teca_metadata > &input_md) |
| virtual std::vector< teca_metadata > | get_upstream_request (unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request) |
| virtual teca_metadata | get_cache_key (unsigned int port, const teca_metadata &request) const |
| virtual teca_metadata | get_output_metadata (teca_algorithm_output_port &current) |
| virtual int | validate_cache (teca_algorithm_output_port &current) |
| virtual void | clear_modified (teca_algorithm_output_port current) |
| const_p_teca_dataset | get_output_data (unsigned int port, const teca_metadata &request) |
| int | cache_output_data (unsigned int port, const teca_metadata &request, const_p_teca_dataset &data) |
| void | clear_cache (unsigned int port) |
| clear the cache on the given output port More... | |
| unsigned int | get_number_of_input_connections () |
| get the number of input connections More... | |
| teca_algorithm_output_port & | get_input_connection (unsigned int i) |
| void | clear_modified (unsigned int port) |
| clear the modified flag on the i'th output More... | |
| int | get_modified (unsigned int port) const |
| return the output port's modified flag value More... | |
Additional Inherited Members | |
Protected Attributes inherited from teca_algorithm | |
| int | verbose |
This is the base class defining a threaded algorithm.
The strategy employed is to parallelize over upstream data requests using a thread pool. Implementations override teca_algorithm::get_output_metadata, teca_algorithm::get_upstream_request, and teca_algorithm::execute. Pipeline execution is parallelized over the set of requests returned from the teca_algorithm::get_upstream_request override. The generated data is then fed incrementally to the teca_algorithm::execute override as it arrives, in batches of at least stream_size datasets. Alternatively, the generated data can be collected and fed to the execute override in one call. However, processing the data in one call is both slower and has a higher memory footprint, making it prohibitive in many situations.
|
overrideprotectedvirtual |
Implementations must override this method and produce the output dataset for the port named in the first argument. The second argument is a list of all of the input datasets; see also get_upstream_request. The third argument contains a request from the consumer, which can specify information such as arrays, subset region, timestep, etc. The implementation is free to handle the request as it sees fit.
Reimplemented from teca_algorithm.
|
inline |
Get the value of the bind_threads algorithm property
|
inlineoverridevirtual |
returns the name of the class
Implements teca_algorithm.
Reimplemented in teca_cf_writer, teca_cf_time_axis_data_reduce, and teca_threaded_programmable_algorithm.
|
inline |
Get the value of the poll_interval algorithm property
|
inline |
Get the value of the propagate_device_assignment algorithm property
|
inline |
Get the value of the ranks_per_device algorithm property
|
inline |
Get the value of the stream_size algorithm property
|
noexcept |
Get the number of threads in the pool.
|
inline |
Get the value of the threads_per_device algorithm property
|
inlinestatic |
Returns an instance of teca_threaded_algorithm
|
inline |
Set the value of the bind_threads algorithm property
| void teca_threaded_algorithm::set_data_request_queue | ( | const p_teca_data_request_queue & | queue | ) |
explicitly set the thread pool to submit requests to
|
inline |
Set the value of the poll_interval algorithm property
|
inline |
Set the value of the propagate_device_assignment algorithm property
|
inline |
Set the value of the ranks_per_device algorithm property
|
inline |
Set the value of the stream_size algorithm property
| void teca_threaded_algorithm::set_thread_pool_size | ( | int | n_threads | ) |
Set the number of threads in the pool. Setting this to -1 results in one thread per core, factoring in all MPI ranks running on the node.
|
inline |
Set the value of the threads_per_device algorithm property
|
inline |
Enables the static constructor
|
inline |
Enables the static constructor