TECA
The Toolkit for Extreme Climate Analysis
teca_cf_writer.h
1 #ifndef teca_cf_writer_h
2 #define teca_cf_writer_h
3 
4 #include "teca_config.h"
5 #include "teca_shared_object.h"
6 #include "teca_threaded_algorithm.h"
7 #include "teca_metadata.h"
8 
9 #include <vector>
10 #include <string>
11 
12 TECA_SHARED_OBJECT_FORWARD_DECL(teca_cf_writer)
13 
14 /// A writer for Cartesian meshes in NetCDF CF2 format.
15 /**
16  * Writes data to NetCDF CF2 format. This algorithm is conceptually an
17  * execution engine capable of driving the above pipeline with or without
18  * threads and stream results in the order that they are generated placing them
19  * in the correct location in the output dataset. The output dataset is a
20  * collection of files each with a user specified number of time steps per
21  * file. The output dataset may be arranged using a fixed number of steps per
22  * file or daily, monthly, seasonal, or yearly file layouts. The total number
23  * of time steps in the output dataset is determined by the combination of the
24  * number of time steps in the input dataset and user defined subsetting if
25  * any. The writer uses MPI collective I/O to produce the files. In parallel
26  * time steps are mapped to ranks such that each rank has approximately the
27  * same number of time steps. Incoming steps are mapped to files. A given MPI
28  * rank may be writing to multiple files. The use of MPI collectives implies
29  * care must be taken in its use to avoid deadlocks.
30  *
31  * Due to the use of MPI collective I/O, certain information must be known
32  * during the report phase of pipeline execution, before the execute phase of
33  * pipeline execution begins. The information that is needed is:
34  *
35  * ### number of time steps ###
36  * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
37  * specified by the pipeline control index_initializer key found in metadata
38  * produced by the source (e.g CF reader)
39  * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
40  *
41  * ### extent ###
42  * ~~~~~~~~~~~~~~
43  * 6, 64 bit integers defining the 3 spatial dimensions of each timestep found
44  * in metadata produced by the source (e.g CF reader)
45  * ~~~~~~~~~~~~~~
46  *
47  * ### point arrays ###
48  * ~~~~~~~~~~~~~~~~~~~~
49  * list of strings naming the point centered arrays that will be written. set
50  * by the user prior to execution by writer properties.
51  * ~~~~~~~~~~~~~~~~~~~~
52  *
53  * ### information arrays ###
54  * ~~~~~~~~~~~~~~~~~~~~~~~~~~
55  * list of strings naming the non-geometric arrays that will be written. set by
56  * the user prior to execution by writer properties. See also size attribute
57  * below.
58  * ~~~~~~~~~~~~~~~~~~~~~~~~~~
59  *
60  * ### type_code ###
61  * ~~~~~~~~~~~~~~~~~
62  * the teca_variant_array_code naming the type of each array. this will be in
63  * the array attributes metadata generated by the producer of the array (e.g
64  * any algorithm that adds an array should provide this metadata).
65  * ~~~~~~~~~~~~~~~~~
66  *
67  * ### size ###
68  * ~~~~~~~~~~~~
69  * a 64 bit integer declaring the size of each information array. this will be
70  * in the array attributes metadata generated by the producer of the array (e.g
71  * any algorithm that adds an array should provide this metadata).
72  * ~~~~~~~~~~~~
73  */
75 {
76 public:
77  TECA_ALGORITHM_STATIC_NEW(teca_cf_writer)
78  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cf_writer)
79  TECA_ALGORITHM_CLASS_NAME(teca_cf_writer)
80  ~teca_cf_writer();
81 
82  // report/initialize to/from Boost program options
83  // objects.
84  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
85  TECA_SET_ALGORITHM_PROPERTIES()
86 
87  /** @name file_name
88  * Set the output filename. For time series the substring %t% is replaced
89  * with the current time step or date. See comments on date_format below
90  * for info about date formatting.
91  */
92  ///@{
93  TECA_ALGORITHM_PROPERTY(std::string, file_name)
94  ///@}
95 
96 
97  /** @name date_format
98  * set the format for the date to write in the filename. this requires the
99  * input dataset to have unit/calendar information. If none are available,
100  * the time index is used instead. (%F-%HZ)
101  */
102  ///@{
103  TECA_ALGORITHM_PROPERTY(std::string, date_format)
104  ///@}
105 
106  /** @name first_step
107  * Set the first step in the range of time steps to process.
108  */
109  ///@{
110  TECA_ALGORITHM_PROPERTY(long, first_step)
111  ///@}
112 
113  /** @name last_step
114  * Set the last step in the range of time steps to process.
115  */
116  ///@{
117  TECA_ALGORITHM_PROPERTY(long, last_step)
118  ///@}
119 
120  /** @name layout
121  * Set the layout mode to one of : number_of_steps, daily, monthly,
122  * seasonal, or yearly. This controls the size of the files written. In
123  * daily, monthly, seasonal, and yearly modes each file will contain the
124  * steps spanning the given duration. The number_of_steps mode writes a
125  * fixed number of steps per file which can be set using the
126  * steps_per_file property.
127  */
128  ///@{
129  enum {invalid=0, number_of_steps=1, daily=2, monthly=3, seasonal=4, yearly=5};
130  TECA_ALGORITHM_PROPERTY_V(int, layout)
131 
132  void set_layout_to_number_of_steps() { this->set_layout(number_of_steps); } ///< sets the layout mode to number_of_steps
133  void set_layout_to_daily() { this->set_layout(daily); } ///< sets the layout mode to daily
134  void set_layout_to_monthly() { this->set_layout(monthly); } ///< sets the layout mode to monthly
135  void set_layout_to_seasonal() { this->set_layout(seasonal); } ///< sets the layout mode to seasonal
136  void set_layout_to_yearly() { this->set_layout(yearly); } ///< sets the layout mode to yearly
137 
138  /// set the layout mode from a string.
139  int set_layout(const std::string &layout);
140 
141  /// @returns 0 if the passed value is one of number_of_steps, daily, monthly, seasonal, or yearly; otherwise reports the value with TECA_ERROR and returns -1. The invalid enumerator is rejected.
142  int validate_layout(int mode)
143  {
144  if ((mode == number_of_steps) || (mode == daily) ||
145  (mode == monthly) || (mode == seasonal) || (mode == yearly))
146  return 0; // mode is a recognized layout
147 
148  TECA_ERROR("Invalid layout mode " << mode)
149  return -1; // mode is not a recognized layout
150  }
151 
152  /// @returns a string representation of the current layout
153  const char *get_layout_name() const;
154  ///@}
155 
156  /** @name steps_per_file
157  * Set how many time steps are written to each file when the layout mode is
158  * set to number_of_steps.
159  */
160  ///@{
161  TECA_ALGORITHM_PROPERTY(unsigned int, steps_per_file)
162  ///@}
163 
164  /** @name mode_flags
165  * sets the flags passed to NetCDF during file creation. (NC_CLOBBER)
166  */
167  ///@{
168  TECA_ALGORITHM_PROPERTY(int, mode_flags)
169  ///@}
170 
171  /** @name use_unlimited_dim
172  * if set the slowest varying dimension is specified to be NC_UNLIMITED.
173  * This has a negative impact on performance when reading the values in a
174  * single pass. However, unlimited dimensions are used ubiquitously thus
175  * by default it is set. For data being consumed by TECA performance will
176  * be better when using fixed dimensions. (1) This feature requires
177  * collective writes and is incompatible with out of order execution,
178  * and hence currently not supported.
179  */
180  ///@{
181  TECA_ALGORITHM_PROPERTY(int, use_unlimited_dim)
182  ///@}
183 
184  /** @name compression_level
185  * sets the compression level used for each variable. Compression is not
186  * used if the value is less than or equal to 0. This feature requires
187  * collective writes and is incompatible with out of order execution,
188  * and hence currently not supported.
189  */
190  ///@{
191  TECA_ALGORITHM_PROPERTY(int, compression_level)
192  ///@}
193 
194  /** @name collective_buffer
195  * Enables MPI I/O collective buffering. Collective buffering is only valid
196  * when the spatial partitioner is enabled and the number of spatial
197  * partitions is equal to the number of MPI ranks. If set to -1 (the
198  * default) collective buffering will automatically be enabled when it is
199  * possible to do so.
200  */
201  ///@{
202  TECA_ALGORITHM_PROPERTY(int, collective_buffer)
203  ///@}
204 
205  /** @name flush_files
206  * Flush files before closing them, this may be necessary if accessing data
207  * immediately.
208  */
209  ///@{
210  TECA_ALGORITHM_PROPERTY(int, flush_files)
211  ///@}
212 
213  /** @name move_variables_to_root
214  * Move variables to root instead of creating groups
215  */
216  ///@{
217  TECA_ALGORITHM_PROPERTY(int, move_variables_to_root)
218  ///@}
219 
220  /** @name point_array
221  * Specify the arrays to write. A data array is only written to disk if
222  * it is included in this list. It is an error to not specify at least
223  * one point centered array to write
224  */
225  ///@{
226  TECA_ALGORITHM_VECTOR_PROPERTY(std::string, point_array)
227  ///@}
228 
229  /** @name information_array
230  * Set the list of non-geometric arrays to write.
231  */
232  ///@{
233  TECA_ALGORITHM_VECTOR_PROPERTY(std::string, information_array)
234  ///@}
235 
236  /** @name spatial_partitioner
237  * Enable spatial partitioner. When spatial partitioner is enabled both
238  * temporal and spatial dimensions of input data are partitioned for load
239  * balancing. The partitioner is controlled by the
240  * number_of_spatial_partitions, number_of_temporal_partitions and,
241  * temporal_partition_size properties.
242  */
243  ///@{
244  enum
245  {
246  temporal, ///< map time steps to MPI ranks
247  spatial, ///< map spatial extents to MPI ranks, time is processed sequentially
248  space_time ///< map both spatial and temporal extents to MPI ranks
249  };
250 
251  TECA_ALGORITHM_PROPERTY_V(int, partitioner)
252 
253  /// set the partitioner from a string
254  void set_partitioner(const std::string &part);
255 
256  /// enables temporal partitioner (forwards to set_partitioner with the temporal mode)
257  void set_partitioner_to_temporal() { this->set_partitioner(temporal); }
258 
259  /// enables spatial partitioner (forwards to set_partitioner with the spatial mode)
260  void set_partitioner_to_spatial() { this->set_partitioner(spatial); }
261 
262  /// enables space-time partitioner (forwards to set_partitioner with the space_time mode)
263  void set_partitioner_to_space_time() { this->set_partitioner(space_time); }
264 
265  /// @returns 0 if the passed value is one of temporal, spatial, or space_time; otherwise reports the value with TECA_ERROR and returns -1
266  int validate_partitioner(int mode)
267  {
268  if ((mode == temporal) || (mode == spatial) || (mode ==space_time))
269  return 0; // mode is a recognized partitioner
270 
271  TECA_ERROR("Invalid partitioner mode " << mode)
272  return -1; // mode is not a recognized partitioner
273  }
274 
275  /// @returns the name of the current partitioner mode
276  const char *get_partitioner_name() const;
277  ///@}
278 
279  /** @name number_of_spatial_partitions
280  * Set the number of spatial partitions. If less than one then the number of
281  * MPI ranks is used.
282  */
283  ///@{
284  TECA_ALGORITHM_PROPERTY(long, number_of_spatial_partitions)
285  ///@}
286 
287  /** @name partition_x
288  * enables/disables spatial partitioning in the x-direction
289  */
290  ///@{
291  TECA_ALGORITHM_PROPERTY(int, partition_x)
292  ///@}
293 
294  /** @name partition_y
295  * enables/disables spatial partitioning in the y-direction
296  */
297  ///@{
298  TECA_ALGORITHM_PROPERTY(int, partition_y)
299  ///@}
300 
301  /** @name partition_z
302  * enables/disables spatial partitioning in the z-direction
303  */
304  ///@{
305  TECA_ALGORITHM_PROPERTY(int, partition_z)
306  ///@}
307 
308  /** @name minimum_block_size_x
309  * Sets the minimum block size for spatial partitioning in the x-direction
310  */
311  ///@{
312  TECA_ALGORITHM_PROPERTY(long, minimum_block_size_x)
313  ///@}
314 
315  /** @name minimum_block_size_y
316  * Sets the minimum block size for spatial partitioning in the y-direction
317  */
318  ///@{
319  TECA_ALGORITHM_PROPERTY(long, minimum_block_size_y)
320  ///@}
321 
322  /** @name minimum_block_size_z
323  * Sets the minimum block size for spatial partitioning in the z-direction
324  */
325  ///@{
326  TECA_ALGORITHM_PROPERTY(long, minimum_block_size_z)
327  ///@}
328  //
329  /** @name number_of_temporal_partitions
330  * Set the number of temporal partitions. If set to less than one then the
331  * number of time steps is used. The temporal_partition_size property takes
332  * precedence, if it is set then this property is ignored. The default
333  * value is zero.
334  */
335  ///@{
336  TECA_ALGORITHM_PROPERTY(long, number_of_temporal_partitions)
337  ///@}
338 
339  /** @name temporal_partition_size
340  * Set the size of the temporal partitions. If set to less than one then the
341  * number_of_temporal_partitions property is used instead. The default value is
342  * zero.
343  */
344  ///@{
345  TECA_ALGORITHM_PROPERTY(long, temporal_partition_size)
346  ///@}
347 
348  /** @name index_executive_compatibility
349  * If set and spatial partitioner is enabled, the writer will make one
350  * request per time step using the index_request_key as the
351  * teca_index_executive would. This could be used to parallelize existing
352  * algorithms over space and time.
353  */
354  ///@{
355  TECA_ALGORITHM_PROPERTY(int, index_executive_compatability)
356  ///@}
357 
358 protected:
359  teca_cf_writer();
360 
361 private:
362  using teca_algorithm::get_output_metadata;
363  using teca_algorithm::execute;
364 
365  const_p_teca_dataset execute(unsigned int port,
366  const std::vector<const_p_teca_dataset> &input_data,
367  const teca_metadata &request, int streaming) override;
368 
369  teca_metadata get_output_metadata(unsigned int port,
370  const std::vector<teca_metadata> &input_md) override;
371 
372  std::vector<teca_metadata> get_upstream_request(unsigned int port,
373  const std::vector<teca_metadata> &input_md,
374  const teca_metadata &request) override;
375 
376  // flush data to disk. this may be necessary if accessing data
377  // immediately.
378  int flush();
379 
380 private:
381  std::string file_name;
382  std::string date_format;
383  long number_of_spatial_partitions;
384  int partition_x;
385  int partition_y;
386  int partition_z;
387  long minimum_block_size_x;
388  long minimum_block_size_y;
389  long minimum_block_size_z;
390  long number_of_temporal_partitions;
391  long temporal_partition_size;
392  long first_step;
393  long last_step;
394  int layout;
395  int partitioner;
396  int index_executive_compatability;
397  unsigned int steps_per_file;
398  int mode_flags;
399  int use_unlimited_dim;
400  int collective_buffer;
401  int compression_level;
402  int flush_files;
403  int move_variables_to_root;
404 
405  std::vector<std::string> point_arrays;
406  std::vector<std::string> information_arrays;
407 
408  class internals_t;
409  internals_t *internals;
410 };
411 
412 #endif
The interface to TECA pipeline architecture.
Definition: teca_algorithm.h:244
A writer for Cartesian meshes in NetCDF CF2 format.
Definition: teca_cf_writer.h:75
const char * get_partitioner_name() const
void set_partitioner_to_spatial()
enables spatial partitioner
Definition: teca_cf_writer.h:260
void set_partitioner_to_space_time()
enables space-time partitioner
Definition: teca_cf_writer.h:263
int validate_partitioner(int mode)
Definition: teca_cf_writer.h:266
@ spatial
map spatial extents to MPI ranks, time is processed sequentially
Definition: teca_cf_writer.h:247
@ temporal
map time steps to MPI ranks
Definition: teca_cf_writer.h:246
const char * get_layout_name() const
int validate_layout(int mode)
Definition: teca_cf_writer.h:142
int set_layout(const std::string &layout)
set the layout mode from a string.
A generic container for meta data in the form of name=value pairs.
Definition: teca_metadata.h:22
This is the base class defining a threaded algorithm.
Definition: teca_threaded_algorithm.h:71
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.
#define TECA_ERROR(_msg)
Constructs an error message and sends it to the stderr stream.
Definition: teca_common.h:161