Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
AsyncQueue.cs
1using System;
2using System.Collections.Generic;
3using System.Threading.Tasks;
4using Waher.Events;
5
7{
/// <summary>
/// Asynchronous First-in-First-out (FIFO) Queue, for use when transporting items of type
/// <typeparamref name="T"/> between tasks.
/// </summary>
/// <typeparam name="T">Type of items transported. Must be a reference type, since
/// null is returned to waiting subscribers when the queue is disposed.</typeparam>
public class AsyncQueue<T> : IDisposable
    where T : class
{
    // Items added while no subscriber was waiting.
    private readonly LinkedList<Item> queue = new LinkedList<Item>();

    // Pending calls to Wait() made while the queue was empty.
    private readonly LinkedList<TaskCompletionSource<T>> subscribers = new LinkedList<TaskCompletionSource<T>>();

    // Completed by RaiseDisposed(); this is the task returned from Terminate().
    private readonly TaskCompletionSource<bool> terminatedTask = new TaskCompletionSource<bool>();

    // Guards all mutable state below, as well as both linked lists.
    private readonly object synchObj = new object();
    private volatile int countItems = 0;
    private volatile int countSubscribers = 0;
    private bool disposed = false;      // No more items can be added or fetched.
    private bool terminated = false;    // No more items can be added; queued items can still be fetched.

    /// <summary>
    /// Asynchronous Queue, for use when transporting items of class <typeparamref name="T"/> between tasks.
    /// </summary>
    public AsyncQueue()
    {
    }

    /// <summary>
    /// Queued value, together with the task source used to report back to the
    /// caller of Add when the value has left the queue.
    /// </summary>
    private class Item
    {
        internal T Value { get; }

        // Set to true when the item is dequeued, false if the queue is disposed first.
        internal readonly TaskCompletionSource<bool> Processed = new TaskCompletionSource<bool>();

        internal Item(T Value)
        {
            this.Value = Value;
        }
    }

    /// <summary>
    /// Number of items in queue.
    /// </summary>
    public int CountItems
    {
        get
        {
            lock (this.synchObj)
            {
                return this.countItems;
            }
        }
    }

    /// <summary>
    /// Number of subscribers waiting for items.
    /// </summary>
    public int CountSubscribers
    {
        get
        {
            lock (this.synchObj)
            {
                return this.countSubscribers;
            }
        }
    }

    /// <summary>
    /// Adds an item last to the queue.
    /// </summary>
    /// <param name="Item">Item to add.</param>
    /// <returns>Task completing when the item has left the queue. See <see cref="Add(T, bool)"/>.</returns>
    /// <exception cref="ObjectDisposedException">If the queue has been terminated or disposed.</exception>
    public Task<bool> Add(T Item)
    {
        return this.Add(Item, false);
    }

    /// <summary>
    /// Adds an item last to the queue.
    /// </summary>
    /// <param name="Item">Item to add.</param>
    /// <returns>Task completing when the item has left the queue. See <see cref="Add(T, bool)"/>.</returns>
    /// <exception cref="ObjectDisposedException">If the queue has been terminated or disposed.</exception>
    public Task<bool> AddLast(T Item)
    {
        return this.Add(Item, false);
    }

    /// <summary>
    /// Adds an item first to the queue.
    /// </summary>
    /// <param name="Item">Item to add.</param>
    /// <returns>Task completing when the item has left the queue. See <see cref="Add(T, bool)"/>.</returns>
    /// <exception cref="ObjectDisposedException">If the queue has been terminated or disposed.</exception>
    public Task<bool> AddFirst(T Item)
    {
        return this.Add(Item, true);
    }

    /// <summary>
    /// Adds an item to the queue. If a subscriber is already waiting, the item is
    /// handed directly to the subscriber that has waited the longest, and is never queued.
    /// </summary>
    /// <param name="Item">Item to add.</param>
    /// <param name="First">If true, the item is placed first in the queue; otherwise last.</param>
    /// <returns>Task whose result is true once the item has been handed to a subscriber
    /// or fetched from the queue, and false if the queue was disposed while the item
    /// was still queued.</returns>
    /// <exception cref="ObjectDisposedException">If the queue has been terminated or disposed.</exception>
    public Task<bool> Add(T Item, bool First)
    {
        lock (this.synchObj)
        {
            // The single-string constructor of ObjectDisposedException takes the object
            // name, not the message, so the two-argument overload is required here.
            if (this.terminated || this.disposed)
                throw new ObjectDisposedException(nameof(AsyncQueue<T>), "Queue has been terminated.");

            LinkedListNode<TaskCompletionSource<T>> Waiting = this.subscribers.First;

            if (Waiting is null)
            {
                Item Record = new Item(Item);

                if (First)
                    this.queue.AddFirst(Record);
                else
                    this.queue.AddLast(Record);

                this.countItems++;

                return Record.Processed.Task;
            }

            this.subscribers.RemoveFirst();
            this.countSubscribers--;
            Waiting.Value.TrySetResult(Item);

            return Task.FromResult(true);
        }
    }

    /// <summary>
    /// Waits indefinitely (or until queue is disposed) for an item to be available.
    /// </summary>
    /// <returns>Task resulting in the first item of the queue, or null if the queue
    /// is (or becomes) disposed.</returns>
    public Task<T> Wait()
    {
        T Value;

        lock (this.synchObj)
        {
            if (this.disposed)
                return Task.FromResult<T>(null);

            if (this.queue.First is null)
            {
                TaskCompletionSource<T> Subscriber = new TaskCompletionSource<T>();
                this.subscribers.AddLast(Subscriber);
                this.countSubscribers++;
                return Subscriber.Task;
            }

            Value = this.DequeueLocked(out bool Drained);

            if (!Drained)
                return Task.FromResult(Value);
        }

        // Last item of a terminated queue was fetched: notify outside of the lock.
        this.RaiseDisposed();

        return Task.FromResult(Value);
    }

    /// <summary>
    /// Tries to peek at the first queued item, without removing it from the queue.
    /// If there is none, the method returns immediately with a null item.
    /// </summary>
    /// <param name="Item">First item in the queue, or null if not available.</param>
    /// <returns>If an item was found.</returns>
    public bool TryPeekItem(out T Item)
    {
        return this.TryGetItem(false, out Item);
    }

    /// <summary>
    /// Tries to get a queued item, if found. If not, the method returns immediately with a null item.
    /// </summary>
    /// <param name="Item">Item removed from the queue, or null if not available.</param>
    /// <returns>If an item was found.</returns>
    public bool TryGetItem(out T Item)
    {
        return this.TryGetItem(true, out Item);
    }

    /// <summary>
    /// Gets the first item of the queue, optionally removing it.
    /// </summary>
    /// <param name="Remove">If the item should be removed from the queue.</param>
    /// <param name="Item">First item in the queue, or null if the queue is empty or disposed.</param>
    /// <returns>If an item was found.</returns>
    private bool TryGetItem(bool Remove, out T Item)
    {
        lock (this.synchObj)
        {
            if (this.disposed || this.queue.First is null)
            {
                Item = null;
                return false;
            }

            if (!Remove)
            {
                Item = this.queue.First.Value.Value;
                return true;
            }

            Item = this.DequeueLocked(out bool Drained);

            if (!Drained)
                return true;
        }

        // Last item of a terminated queue was fetched: notify outside of the lock.
        this.RaiseDisposed();
        return true;
    }

    /// <summary>
    /// Removes the first item of the queue and reports it as processed. If the queue
    /// has been terminated and this was its last item, the queue is marked disposed.
    /// Must be called with the lock held and a non-empty queue.
    /// </summary>
    /// <param name="Drained">If the queue became disposed as a result of the call,
    /// in which case the caller must call RaiseDisposed after releasing the lock.</param>
    /// <returns>Value of the removed item.</returns>
    private T DequeueLocked(out bool Drained)
    {
        Item Record = this.queue.First.Value;

        this.queue.RemoveFirst();
        this.countItems--;

        Record.Processed.TrySetResult(true);

        Drained = this.terminated && this.queue.First is null;

        if (Drained)
            this.disposed = true;

        return Record.Value;
    }

    /// <summary>
    /// Reports all queued items as unprocessed (false), returns null to all waiting
    /// subscribers, and empties both lists. Must be called with the lock held.
    /// </summary>
    private void ReleaseAllLocked()
    {
        foreach (Item Record in this.queue)
            Record.Processed.TrySetResult(false);

        this.queue.Clear();
        this.countItems = 0;

        foreach (TaskCompletionSource<T> Subscriber in this.subscribers)
            Subscriber.TrySetResult(null);

        this.subscribers.Clear();
        this.countSubscribers = 0;
    }

    /// <summary>
    /// <see cref="IDisposable.Dispose"/>. Discards any queued items, releases waiting
    /// subscribers with a null result, and raises <see cref="Disposed"/>. Calling the
    /// method on an already disposed queue has no effect.
    /// </summary>
    public void Dispose()
    {
        lock (this.synchObj)
        {
            if (this.disposed)
                return;

            this.disposed = true;
            this.terminated = true;

            this.ReleaseAllLocked();
        }

        this.RaiseDisposed();
    }

    /// <summary>
    /// Terminates the queue, allowing subscribers to get queued items, but disallows
    /// new items to be added. When the last queued item has been fetched, the queue
    /// is disposed.
    /// </summary>
    /// <returns>Task that completes when the queue has been disposed.</returns>
    public Task Terminate()
    {
        lock (this.synchObj)
        {
            // Already disposed: Disposed has been (or is about to be) raised by the
            // call that disposed the queue. Raising it again would notify listeners twice.
            if (this.disposed)
                return this.terminatedTask.Task;

            this.terminated = true;

            // Items remain: disposal happens when the last one is fetched.
            if (!(this.queue.First is null))
                return this.terminatedTask.Task;

            this.disposed = true;

            this.ReleaseAllLocked();
        }

        this.RaiseDisposed();
        return this.terminatedTask.Task;
    }

    /// <summary>
    /// Event raised when queue has been disposed.
    /// </summary>
    public event EventHandler Disposed = null;

    /// <summary>
    /// Completes the termination task and raises the <see cref="Disposed"/> event.
    /// Exceptions thrown by event handlers are logged, not propagated.
    /// </summary>
    private void RaiseDisposed()
    {
        this.terminatedTask.TrySetResult(true);

        try
        {
            this.Disposed?.Invoke(this, EventArgs.Empty);
        }
        catch (Exception ex)
        {
            Log.Exception(ex);
        }
    }
}
325}
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
Asynchronous First-in-First-out (FIFO) Queue, for use when transporting items of type T between task...
Definition: AsyncQueue.cs:15
Task Terminate()
Terminates the queue, allowing subscribers to get queued items, but disallows new items to be added....
Definition: AsyncQueue.cs:272
Task< bool > AddFirst(T Item)
Adds an item first to the queue.
Definition: AsyncQueue.cs:97
Task< T > Wait()
Waits indefinitely (or until queue is disposed) for an item to be available. If Queue is disposed,...
Definition: AsyncQueue.cs:145
int CountSubscribers
Number of subscribers waiting for items.
Definition: AsyncQueue.cs:62
bool TryGetItem(out T Item)
Tries to get a queued item, if found. If not, the method returns immediately with a null item.
Definition: AsyncQueue.cs:198
void Dispose()
IDisposable.Dispose
Definition: AsyncQueue.cs:241
EventHandler Disposed
Event raised when queue has been disposed.
Definition: AsyncQueue.cs:305
Task< bool > AddLast(T Item)
Adds an item last to the queue.
Definition: AsyncQueue.cs:87
AsyncQueue()
Asynchronous Queue, for use when transporting items of class T between tasks.
Definition: AsyncQueue.cs:29
Task< bool > Add(T Item)
Adds an item last to the queue.
Definition: AsyncQueue.cs:77
Task< bool > Add(T Item, bool First)
Adds an item to the queue.
Definition: AsyncQueue.cs:108
int CountItems
Number of items in queue.
Definition: AsyncQueue.cs:48
bool TryPeekItem(out T Item)
Tries to peek at the first queued item without removing it. If the queue is empty, the method returns immediately with a null item.
Definition: AsyncQueue.cs:187