TECA
The Toolkit for Extreme Climate Analysis
teca_index_reduce.h
1 #ifndef teca_index_reduce_h
2 #define teca_index_reduce_h
3 
4 #include "teca_config.h"
5 #include "teca_dataset.h"
6 #include "teca_metadata.h"
7 #include "teca_shared_object.h"
8 #include "teca_threaded_algorithm.h"
9 
10 #include <vector>
11 
12 TECA_SHARED_OBJECT_FORWARD_DECL(teca_index_reduce)
13 
14 /// Base class for MPI+threads+GPUs map-reduce over a set of indices.
15 /**
16  * The available indices are partitioned across MPI ranks and threads. Threads
17  * are assigned to service GPUs or CPU cores. Processing can be restricted
18  * to a range of indices by setting the first and last indices to process.
19  *
20  * ### metadata keys:
21  * #### Requires:
22  *
23  * | key | description |
24  * | ---- | ----------- |
25  * | index_initializer_key | Holds the name of the key that tells how many |
26  * | | indices are available. The named key must also |
27  * | | be present and should contain the number of |
28  * | | indices available. |
29  * | index_request_key | Holds the name of the key used to request a |
30  * | | specific index. Requests are generated with this |
31  * | | name set to a specific index to be processed; |
32  * | | some upstream algorithm is expected to produce |
33  * | | the data associated with the given index. |
34  *
35  * #### Exports:
36  *
37  * | Key | Description |
38  * | ---------------------- | ----------- |
39  * | index_request_key | The name of the key holding the requested index |
40  * | <index_request_key> | the requested index |
41  * | device_id | the CPU (-1) or CUDA device (0 - n-1 devices) to |
42  * | | use for calculations |
43  * | bounds | the [x0 x1 y0 y1 z0 z1] spatial bounds requested |
44  * | | (optional) |
45  * | extent | the [i0 i1 j0 j1 k0 k1] index space grid extent |
46  * | | requested (optional) |
47  * | arrays | a list of arrays requested (optional) |
48  *
49  */
51 {
52 public:
53  using extent_type = std::vector<long>;
54  using bounds_type = std::vector<double>;
55 
56 
57  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_index_reduce)
58  virtual ~teca_index_reduce(){}
59 
60  // Report/initialize to/from Boost program options
61  // objects.
62  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
63  TECA_SET_ALGORITHM_PROPERTIES()
64 
65  /** @name extent
66  * If provided, the extent defines the index space region
67  * [i0, i1, j0, j1, k0, k1] of data to request. If not provided, the
68  * whole_extent is requested when the whole_extent key is present.
69  * Otherwise, the request is not modified.
70  */
71  ///@{
72  TECA_ALGORITHM_PROPERTY(extent_type, extent)
73  ///@}
74 
75  /** @name bounds
76  * If provided, the bounds define the world coordinate space region
77  * [x0, x1, y0, y1, z0, z1] of data to request. Bounds take precedence over
78  * extents. If bounds are not provided, any provided extent will be used
79  * (see ::set_extent); otherwise the request is not modified.
80  */
81  ///@{
82  TECA_ALGORITHM_PROPERTY(bounds_type, bounds)
83  ///@}
84 
85  /** @name arrays
86  * Set the list of arrays to request. If not provided, the request is
87  * not modified.
88  */
89  ///@{
90  TECA_ALGORITHM_VECTOR_PROPERTY(std::string, array)
91  ///@}
92 
93  /** @name start_index
94  * Set the first index to process. Indices go from 0 to n-1. The default
95  * start_index is 0.
96  */
97  ///@{
98  TECA_ALGORITHM_PROPERTY(long, start_index)
99  ///@}
100 
101  /** @name end_index
102  * Set the last index to process. Indices go from 0 to n-1. The default
103  * end_index is -1, which indicates that all indices should be
104  * processed.
105  */
106  ///@{
107  TECA_ALGORITHM_PROPERTY(long, end_index)
108  ///@}
109 
110 protected:
112 
113 protected:
114 
115  /** An override that implements the reduction. Given two datasets, left
116  * and right, reduce them into a single dataset and return it.
117  *
118  * @param[in] device_id The device that should be used for the reduction.
119  * A value of -1 indicates the CPU should be used.
120  * @param[in] left a dataset to reduce
121  * @param[in] right a dataset to reduce
122  *
123  * @returns the reduced dataset
124  */
125  virtual p_teca_dataset reduce(int device_id,
126  const const_p_teca_dataset &left,
127  const const_p_teca_dataset &right) = 0;
128 
129  /** An override that is called when the reduction is complete. The default
130  * implementation passes data through. This might be used, for instance, to
131  * complete an averaging operation where the ::reduce override sums the
132  * data and the ::finalize override scales by 1/N, where N is the number of
133  * datasets summed.
134  *
135  * @param[in] device_id The device that should be used for the reduction.
136  * A value of -1 indicates the CPU should be used.
137  * @param[in] ds the reduced dataset
138  *
139  * @returns a dataset that has been finalized.
140  */
141  virtual p_teca_dataset finalize(int device_id,
142  const const_p_teca_dataset &ds)
143  {
144  (void) device_id; // unused by the pass-through default
145  return std::const_pointer_cast<teca_dataset>(ds);
146  }
147 
148  /** An override that allows derived classes to generate upstream requests
149  * that will be applied over all time steps. Derived classes implement this
150  * method instead of ::get_upstream_request, which here is already
151  * implemented to handle the application of requests over the index set.
152  * The default implementation creates an empty request that is then
153  * populated with extent, bounds, and arrays if these have been provided.
154  */
155  virtual std::vector<teca_metadata> initialize_upstream_request(
156  unsigned int port, const std::vector<teca_metadata> &input_md,
157  const teca_metadata &request);
158 
159  /** An override that allows derived classes to report what they can
160  * produce. This will be called from ::get_output_metadata, which will strip
161  * out time and partition time across MPI ranks. The default implementation
162  * passes the incoming metadata through.
163  */
164  virtual teca_metadata initialize_output_metadata(unsigned int port,
165  const std::vector<teca_metadata> &input_md);
166 
167 protected:
168 // Customized pipeline behavior and parallel code.
169 // Most derived classes won't need to override these.
170 
172  using teca_threaded_algorithm::execute;
173 
174  /** Generates an upstream request for each index. Calls
175  * ::initialize_upstream_request and applies the results to all time steps.
176  */
177  std::vector<teca_metadata> get_upstream_request(
178  unsigned int port, const std::vector<teca_metadata> &input_md,
179  const teca_metadata &request) override;
180 
181  /** Uses MPI communication to collect the remote data required for the
182  * reduction. Calls ::reduce with each pair of datasets until the datasets
183  * across all threads and ranks are reduced into a single dataset, which is
184  * returned.
185  */
186  const_p_teca_dataset execute(unsigned int port,
187  const std::vector<const_p_teca_dataset> &input_data,
188  const teca_metadata &request, int streaming) override;
189 
190  /// Consumes index metadata and partitions indices across MPI ranks.
192  const std::vector<teca_metadata> &input_md) override;
193 
194 private:
195  /** The driver for reducing the local datasets. Calls the ::reduce override as
196  * needed.
197  */
198  const_p_teca_dataset reduce_local(int device_id,
199  std::vector<const_p_teca_dataset> local_data);
200 
201  /** The driver for reducing the remote datasets. Calls the ::reduce override
202  * as needed.
203  */
204  const_p_teca_dataset reduce_remote(int device_id,
205  const_p_teca_dataset local_data);
206 
207 private:
208  extent_type extent;
209  bounds_type bounds;
210  std::vector<std::string> arrays;
211  long start_index;
212  long end_index;
213 };
214 
215 #endif
virtual teca_metadata get_output_metadata(unsigned int port, const std::vector< teca_metadata > &input_md)
Base class for MPI+threads+GPUs map reduce reduction over a set of indices.
Definition: teca_index_reduce.h:51
std::vector< teca_metadata > get_upstream_request(unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request) override
const_p_teca_dataset execute(unsigned int port, const std::vector< const_p_teca_dataset > &input_data, const teca_metadata &request, int streaming) override
virtual teca_metadata initialize_output_metadata(unsigned int port, const std::vector< teca_metadata > &input_md)
teca_metadata get_output_metadata(unsigned int port, const std::vector< teca_metadata > &input_md) override
Consumes index metadata and partitions indices across MPI ranks.
virtual std::vector< teca_metadata > initialize_upstream_request(unsigned int port, const std::vector< teca_metadata > &input_md, const teca_metadata &request)
A generic container for metadata in the form of name=value pairs.
Definition: teca_metadata.h:22
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:71
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
TECA_EXPORT bool left(n_t e0x, n_t e0y, n_t e1x, n_t e1y, n_t px, n_t py)
tests if a point is Left|On|Right of an infinite line.
Definition: teca_geometry.h:29