atomic_queue.h
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13 ***************************************************************************/
14 #ifndef ATOMIC_QUEUE_H_
15 #define ATOMIC_QUEUE_H_
16 
17 #include "atomic.h"
18 #include <memory>
19 #include <string.h>
20 
21 //! Atomic FIFO with a pre-defined size for POD-type data of non-zero values (e.g. pointers).
22 //! \sa atomic_queue, atomic_pointer_queue
23 template <typename T, unsigned int SIZE, typename const_ref = T>
25 public:
    //! Exception type thrown by push() when atomicPush() returns false.
26  struct nospace_error {};
27 
28  atomic_nonzero_pod_queue() : m_pFirst(m_ptrs), m_pLast(m_ptrs), m_count(0) {
29  memset(m_ptrs, 0, SIZE * sizeof(T));
30  }
31 
32  void push(T t) {
33  if( !atomicPush(t))
34  throw nospace_error();
35  }
36 
37  //! This is not reentrant.
38  void pop() {
39  *m_pFirst = 0;
40  --m_count;
41  }
42  //! This is not reentrant.
43  T front() {
44  atomic<T> *first = m_pFirst;
45  while((T)*first == 0) {
46  first++;
47  if(first == &m_ptrs[SIZE]) {
48  first = m_ptrs;
49  }
50  }
51  m_pFirst = first;
52  return *first;
53  }
54  //! This is not reentrant.
55  bool empty() const {
56  return m_count == 0;
57  }
58  unsigned int size() const {
59  return m_count;
60  }
61 
62  //! Tries to push an item.
    //! Works in two phases: room is first reserved by incrementing m_count,
    //! then a vacant (zero) slot is searched for and claimed with CAS.
63  //! \param t item to be added; has to be non-zero.
64  //! \return true if succeeded, false if the counter already equals SIZE.
65  bool atomicPush(T t) {
66  assert(t);//has to be nonzero.
67  //m_count++ atomically, and the room is reserved.
68  for(;;) {
69  unsigned int x = m_count;
70  if(x == SIZE) {
    // The counter is read a second time; failure is reported only if it still equals SIZE.
71  if(m_count == SIZE)
72  return false;
73  continue;
74  }
    // Retries from the top if the counter changed between the read and the CAS.
75  if(m_count.compare_set_strong(x, x + 1)) {
76  break;
77  }
78  }
    // Phase 2: loops until a zero slot has been swapped to the item.
79  for(;;) {
80  atomic<T> *last = m_pLast;
81  atomic<T> *last_org = last;
82  //finds zero, wrapping around at the end of the slot array.
83  while((T)*last != 0) {
84  last++;
85  if(last == &m_ptrs[SIZE]) {
86  last = m_ptrs;
87  }
88  }
89  //tags the end of the queue; if m_pLast changed since it was read, the scan is redone.
90  if(m_pLast.compare_set_strong(last_org, last)) {
91  //CAS from zero to the item; if the slot is no longer zero, the scan is redone.
92  if(last->compare_set_strong((T)0, t)) {
93  break;
94  }
95  }
96  }
97  return true;
98  }
99  //! Tries to pop the front item.
100  //! \param item to be released.
101  //! \return true if succeeded.
102  bool atomicPop(const_ref item) {
103  assert(item);
104  atomic<T> *first = m_pFirst;
105  if(first->compare_set_strong((T)item, (T)0)) {
106  --m_count;
107  return true;
108  }
109  return false;
110  }
111  //! Tries to obtain the front item.
    //! The item stays in the queue; only m_pFirst is moved onto its slot.
    //! \return the first non-zero item found at or after m_pFirst, or zero if empty() is observed.
112  const_ref atomicFront() {
113  for(;;) {
114  if(empty())
115  return 0L;
116  atomic<T> *first = m_pFirst;
117  atomic<T> *first_org = first;
    // Scans forward from m_pFirst for a non-zero slot.
118  for(;;) {
119  const_ref t = *first;
120  if(t) {
    // The item is returned only if m_pFirst could be moved from the value read above
    // onto this slot; otherwise the outer loop starts over.
121  if(m_pFirst.compare_set_strong(first_org, first))
122  return t;
123  break;
124  }
125  first++;
126  if(first == &m_ptrs[SIZE]) {
127  first = m_ptrs;
    // emptiness is checked again at each wrap-around.
128  if(empty())
129  return 0L;
130  }
131  }
132  }
133  }
    //! Removes and returns the first non-zero item found when scanning from m_pFirst.
    //! \return the removed item, or zero if empty() is observed before one is taken.
134  T atomicPopAny() {
135  if(empty())
136  return 0L;
137  atomic<T> *first = m_pFirst;
138  for(;;) {
139  if( *first) {
    // The slot is taken by exchanging it with zero; a zero result means
    // the slot had been emptied in the meantime, so the scan goes on.
140  T obj = first->exchange((T)0);
141  if(obj) {
142  m_pFirst = first;
143  --m_count;
144  return obj;
145  }
146  }
147  first++;
148  if(first == &m_ptrs[SIZE]) {
149  first = m_ptrs;
    // emptiness is checked again at each wrap-around.
150  if(empty())
151  return 0L;
152  }
153  }
154  }
155 private:
    //! Slot array; a zero value marks a vacant slot.
