TECA
The Toolkit for Extreme Climate Analysis
teca_cf_time_step_mapper.h
1 #ifndef teca_cf_time_step_mapper_h
2 #define teca_cf_time_step_mapper_h
3 
4 #include "teca_config.h"
5 #include "teca_metadata.h"
6 #include "teca_cf_layout_manager.h"
7 #include "teca_mpi.h"
8 
9 #include <iostream>
10 #include <sstream>
11 #include <cstring>
12 #include <cerrno>
13 #include <string>
14 #include <unordered_map>
15 #include <vector>
16 #include <memory>
17 
19 using p_teca_cf_time_step_mapper = std::shared_ptr<teca_cf_time_step_mapper>;
20 
21 /// Defines the interface for mapping time steps to files
23 {
24 public:
25  virtual ~teca_cf_time_step_mapper() {}
26 
27  /// returns true if the mapper has been successfully initialized
28  virtual bool initialized() { return this->file_comms.size(); }
29 
30  /** close all files, destroy file managers, and release communicators
31  * this should be done once all I/O is complete.
32  */
33  virtual int finalize();
34 
35  /// construct requests for this rank
36  virtual int get_upstream_requests(teca_metadata base_req,
37  std::vector<teca_metadata> &up_reqs);
38 
39  /// given a time step returns the associated layout manager
40  virtual p_teca_cf_layout_manager get_layout_manager(long time_step)
41  { (void) time_step; return nullptr; }
42 
43  /** given an inclusive range of time steps, get the corresponding layout
44  * managers that can be used to create, define and write the data to disk.
45  * When more than one manager is returned, each should be passed the data,
46  * giving it a chance to write the subset it is responsible for to disk.
47  *
48  * @param[in] temporal_extent the first and last step that will be written
49  * @param[out] manager a vector of teca_cf_layout_manager instances
50  * that need to be called to write the data.
51  *
52  * @returns zero if successful.
53  */
54  virtual int get_layout_manager(const unsigned long temporal_extent[2],
55  std::vector<p_teca_cf_layout_manager> &managers)
56  {
57  // forward to the single step implentation
58  managers.push_back(this->get_layout_manager(temporal_extent[0]));
59  return managers.back() ? 0 : -1;
60  }
61 
62  /// print a summary to the stream
63  virtual int to_stream(std::ostream &os) = 0;
64 
65  /** call the passed in functor once per file table entry, safe
66  * for MPI collective operations. The required functor signature
67  * is:
68  * int f(MPI_Comm comm, long file_id, const p_teca_cf_layout_manager &manager)
69  *
70  * a return of non-zero from the functor will immediately stop the
71  * apply and the value will be returned, but no error will be
72  * reported.
73  */
74  template<typename op_t>
75  int file_table_apply(const op_t &op);
76 
77 protected:
78  teca_cf_time_step_mapper() : index_initializer_key(""),
79  index_request_key(""), start_time_step(0), end_time_step(-1),
80  n_files(0)
81  {}
82 
83  // remove these for convenience
86  void operator=(const teca_cf_time_step_mapper&) = delete;
87  void operator=(const teca_cf_time_step_mapper&&) = delete;
88 
89  /// create/free the per-file communicators
91  int free_file_comms();
92 
93 protected:
94  /// communicator to partition into per-file communicators
95  MPI_Comm comm;
96 
97  /// pipeline control key names
98  std::string index_initializer_key;
99  std::string index_request_key;
100 
101  /// user provided overrides
103  long end_time_step;
104 
105  /// time_steps to request by rank
107  std::vector<long> block_size;
108  std::vector<long> block_start;
109 
110  /// output files
111  long n_files;
112  std::vector<std::set<int>> file_ranks;
113 
114  /// per file communicators
115  std::vector<MPI_Comm> file_comms;
116 
117  /// the file table maps from a time step to a specific layout manager
118  using file_table_t = std::unordered_map<long, p_teca_cf_layout_manager>;
119  file_table_t file_table;
120 };
121 
122 
123 // --------------------------------------------------------------------------
124 template<typename op_t>
126 {
127  for (long i = 0; i < this->n_files; ++i)
128  {
129  MPI_Comm comm_i = this->file_comms[i];
130  if (comm_i != MPI_COMM_NULL)
131  {
132  if (int ierr = op(comm, i, this->file_table[i]))
133  return ierr;
134  }
135  }
136  return 0;
137 }
138 
139 #endif
Defines the interface for mapping time steps to files.
Definition: teca_cf_time_step_mapper.h:23
std::string index_initializer_key
pipeline control key names
Definition: teca_cf_time_step_mapper.h:98
long n_files
output files
Definition: teca_cf_time_step_mapper.h:111
std::unordered_map< long, p_teca_cf_layout_manager > file_table_t
the file table maps from a time step to a specific layout manager
Definition: teca_cf_time_step_mapper.h:118
long n_time_steps
time_steps to request by rank
Definition: teca_cf_time_step_mapper.h:106
std::vector< MPI_Comm > file_comms
per file communicators
Definition: teca_cf_time_step_mapper.h:115
MPI_Comm comm
communicator to partition into per-file communicators
Definition: teca_cf_time_step_mapper.h:95
int alloc_file_comms()
create/free the per-file communicators
virtual bool initialized()
returns true if the mapper has been successfully initialized
Definition: teca_cf_time_step_mapper.h:28
long start_time_step
user provided overrides
Definition: teca_cf_time_step_mapper.h:102
virtual int get_layout_manager(const unsigned long temporal_extent[2], std::vector< p_teca_cf_layout_manager > &managers)
Definition: teca_cf_time_step_mapper.h:54
virtual int to_stream(std::ostream &os)=0
print a summary to the stream
int file_table_apply(const op_t &op)
Definition: teca_cf_time_step_mapper.h:125
virtual p_teca_cf_layout_manager get_layout_manager(long time_step)
given a time step returns the associated layout manager
Definition: teca_cf_time_step_mapper.h:40
virtual int get_upstream_requests(teca_metadata base_req, std::vector< teca_metadata > &up_reqs)
construct requests for this rank
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:22
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.