TECA
The Toolkit for Extreme Climate Analysis
teca_threaded_algorithm Class Reference

This is the base class defining a threaded algorithm. More...

#include <teca_threaded_algorithm.h>

Inheritance diagram for teca_threaded_algorithm:
[legend]
Collaboration diagram for teca_threaded_algorithm:
[legend]

Public Member Functions

std::shared_ptr< teca_threaded_algorithm > shared_from_this ()
 
std::shared_ptr< teca_threaded_algorithm const > shared_from_this () const
 
 teca_threaded_algorithm (const teca_threaded_algorithm &src)=delete
 
 teca_threaded_algorithm (teca_threaded_algorithm &&src)=delete
 
teca_threaded_algorithm & operator= (const teca_threaded_algorithm &src)=delete
 
teca_threaded_algorithm & operator= (teca_threaded_algorithm &&src)=delete
 
const char * get_class_name () const override
 
void set_thread_pool_size (int n_threads)
 
unsigned int get_thread_pool_size () const noexcept
 Get the number of threads in the pool. More...
 
void set_data_request_queue (const p_teca_data_request_queue &queue)
 explicitly set the thread pool to submit requests to More...
 
bind_threads

Set/get the thread affinity mode. When 0, threads are not bound to CPU cores, allowing them to migrate among all cores; this will likely degrade performance. The default is 1.

void set_bind_threads (const int &v)
 
const int & get_bind_threads () const
 
stream_size

Set the smallest number of datasets to gather per call to execute. The default (-1) results in all datasets being gathered. In practice, more datasets will be returned if they are ready.

void set_stream_size (const int &v)
 
const int & get_stream_size () const
 
poll_interval

Set the duration, in nanoseconds, to wait between checks for completed tasks.

void set_poll_interval (const long long &v)
 
const long long & get_poll_interval () const
 
threads_per_device

Set the number of threads to service each GPU/device. Other threads will use the CPU.

void set_threads_per_device (const int &v)
 
const int & get_threads_per_device () const
 
ranks_per_device

Set the number of ranks that have access to each GPU/device. Other ranks will use the CPU.

void set_ranks_per_device (const int &v)
 
const int & get_ranks_per_device () const
 
propagate_device_assignment

When set, the device assignment is taken from the downstream request. Otherwise, the thread executing the pipeline provides the assignment.

void set_propagate_device_assignment (const int &v)
 
const int & get_propagate_device_assignment () const
 
- Public Member Functions inherited from teca_algorithm
 teca_algorithm (const teca_algorithm &src)=delete
 
 teca_algorithm (teca_algorithm &&src)=delete
 
teca_algorithm & operator= (const teca_algorithm &src)=delete
 
teca_algorithm & operator= (teca_algorithm &&src)=delete
 
void set_communicator (MPI_Comm comm)
 
MPI_Comm get_communicator ()
 get the active communicator More...
 
virtual teca_algorithm_output_port get_output_port (unsigned int port=0)
 
void set_input_connection (const teca_algorithm_output_port &port)
 set an input to this algorithm More...
 
virtual void set_input_connection (unsigned int id, const teca_algorithm_output_port &port)
 set an input to this algorithm More...
 
virtual void remove_input_connection (unsigned int id)
 remove input connections More...
 
void clear_input_connections ()
 remove all input connections More...
 
const_p_teca_dataset get_output_data (unsigned int port=0)
 
void pop_cache (unsigned int port=0, int top=0)
 
void set_cache_size (unsigned int n)
 Set the cache size. The default is 1 (thread-safe). More...
 
virtual int update ()
 execute the pipeline from this instance up. More...
 
virtual int update (unsigned int port)
 execute the pipeline from this instance up. More...
 
virtual teca_metadata update_metadata (unsigned int port=0)
 get meta data considering this instance up. More...
 
void set_executive (p_teca_algorithm_executive exe)
 set the executive More...
 
p_teca_algorithm_executive get_executive ()
 get the executive More...
 
virtual void to_stream (std::ostream &s) const
 
virtual void from_stream (std::istream &s)
 deserialize from the stream. More...
 
void set_verbose (const int &v)
 
const int & get_verbose () const
 

Static Public Member Functions

static p_teca_threaded_algorithm New ()
 

Protected Member Functions

virtual const_p_teca_dataset execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request, int streaming)
 
const_p_teca_dataset execute (unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request) override
 
const_p_teca_dataset request_data (teca_algorithm_output_port &port, const teca_metadata &request) override
 
- Protected Member Functions inherited from teca_algorithm
void set_number_of_input_connections (unsigned int n)
 
void set_number_of_output_ports (unsigned int n)
 
virtual void set_modified ()
 
void set_modified (unsigned int port)
 an overload to set_modified by port More...
 
virtual teca_metadata get_output_metadata (unsigned int port, const std::vector< teca_metadata > &input_md)
 
virtual std::vector< teca_metadata > get_upstream_request (unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request)
 
virtual teca_metadata get_cache_key (unsigned int port, const teca_metadata &request) const
 
virtual teca_metadata get_output_metadata (teca_algorithm_output_port &current)
 
virtual int validate_cache (teca_algorithm_output_port &current)
 