156  atomic<T> m_ptrs[SIZE];
    //! Slot from which front(), atomicFront() and atomicPopAny() start scanning for an item.
157  atomic<atomic<T> *> m_pFirst;
    //! Slot from which atomicPush() starts scanning for a vacant slot.
158  atomic<atomic<T> *> m_pLast;
    //! Item counter; incremented by atomicPush() before a slot is claimed, decremented by the pop functions.
159  atomic<unsigned int> m_count;
160 };
161 
162 //! Atomic FIFO with a pre-defined size for pointers.
//! Adds no members of its own: it is atomic_nonzero_pod_queue storing T* items,
//! with const T* as the const_ref type.
163 template <typename T, unsigned int SIZE>
164 class atomic_pointer_queue : public atomic_nonzero_pod_queue<T*, SIZE, const T*> {};
165 
166 //! Atomic FIFO with a pre-defined size for copy-constructible classes.
167 template <typename T, unsigned int SIZE>
169 public:
170  typedef typename atomic_pointer_queue<T, SIZE>::nospace_error nospace_error;
171 
    //! Creates the queue; the constructor body itself does nothing.
172  atomic_queue() {}
173  ~atomic_queue() {
174  while( !empty()) pop();
175  }
176 
177  void push(const T&t) {
178  T *obj = new T(t);
179  try {
180  m_queue.push(obj);
181  }
182  catch (nospace_error &e) {
183  delete obj;
184  throw e;
185  }
186  }
187  //! This is not reentrant.
188  void pop() {
189  delete m_queue.front();
190  m_queue.pop();
191  }
192  //! This is not reentrant.
193  T &front() {
194  return *m_queue.front();
195  }
196  //! This is not reentrant.
197  bool empty() const {
198  return m_queue.empty();
199  }
200  unsigned int size() const {
201  return m_queue.size();
202  }
203 private:
205 };
206 
207 //! Atomic FIFO of a pre-defined size for copyable classes.
208 template <typename T, unsigned int SIZE>
210 public:
211  typedef typename atomic_pointer_queue<T, SIZE>::nospace_error nospace_error;
212  typedef uint_cas_max key;
213 
215  static_assert(SIZE < (1uLL << (sizeof(key) * 8 - 8)), "Size mismatch.");
216  for(unsigned int i = 0; i < SIZE; i++) {
217  m_reservoir.push(key_index_serial(i, 0));
218  }
219  }
221  while(!empty()) pop();
222  assert(m_reservoir.size() == SIZE);
223  }
224 
225  void push(const T&t) {
226  key pack = m_reservoir.atomicPopAny();
227  if( !pack)
228  throw nospace_error();
229  int idx = key2index(pack);
230  m_array[idx] = t;
231  int serial = key2serial(pack) + 1;
232  pack = key_index_serial(idx, serial);
233  try {
234  m_queue.push(pack);
235  }
236  catch (nospace_error&e) {
237  try {
238  m_reservoir.push(pack);
239  }
240  catch (nospace_error&) {
241  abort();
242  }
243  throw e;
244  }
245  }
246  //! This is not reentrant.
247  void pop() {
248  key pack = m_queue.front();
249  m_queue.pop();
250  try {
251  m_reservoir.push(pack);
252  }
253  catch (nospace_error&) {
254  abort();
255  }
256  }
257  //! This is not reentrant.
258  T &front() {
259  return m_array[key2index(m_queue.front())];
260  }
261  //! This is not reentrant.
262  bool empty() const {
263  return m_queue.empty();
264  }
265  unsigned int size() const {
266  return m_queue.size();
267  }
268 
269  //! Try to pop the front item.
270  //! \param item to be released.
271  //! \return true if succeeded.
272  bool atomicPop(key item) {
273  if(m_queue.atomicPop(item)) {
274  try {
275  m_reservoir.push(item);
276  }
277  catch (nospace_error&) {
278  abort();
279  }
280  return true;
281  }
282  return false;
283  }
284  //! Try to obtain the front item.
285  key atomicFront(T *val) {
286  key pack = m_queue.atomicFront();
287  if(pack)
288  *val = m_array[key2index(pack)];
289  return pack;
290  }
291 private:
292  int key2index(key i) {return (unsigned int)i / 0x100;}
293  int key2serial(key i) {return ((unsigned int)i % 0x100) - 1;}
294  key key_index_serial(int index, int serial) {return index * 0x100 + (serial % 0xff) + 1;}
    //! m_queue holds the keys of the stored items; m_reservoir holds the keys of the slots available to push().
295  atomic_nonzero_pod_queue<key, SIZE> m_queue, m_reservoir;
    //! Storage of the items, addressed by the index part of a key.
296  T m_array[SIZE];
297 };
298 
299 #endif /*ATOMIC_QUEUE_H_*/

Generated for KAME4 by  doxygen 1.8.3