automotive-message-broker  0.12
 All Classes Functions Variables Typedefs Enumerations Enumerator Pages
asyncqueue.hpp
1 /*
2  Copyright (C) 2014 Intel Corporation
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 
19 #include <glib.h>
20 
21 #include <abstractpropertytype.h>
22 #include "listplusplus.h"
23 
24 #include <mutex>
25 #include <unordered_set>
26 
27 namespace amb
28 {
29 
30 template <typename T, class Pred = std::equal_to<T> >
31 class Queue
32 {
33 public:
34  Queue()
35  {
36 
37  }
38  virtual ~Queue()
39  {
40 
41  }
42 
43  int count()
44  {
45  std::lock_guard<std::mutex> lock(mutex);
46 
47  return mQueue.size();
48  }
49 
50  T pop()
51  {
52  std::lock_guard<std::mutex> lock(mutex);
53 
54  auto itr = mQueue.begin();
55 
56  T item = *itr;
57 
58  mQueue.erase(itr);
59 
60  return item;
61  }
62 
63  virtual void append(T item)
64  {
65  std::lock_guard<std::mutex> lock(mutex);
66 
67  mQueue.insert(item);
68  }
69 
70  void remove(T item)
71  {
72  std::lock_guard<std::mutex> lock(mutex);
73  removeOne(&mQueue, item);
74  }
75 
76 protected:
77  std::mutex mutex;
78  std::unordered_set<T,std::hash<T>, Pred> mQueue;
79 };
80 
81 template <typename T, class Pred = std::equal_to<T> >
83  GSource source;
84  Queue<T, Pred>* queue;
85  int minQueueSize;
86 };
87 
88 template <typename T, class Pred = std::equal_to<T> >
90 {
91 public:
92  typedef function<void (Queue<T, Pred> *)> AsyncQueueWatcherCallback;
93  AsyncQueueWatcher(Queue<T, Pred> * queue, AsyncQueueWatcherCallback cb, int queueSize = 0, AbstractPropertyType::Priority priority = AbstractPropertyType::Normal)
94  : callback(cb), mMaxQueueSize(queueSize)
95  {
96 
97  static GSourceFuncs funcs = {prepare, check, dispatch, finalize};
98  GSource* source = (GSource *) g_source_new(&funcs, sizeof(AsyncQueueSource<T, Pred>));
99 
101  watch->queue = queue;
102  watch->minQueueSize = queueSize;
103 
104  gint p = G_PRIORITY_DEFAULT;
105 
106  if(priority == AbstractPropertyType::Normal)
107  p = G_PRIORITY_DEFAULT;
108  else if(priority == AbstractPropertyType::High)
109  p = G_PRIORITY_HIGH;
110  else if(priority == AbstractPropertyType::Low)
111  p = G_PRIORITY_LOW;
112 
113  g_source_set_priority(source, p);
114  g_source_set_callback(source, nullptr, this, nullptr);
115 
116  g_source_attach(source, nullptr);
117  g_source_unref(source);
118  }
119 
120  AsyncQueueWatcherCallback callback;
121 
122 
123 protected:
124  AsyncQueueWatcher(){}
125 
126  int mMaxQueueSize;
127 
128 private:
129 
130  static gboolean prepare(GSource *source, gint *timeout)
131  {
133  *timeout = -1;
134 
135  if (!s)
136  return false;
137 
138  return s->queue->count() > s->minQueueSize;
139  }
140 
141  static gboolean check(GSource *source)
142  {
144 
145  if (!s)
146  return false;
147 
148  return s->queue->count() > s->minQueueSize;
149  }
150 
151  static gboolean dispatch(GSource *source, GSourceFunc callback, gpointer userData)
152  {
154 
155  if (!s)
156  return false;
157 
158  AsyncQueueWatcher<T, Pred>* watcher = static_cast<AsyncQueueWatcher<T, Pred>*>(userData);
159 
160  watcher->callback(s->queue);
161  return true;
162  }
163 
164  static void finalize(GSource* source)
165  {
166 
167  }
168 };
169 } // namespace amb
Definition: abstractpropertytype.h:79
Definition: asyncqueue.hpp:31
Definition: asyncqueue.hpp:89
Definition: asyncqueue.hpp:82
Priority
The Priority enum describes prority of the property type.
Definition: abstractpropertytype.h:76
Definition: abstractpropertytype.h:80
Definition: abstractpropertytype.h:78