Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MessageSwitch.cs
1using System;
2using System.Text;
3using System.Threading.Tasks;
6
8{
12 public static class MessageSwitch
13 {
/// <summary>
/// Latest message and outstanding waiter per key (see <see cref="GetKey"/>).
/// The same instance doubles as the lock object guarding all reads and
/// updates of the records it holds.
/// </summary>
/// <remarks>
/// NOTE(review): the three constructor arguments are presumably the maximum
/// number of items, the maximum total lifetime and the maximum idle time
/// (one day) of an entry - confirm against Cache.cs.
/// </remarks>
private static readonly Cache<string, MessageRec> messages =
    new Cache<string, MessageRec>(
        int.MaxValue,
        TimeSpan.MaxValue,
        TimeSpan.FromDays(1));
15
/// <summary>
/// Per-key bookkeeping entry: the last message seen for the key, when it
/// was seen, and the waiter (if any) currently expecting the next one.
/// </summary>
private class MessageRec
{
    /// <summary>
    /// Completion source of the caller currently waiting for a message on
    /// this key; null when nobody is waiting.
    /// </summary>
    public TaskCompletionSource<Message> Pending;

    /// <summary>
    /// Most recently reported message; null if none has been stored yet.
    /// </summary>
    public Message Message;

    /// <summary>
    /// UTC time at which <see cref="Message"/> was stored.
    /// </summary>
    public DateTime Timestamp;
}
22
/// <summary>
/// Called when new data has been received. Stores the message as the latest
/// one for its key and hands it to the caller currently waiting on that key,
/// if there is one.
/// </summary>
/// <param name="Message">Message received.</param>
/// <param name="NcapId">NCAP ID the message relates to.</param>
/// <param name="TimId">TIM ID the message relates to (null or all-zero if not applicable).</param>
/// <param name="ChannelId">Channel ID the message relates to (0 if not applicable).</param>
/// <returns>true if a waiter was registered for the key and was handed the
/// message; false otherwise.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="Message"/> is null.</exception>
public static bool DataReported(Message Message, byte[] NcapId, byte[] TimId, ushort ChannelId)
{
    if (Message is null)
        throw new ArgumentNullException(nameof(Message));

    string Key = GetKey(Message.GetType(), NcapId, TimId, ChannelId);
    TaskCompletionSource<Message> Waiter;

    lock (messages)
    {
        if (messages.TryGetValue(Key, out MessageRec Rec))
        {
            Rec.Message = Message;
            Rec.Timestamp = DateTime.UtcNow;

            // Detach the waiter while holding the lock, but complete it
            // only after the lock has been released (see below).
            Waiter = Rec.Pending;
            Rec.Pending = null;
        }
        else
        {
            // First report for this key: the message itself must be stored,
            // otherwise a later WaitForMessage call finds a record without
            // a message and cannot serve it from the cache.
            messages[Key] = new MessageRec()
            {
                Message = Message,
                Timestamp = DateTime.UtcNow,
                Pending = null
            };

            return false;
        }
    }

    if (Waiter is null)
        return false;

    // Completed outside the lock: continuations of the waiting task may run
    // inline on this thread, and must not execute while the lock is held.
    Waiter.TrySetResult(Message);

    return true;
}
62
/// <summary>
/// Builds the cache key for a message type and address. The key has the
/// form <c>TypeName/Base64(NcapId)</c>, followed by <c>/Base64(TimId)</c>
/// when the TIM ID is non-zero, followed by <c>/ChannelId</c> when, in
/// addition, the channel ID is non-zero.
/// </summary>
/// <param name="T">Message type; only its simple name is used.</param>
/// <param name="NcapId">NCAP ID.</param>
/// <param name="TimId">TIM ID; null or all-zero means "no TIM".</param>
/// <param name="ChannelId">Channel ID; ignored when the TIM ID is zero.</param>
/// <returns>Key string.</returns>
private static string GetKey(Type T, byte[] NcapId, byte[] TimId, ushort ChannelId)
{
    string Result = T.Name + "/" + Convert.ToBase64String(NcapId);

    // Without a TIM there is no channel level either.
    if (IsZero(TimId))
        return Result;

    Result += "/" + Convert.ToBase64String(TimId);

    if (ChannelId == 0)
        return Result;

    return Result + "/" + ChannelId.ToString();
}
85
/// <summary>
/// Checks if an ID is "zero", i.e. contains only zero bytes. A null
/// reference and an empty array are both considered zero.
/// </summary>
/// <param name="A">Binary ID to check.</param>
/// <returns>true if <paramref name="A"/> is null or has no non-zero byte.</returns>
public static bool IsZero(byte[] A)
{
    return A is null || Array.TrueForAll(A, Value => Value == 0);
}
104
/// <summary>
/// Waits for a message to be received. If a message of type
/// <typeparamref name="T"/> is already stored for the key and is younger
/// than <paramref name="StaleLimitSeconds"/>, it is returned immediately.
/// Otherwise the caller is registered as the waiter for the key and the
/// method completes when <see cref="DataReported"/> delivers a message, or
/// fails when the timeout elapses.
/// </summary>
/// <remarks>
/// Only one waiter is kept per key. Registering a new waiter replaces any
/// earlier one; the earlier caller is not notified of later messages and
/// ends through its own timeout.
/// </remarks>
/// <typeparam name="T">Type of message expected.</typeparam>
/// <param name="TimeoutMilliseconds">Maximum time to wait, in milliseconds.</param>
/// <param name="StaleLimitSeconds">Maximum age, in seconds, of a stored
/// message for it to be returned without waiting.</param>
/// <param name="NcapId">NCAP ID.</param>
/// <param name="TimId">TIM ID (null or all-zero if not applicable).</param>
/// <param name="ChannelId">Channel ID (0 if not applicable).</param>
/// <returns>The stored or newly received message.</returns>
/// <exception cref="TimeoutException">If no message arrives within
/// <paramref name="TimeoutMilliseconds"/>.</exception>
public static async Task<T> WaitForMessage<T>(int TimeoutMilliseconds, int StaleLimitSeconds,
    byte[] NcapId, byte[] TimId, ushort ChannelId)
    where T : Message
{
    // RunContinuationsAsynchronously: the code awaiting this method must not
    // resume inline on whichever thread calls TrySetResult (the reporting
    // thread, possibly inside its lock, or the timer continuation below).
    TaskCompletionSource<Message> Pending = new TaskCompletionSource<Message>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    string Key = GetKey(typeof(T), NcapId, TimId, ChannelId);

    lock (messages)
    {
        if (messages.TryGetValue(Key, out MessageRec Rec))
        {
            // A sufficiently fresh message is served directly from the cache.
            if (!(Rec.Message is null) && DateTime.UtcNow.Subtract(Rec.Timestamp).TotalSeconds < StaleLimitSeconds)
                return (T)Rec.Message;

            // Takes over the slot from any earlier waiter (see remarks).
            Rec.Pending = Pending;
        }
        else
        {
            messages[Key] = new MessageRec()
            {
                Message = null,
                Timestamp = DateTime.MinValue,
                Pending = Pending
            };
        }
    }

    // A null result is the timeout signal; TrySetResult is a no-op if the
    // message has already been delivered.
    _ = Task.Delay(TimeoutMilliseconds).ContinueWith(_ =>
    {
        Pending.TrySetResult(null);
    });

    Message Result = await Pending.Task;

    if (Result is null)
    {
        lock (messages)
        {
            // Unregister only if this call is still the registered waiter;
            // a newer waiter may have replaced it in the meantime.
            if (messages.TryGetValue(Key, out MessageRec Rec) && Rec.Pending == Pending)
                Rec.Pending = null;
        }

        throw new TimeoutException();
    }

    return (T)Result;
}
167
168 }
169}
Implements an in-memory cache.
Definition: Cache.cs:15
Helps connect IEEE 1451.1.6 requests and responses across MQTT topics.
static bool DataReported(Message Message, byte[] NcapId, byte[] TimId, ushort ChannelId)
Called when new data has been received.
static async Task<T> WaitForMessage<T>(int TimeoutMilliseconds, int StaleLimitSeconds, byte[] NcapId, byte[] TimId, ushort ChannelId)
Waits for a message to be received.
static bool IsZero(byte[] A)
Checks if an ID is "zero", i.e. contains only zero bytes.