TECA
The Toolkit for Extreme Climate Analysis
teca_threadsafe_queue.h
1 #ifndef teca_threadsafe_queue_h
2 #define teca_threadsafe_queue_h
3 
4 #include "teca_config.h"
5 
6 #include <mutex>
7 #include <queue>
8 #include <condition_variable>
9 
/// A thread safe queue
/**
 * Wraps a std::queue<T> together with a mutex. Every member function takes
 * the mutex before it touches the underlying queue. Consumers that call
 * pop() sleep on a condition variable until a producer makes data available.
 */
// NOTE(review): the class head below (including TECA_EXPORT) and the two
// constructor declarations were not visible in the listing this file was
// recovered from; they are inferred from the out-of-line definitions and the
// teca_config.h include -- confirm against the upstream header.
template<typename T>
class TECA_EXPORT teca_threadsafe_queue
{
public:
    teca_threadsafe_queue() = default;

    // copy construct. other is locked while its contents are copied.
    teca_threadsafe_queue(const teca_threadsafe_queue<T> &other);

    // copy assign. both queues are locked while the contents of other are
    // copied. assigning a queue to itself is a no-op.
    void operator=(const teca_threadsafe_queue<T> &other);

    // report current size
    typename std::queue<T>::size_type size() const;

    // push a value onto the queue, and wake one thread blocked in pop
    void push(const T &val);
    void push(T &&val);

    // pop a value from the queue, will block until data is ready.
    void pop(T &val);

    // pop a value from the queue if data is present, will return
    // false if no data is in the queue.
    bool try_pop(T &val);

    // swap the contents. both queues are locked for the duration.
    // swapping a queue with itself is a no-op.
    void swap(teca_threadsafe_queue<T> &other);

    // clear, removes all queued values
    void clear();

private:
    // mutable so that const members (size, and the source of a copy) can lock
    mutable std::mutex m_mutex;
    std::queue<T> m_queue;
    // signaled when data becomes available, pop waits on this
    std::condition_variable m_ready;
    // not referenced by any member function in this file
    std::condition_variable m_empty;
};

// --------------------------------------------------------------------------
template<typename T>
teca_threadsafe_queue<T>::teca_threadsafe_queue(
    const teca_threadsafe_queue<T> &other)
{
    // this object is still under construction, so only the source
    // needs to be locked.
    std::lock_guard<std::mutex> lock(other.m_mutex);
    m_queue = other.m_queue;
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::operator=(
    const teca_threadsafe_queue<T> &other)
{
    // self assignment would hand the same mutex to std::lock twice, which
    // is undefined behavior for std::mutex. there is nothing to copy anyway.
    if (this == &other)
        return;

    // std::lock acquires both mutexes without risk of deadlock when two
    // threads assign in opposite directions.
    std::lock(m_mutex, other.m_mutex);
    std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
    std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);

    // copy first, then swap, so that this queue is left untouched if
    // copying an element throws.
    std::queue<T> tmp(other.m_queue);
    m_queue.swap(tmp);

    // the assignment may have made data available, wake any thread that
    // is blocked in pop so that it re-checks the queue.
    if (!m_queue.empty())
        m_ready.notify_all();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::swap(teca_threadsafe_queue<T> &other)
{
    // see operator=, locking the same std::mutex twice is undefined.
    if (this == &other)
        return;

    std::lock(m_mutex, other.m_mutex);
    std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
    std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);

    m_queue.swap(other.m_queue);

    // either queue may have gone from empty to non-empty, wake the
    // threads blocked in pop on whichever one now holds data.
    if (!m_queue.empty())
        m_ready.notify_all();

    if (!other.m_queue.empty())
        other.m_ready.notify_all();
}

// --------------------------------------------------------------------------
template<typename T>
typename std::queue<T>::size_type teca_threadsafe_queue<T>::size() const
{
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.size();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::push(const T &val)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(val);
    m_ready.notify_one();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::push(T &&val)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(std::move(val));
    m_ready.notify_one();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::pop(T &val)
{
    // a unique_lock is required here, the condition variable releases the
    // mutex while waiting and re-acquires it before returning.
    std::unique_lock<std::mutex> lock(m_mutex);

    // the predicate guards against spurious wake ups, and against another
    // consumer having taken the value first.
    m_ready.wait(lock, [this] { return !m_queue.empty(); });

    val = std::move(m_queue.front());
    m_queue.pop();
}

// --------------------------------------------------------------------------
template<typename T>
bool teca_threadsafe_queue<T>::try_pop(T &val)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    if (m_queue.empty())
        return false;

    val = std::move(m_queue.front());
    m_queue.pop();
    return true;
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::clear()
{
    // std::queue has no clear member. swap the contents into a local
    // instead, the elements are then destroyed after the lock is released.
    std::queue<T> discarded;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.swap(discarded);
    }
}
133 
134 #endif
teca_threadsafe_queue
A thread safe queue.
Definition: teca_threadsafe_queue.h:12
teca_error::TECA_EXPORT
p_teca_error_handler error_handler TECA_EXPORT
The global error handler instance.