TECA
The Toolkit for Extreme Climate Analysis
teca_threadsafe_queue.h
1 #ifndef teca_threadsafe_queue_h
2 #define teca_threadsafe_queue_h
3 
4 #include <mutex>
5 #include <queue>
6 #include <condition_variable>
7 
/// A thread safe queue
/**
 * A first-in first-out queue of T wrapping a std::queue. Every member
 * function acquires the internal mutex before touching the underlying
 * queue. pop blocks on a condition variable that push signals, try_pop
 * never blocks.
 */
template<typename T>
class teca_threadsafe_queue
{
public:
    // NOTE(review): the two constructor declarations are inferred from the
    // out-of-line copy constructor defined below (a user declared copy
    // constructor suppresses the implicit default one) -- confirm against
    // the upstream header.
    teca_threadsafe_queue() = default;

    // copy construct from other. other is locked while it is copied.
    teca_threadsafe_queue(const teca_threadsafe_queue<T> &other);

    // replace the contents with a copy of other's contents. both queues
    // are locked for the duration. self assignment is a no-op.
    void operator=(const teca_threadsafe_queue<T> &other);

    // report current size
    typename std::queue<T>::size_type size() const;

    // push a value onto the queue, and wake one thread blocked in pop.
    void push(const T &val);
    void push(T &&val);

    // pop a value from the queue, will block until data is ready.
    // the popped element is move assigned into val.
    void pop(T &val);

    // pop a value from the queue if data is present, will return
    // false if no data is in the queue. val is only modified when
    // true is returned.
    bool try_pop(T &val);

    // swap the contents. both queues are locked for the duration.
    // swapping a queue with itself is a no-op.
    void swap(teca_threadsafe_queue<T> &other);

    // clear, removes all elements
    void clear();

private:
    // guards m_queue. mutable so that const members (size, and the
    // source side of copy/assign) can lock it.
    mutable std::mutex m_mutex;
    std::queue<T> m_queue;
    // signaled when elements become available, pop waits on it.
    std::condition_variable m_ready;
    // NOTE(review): not referenced by any member function in this header.
    std::condition_variable m_empty;
};

// --------------------------------------------------------------------------
template<typename T>
teca_threadsafe_queue<T>::teca_threadsafe_queue(
    const teca_threadsafe_queue<T> &other)
{
    // only other needs locking, this object is still under construction
    std::lock_guard<std::mutex> lock(other.m_mutex);
    std::queue<T> tmp(other.m_queue);
    m_queue.swap(tmp);
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::operator=(
    const teca_threadsafe_queue<T> &other)
{
    // passing the same mutex twice to std::lock is undefined behavior
    // (in practice a deadlock), so self assignment must bail out first
    if (this == &other)
        return;

    // std::lock acquires both mutexes without risking deadlock against
    // a concurrent "other = *this"
    std::lock(m_mutex, other.m_mutex);
    std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
    std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);

    // copy first, then swap, so a throwing copy leaves this queue intact
    std::queue<T> tmp(other.m_queue);
    m_queue.swap(tmp);

    // threads blocked in pop must learn about elements that arrived here
    if (!m_queue.empty())
        m_ready.notify_all();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::swap(teca_threadsafe_queue<T> &other)
{
    // see operator=, locking one mutex twice is undefined behavior
    if (this == &other)
        return;

    std::lock(m_mutex, other.m_mutex);
    std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
    std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);

    m_queue.swap(other.m_queue);

    // either side may have just become non-empty, wake its waiters
    if (!m_queue.empty())
        m_ready.notify_all();

    if (!other.m_queue.empty())
        other.m_ready.notify_all();
}

// --------------------------------------------------------------------------
template<typename T>
typename std::queue<T>::size_type teca_threadsafe_queue<T>::size() const
{
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_queue.size();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::push(const T &val)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(val);
    m_ready.notify_one();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::push(T &&val)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(std::move(val));
    m_ready.notify_one();
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::pop(T &val)
{
    // unique_lock is required here, the condition variable releases and
    // re-acquires it while waiting
    std::unique_lock<std::mutex> lock(m_mutex);

    // the predicate form re-checks after every wake up, so spurious
    // wake ups and races with other consumers are handled
    m_ready.wait(lock, [this] { return !m_queue.empty(); });

    val = std::move(m_queue.front());
    m_queue.pop();
}

// --------------------------------------------------------------------------
template<typename T>
bool teca_threadsafe_queue<T>::try_pop(T &val)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    if (m_queue.empty())
        return false;

    val = std::move(m_queue.front());
    m_queue.pop();
    return true;
}

// --------------------------------------------------------------------------
template<typename T>
void teca_threadsafe_queue<T>::clear()
{
    std::lock_guard<std::mutex> lock(m_mutex);

    // std::queue has no clear member, swapping with an empty queue
    // discards the elements
    std::queue<T> empty;
    m_queue.swap(empty);
}
131 
132 #endif
teca_threadsafe_queue
A thread safe queue.
Definition: teca_threadsafe_queue.h:10