virtual void clear_modified (teca_algorithm_output_port current)
 
const_p_teca_dataset get_output_data (unsigned int port, const teca_metadata &request)
 
int cache_output_data (unsigned int port, const teca_metadata &request, const_p_teca_dataset &data)
 
void clear_cache (unsigned int port)
 clear the cache on the given output port More...
 
unsigned int get_number_of_input_connections ()
 get the number of input connections More...
 
teca_algorithm_output_port & get_input_connection (unsigned int i)
 
void clear_modified (unsigned int port)
 clear the modified flag on the i'th output More...
 
int get_modified (unsigned int port) const
 return the output port's modified flag value More...
 

Additional Inherited Members

- Protected Attributes inherited from teca_algorithm
int verbose
 

Detailed Description

This is the base class defining a threaded algorithm.

The strategy employed is to parallelize over upstream data requests using a thread pool. Implementations override teca_algorithm::get_output_metadata, teca_algorithm::get_upstream_request, and teca_algorithm::execute. Pipeline execution is parallelized over the set of requests returned from the teca_algorithm::get_upstream_request override. The generated data is then fed incrementally to the teca_algorithm::execute override as it arrives in at least stream_size increments. Alternatively the generated data can be collected and fed to the execute override in one call. However, processing the data in one call is both slower and has a higher memory footprint making it prohibitive in many situations.

Member Function Documentation

◆ execute()

const_p_teca_dataset teca_threaded_algorithm::execute ( unsigned int  port,
const std::vector< const_p_teca_dataset > &  input_data,
const teca_metadata &  request 
)
override protected virtual

Implementations must override this method and produce the output dataset for the port named in the first argument. The second argument is a list of all of the input datasets; see also get_upstream_request. The third argument contains a request from the consumer, which can specify information such as arrays, subset region, timestep, etc. The implementation is free to handle the request as it sees fit.

Reimplemented from teca_algorithm.

◆ get_bind_threads()

const int& teca_threaded_algorithm::get_bind_threads ( ) const
inline

Get the value of the bind_threads algorithm property

◆ get_class_name()

const char* teca_threaded_algorithm::get_class_name ( ) const
inline override virtual

returns the name of the class

Implements teca_algorithm.

Reimplemented in teca_cf_writer, teca_cf_time_axis_data_reduce, and teca_threaded_programmable_algorithm.

◆ get_poll_interval()

const long long& teca_threaded_algorithm::get_poll_interval ( ) const
inline

Get the value of the poll_interval algorithm property

◆ get_propagate_device_assignment()

const int& teca_threaded_algorithm::get_propagate_device_assignment ( ) const
inline

Get the value of the propagate_device_assignment algorithm property

◆ get_ranks_per_device()

const int& teca_threaded_algorithm::get_ranks_per_device ( ) const
inline

Get the value of the ranks_per_device algorithm property

◆ get_stream_size()

const int& teca_threaded_algorithm::get_stream_size ( ) const
inline

Get the value of the stream_size algorithm property

◆ get_thread_pool_size()

unsigned int teca_threaded_algorithm::get_thread_pool_size ( ) const
noexcept

Get the number of threads in the pool.

◆ get_threads_per_device()

const int& teca_threaded_algorithm::get_threads_per_device ( ) const
inline

Get the value of the threads_per_device algorithm property

◆ New()

static p_teca_threaded_algorithm teca_threaded_algorithm::New ( )
inline static

Returns an instance of teca_threaded_algorithm

◆ set_bind_threads()

void teca_threaded_algorithm::set_bind_threads ( const int &  v)
inline

Set the value of the bind_threads algorithm property

◆ set_data_request_queue()

void teca_threaded_algorithm::set_data_request_queue ( const p_teca_data_request_queue &  queue)

Explicitly set the thread pool to which requests are submitted.

◆ set_poll_interval()

void teca_threaded_algorithm::set_poll_interval ( const long long &  v)
inline

Set the value of the poll_interval algorithm property

◆ set_propagate_device_assignment()

void teca_threaded_algorithm::set_propagate_device_assignment ( const int &  v)
inline

Set the value of the propagate_device_assignment algorithm property

◆ set_ranks_per_device()

void teca_threaded_algorithm::set_ranks_per_device ( const int &  v)
inline

Set the value of the ranks_per_device algorithm property

◆ set_stream_size()

void teca_threaded_algorithm::set_stream_size ( const int &  v)
inline

Set the value of the stream_size algorithm property

◆ set_thread_pool_size()

void teca_threaded_algorithm::set_thread_pool_size ( int  n_threads)

Set the number of threads in the pool. Setting it to -1 results in one thread per core, factoring in all MPI ranks running on the node.

◆ set_threads_per_device()

void teca_threaded_algorithm::set_threads_per_device ( const int &  v)
inline

Set the value of the threads_per_device algorithm property

◆ shared_from_this() [1/2]

std::shared_ptr< teca_threaded_algorithm > teca_threaded_algorithm::shared_from_this ( )
inline

Enables the static constructor

◆ shared_from_this() [2/2]

std::shared_ptr< teca_threaded_algorithm const> teca_threaded_algorithm::shared_from_this ( ) const
inline

Enables the static constructor


The documentation for this class was generated from the following file: