automotive-message-broker  0.13
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Modules Pages
asyncqueue.hpp
1 /*
2  Copyright (C) 2014 Intel Corporation
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
#include <glib.h>

#include <abstractpropertytype.h>
#include "listplusplus.h"

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <unordered_set>
#include <utility>
#include <vector>
28 
29 namespace amb
30 {
31 
/**
 * Mutex-protected queue backed by a std::vector, with optional blocking pop().
 *
 * Items are popped from the front and appended at the back. append() never
 * stores two equal items: an already-queued equal item is removed before the
 * new one is pushed, so the item effectively moves to the back.
 *
 * @tparam T    element type; must be copyable and comparable with operator==.
 * @tparam Pred equality predicate type. It is part of the class signature but
 *              is not referenced inside this class; append() compares with
 *              operator== via std::find.
 */
template <typename T, class Pred = std::equal_to<T> >
class Queue
{
public:
	/**
	 * @param unique   stored in mUnique. NOTE(review): this flag is not
	 *                 consulted anywhere in this class; append() de-duplicates
	 *                 regardless of its value.
	 * @param blocking when true, pop() waits until an item is available and
	 *                 append() notifies waiters.
	 */
	Queue(bool unique = false, bool blocking = false)
		: mBlocking(blocking), mUnique(unique) // same order as the member declarations
	{
	}

	virtual ~Queue()
	{
	}

	/** @return the number of queued items. */
	int count() const
	{
		std::lock_guard<std::mutex> lock(mutex);

		return static_cast<int>(mQueue.size());
	}

	/**
	 * Removes and returns the item at the front of the queue.
	 *
	 * In blocking mode the call waits until the queue is non-empty. The wait
	 * uses a predicate so that spurious wakeups, or several consumers woken by
	 * the same notify_all(), go back to sleep instead of falling through.
	 *
	 * @throws std::runtime_error("nothing in queue") if the queue is empty
	 *         (only reachable in non-blocking mode).
	 */
	T pop()
	{
		std::unique_lock<std::mutex> lock(mutex);

		if(mBlocking)
		{
			cond.wait(lock, [this]() { return !mQueue.empty(); });
		}

		if(mQueue.empty())
			throw std::runtime_error("nothing in queue");

		T item = std::move(mQueue.front());
		mQueue.erase(mQueue.begin());

		return item;
	}

	/**
	 * Adds @p item at the back of the queue. If an equal item is already
	 * queued, that older entry is erased first.
	 * In blocking mode all threads waiting in pop() are notified afterwards.
	 */
	virtual void append(T item)
	{
		{
			std::lock_guard<std::mutex> lock(mutex);

			// Single scan: locate a previous occurrence and drop it.
			auto existing = std::find(mQueue.begin(), mQueue.end(), item);
			if(existing != mQueue.end())
			{
				mQueue.erase(existing);
			}
			mQueue.push_back(std::move(item));
		}

		// Notify after releasing the lock so woken consumers can acquire it at once.
		if(mBlocking)
		{
			cond.notify_all();
		}
	}

	/**
	 * Removes @p item from the queue under the lock by delegating to the
	 * removeOne() helper (declared outside this class, presumably in
	 * listplusplus.h -- TODO confirm).
	 */
	void remove(T item)
	{
		std::lock_guard<std::mutex> lock(mutex);
		removeOne(&mQueue, item);
	}

private:
	bool mBlocking;
	bool mUnique;
	mutable std::mutex mutex; // mutable so that count() can be const
	std::condition_variable cond;
	std::vector<T> mQueue;
};
108 
109 template <typename T, class Pred = std::equal_to<T> >
111  GSource source;
112  Queue<T, Pred>* queue;
113  int minQueueSize;
114 };
115 
116 template <typename T, class Pred = std::equal_to<T> >
118 {
119 public:
120  typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
121  AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
122  : callback(cb), mMaxQueueSize(queueSize)
123  {
124 
125  static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
126  GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
127 
129  watch->queue = queue;
130  watch->minQueueSize = queueSize;
131 
132  gint p = G_PRIORITY_DEFAULT;
133 
134  if(priority == AbstractPropertyType::Normal)
135  p = G_PRIORITY_DEFAULT;
136  else if(priority == AbstractPropertyType::High)
137  p = G_PRIORITY_HIGH;
138  else if(priority == AbstractPropertyType::Low)
139  p = G_PRIORITY_LOW;
140 
141  g_source_set_priority(source, p);
142  g_source_set_callback(source, nullptr, this, nullptr);
143 
144  g_source_attach(source, nullptr);
145  g_source_unref(source);
146  }
147 
148  AsyncQueueWatcherCallback callback;
149 
150 
151 protected:
152  AsyncQueueWatcher(){}
153 
154  int mMaxQueueSize;
155 
156 private:
157 
158  static gboolean prepare(GSource *source, gint *timeout)
159  {
161  *timeout = -1;
162 
163  if (!s)
164  return false;
165 
166  return s->queue->count() > s->minQueueSize;
167  }
168 
169  static gboolean check(GSource *source)
170  {
172 
173  if (!s)
174  return false;
175 
176  return s->queue->count() > s->minQueueSize;
177  }
178 
179  static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
180  {
182 
183  if (!s)
184  return false;
185 
186  AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
187 
188  watcher->callback(s->queue);
189  return true;
190  }
191 
192  static void finalize(GSource* source)
193  {
194 
195  }
196 };
197 } // namespace amb
Definition: abstractpropertytype.h:80
Definition: asyncqueue.hpp:33
Definition: asyncqueue.hpp:117
Definition: asyncqueue.hpp:110
Definition: abstractpropertytype.h:252
Priority
The Priority enum describes the priority of the property type.
Definition: abstractpropertytype.h:77
Definition: abstractpropertytype.h:81
Definition: abstractpropertytype.h:79