3using System.Threading.Tasks;
16 private class MessageRec
19 public DateTime Timestamp;
20 public TaskCompletionSource<Message> Pending;
33 string Key = GetKey(
Message.GetType(), NcapId, TimId, ChannelId);
37 if (messages.TryGetValue(Key, out MessageRec Rec))
40 Rec.Timestamp = DateTime.UtcNow;
42 if (!(Rec.Pending is
null))
44 Rec.Pending.TrySetResult(
Message);
51 messages[Key] =
new MessageRec()
54 Timestamp = DateTime.UtcNow,
63 private static string GetKey(Type T,
byte[] NcapId,
byte[] TimId, ushort ChannelId)
65 StringBuilder sb =
new StringBuilder();
69 sb.Append(Convert.ToBase64String(NcapId));
74 sb.Append(Convert.ToBase64String(TimId));
79 sb.Append(ChannelId.ToString());
119 byte[] NcapId,
byte[] TimId, ushort ChannelId)
122 TaskCompletionSource<Message> Pending =
new TaskCompletionSource<Message>();
123 TaskCompletionSource<Message> Obsolete =
null;
124 string Key = GetKey(typeof(T), NcapId, TimId, ChannelId);
128 if (messages.TryGetValue(Key, out MessageRec Rec))
130 if (!(Rec.Message is
null) && DateTime.UtcNow.Subtract(Rec.Timestamp).TotalSeconds < StaleLimitSeconds)
131 return (T)Rec.Message;
133 Obsolete = Rec.Pending;
134 Rec.Pending = Pending;
138 messages[Key] =
new MessageRec()
141 Timestamp = DateTime.MinValue,
147 _ = Task.Delay(TimeoutMilliseconds).ContinueWith(_ =>
149 Pending.TrySetResult(
null);
152 Message Result = await Pending.Task;
158 if (messages.TryGetValue(Key, out MessageRec Rec) && Rec.Pending == Pending)
162 throw new TimeoutException();
Implements an in-memory cache.
Helps connect IEEE 1451.1.6 requests and responses across MQTT topics.
static bool DataReported(Message Message, byte[] NcapId, byte[] TimId, ushort ChannelId)
Called when new data has been received.
static async Task< T > WaitForMessage< T >(int TimeoutMilliseconds, int StaleLimitSeconds, byte[] NcapId, byte[] TimId, ushort ChannelId)
Waits for a message to be received.
static bool IsZero(byte[] A)
Checks if an ID is "zero", i.e. contains only zero bytes.