Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MessageData.cs
1using System;
2using System.Collections.Generic;
3using System.Text;
4using System.Threading.Tasks;
10using Waher.Security;
16
18{
/// <summary>
/// How the binary message is represented in the MQTT payload.
/// </summary>
public enum DataMode
{
    /// <summary>
    /// The payload is used as-is, as binary data.
    /// </summary>
    Binary = 0,

    /// <summary>
    /// The payload is a Base64-encoded string.
    /// </summary>
    Base64 = 1,

    /// <summary>
    /// The payload is a hexadecimal string.
    /// </summary>
    Hex = 2
}
39
43 public class MessageData : MqttData
44 {
45 private readonly Dictionary<TedsAccessCode, KeyValuePair<DateTime, Teds>> teds = new Dictionary<TedsAccessCode, KeyValuePair<DateTime, Teds>>();
46 private readonly byte[] ncapId;
47 private readonly byte[] timId;
48 private readonly ushort channelId;
49 private readonly string communicationTopic;
50 private DataMode? dataMode = null;
51
/// <summary>
/// Encapsulates messages from an IEEE1451.1.6 device.
/// Creates an instance without assigning any NCAP, TIM or channel identifiers.
/// </summary>
public MessageData()
{
    // The parameterless base constructor is invoked implicitly.
}
59
/// <summary>
/// Encapsulates messages from an IEEE1451.1.6 device.
/// </summary>
/// <param name="Topic">MQTT topic, passed on to the base class.</param>
/// <param name="NcapId">NCAP ID</param>
/// <param name="TimId">TIM ID (can be null, if addressing the NCAP)</param>
/// <param name="ChannelId">Channel ID (can be 0, if addressing the TIM, or NCAP)</param>
public MessageData(MqttTopic Topic, byte[] NcapId, byte[] TimId, ushort ChannelId)
    : base(Topic)
{
    ncapId = NcapId;
    timId = TimId;
    channelId = ChannelId;

    // Must be evaluated last: the evaluation reads the TIM and channel identifiers set above.
    communicationTopic = EvaluateCommunicationTopic();
}
76
86 : this(Topic, NcapId, TimId, ChannelId)
87 {
89 }
90
/// <summary>
/// NCAP ID
/// </summary>
public byte[] NcapId
{
    get { return this.ncapId; }
}
95
/// <summary>
/// TIM ID (can be null, if addressing the NCAP)
/// </summary>
public byte[] TimId
{
    get { return this.timId; }
}
100
/// <summary>
/// Channel ID (can be 0, if addressing the TIM, or NCAP)
/// </summary>
public int ChannelId
{
    // The value is stored as an unsigned 16-bit integer and widened to int here.
    get { return this.channelId; }
}
105
112 {
113 return MessageSwitch.DataReported(Message, this.ncapId, this.timId, this.channelId);
114 }
115
/// <summary>
/// Derives the communication topic from the full topic name by removing trailing
/// topic segments (the text following the last '/' character):
/// one segment if the TIM ID is zero, two segments if the TIM ID is non-zero and the
/// channel ID is zero, and three segments if the TIM ID is non-zero and the channel ID
/// is positive.
/// </summary>
/// <returns>
/// The shortened topic. If the full topic is null or empty it is returned unchanged.
/// If the topic runs out of '/' separators, whatever remains at that point is returned.
/// </returns>
private string EvaluateCommunicationTopic()
{
    string Result = this.Topic?.FullTopic;
    if (string.IsNullOrEmpty(Result))
        return Result;

    // One segment is always removed; a non-zero TIM ID adds one more,
    // and a positive channel ID (only considered together with a TIM ID) yet another.
    int SegmentsToRemove = 1;

    if (!MessageSwitch.IsZero(this.timId))
        SegmentsToRemove += this.channelId > 0 ? 2 : 1;

    while (SegmentsToRemove-- > 0)
    {
        int Slash = Result.LastIndexOf('/');
        if (Slash < 0)
            break;

        Result = Result.Substring(0, Slash);
    }

    return Result;
}
148
/// <summary>
/// Default support.
/// </summary>
public override Grade DefaultSupport
{
    get { return Grade.NotAtAll; }
}
153
161 {
162 return null;
163 }
164
171 public override async Task<DataProcessingResult> DataReported(MqttTopic Topic, MqttContent Content)
172 {
173 byte[] Data;
174
175 if (!this.dataMode.HasValue)
176 {
177 string s = Content.DataString;
178
179 if (HexStringData.RegEx.IsMatch(s))
180 this.dataMode = DataMode.Hex;
181 else if (Base64Data.RegEx.IsMatch(s))
182 this.dataMode = DataMode.Base64;
183 else
184 this.dataMode = DataMode.Binary;
185 }
186
187 switch (this.dataMode.Value)
188 {
189 case DataMode.Binary:
190 default:
191 Data = Content.Data;
192 break;
193
194 case DataMode.Base64:
195 try
196 {
197 Data = Convert.FromBase64String(Content.DataString);
198 }
199 catch (Exception)
200 {
201 return DataProcessingResult.Processed;
202 }
203 break;
204
205 case DataMode.Hex:
206 try
207 {
208 Data = Hashes.StringToBinary(Content.DataString);
209 }
210 catch (Exception)
211 {
212 return DataProcessingResult.Processed;
213 }
214 break;
215 }
216
218 if (Message is null)
219 return DataProcessingResult.Processed;
220
221 return await Ncap.MessageReceived(this, Topic, Message);
222 }
223
/// <summary>
/// Type name representing data.
/// </summary>
/// <param name="Language">Language from which the localized string is fetched.</param>
/// <returns>Task resolving to the type name (string ID 15 of <see cref="RootTopic"/>).</returns>
public override Task<string> GetTypeName(Language Language) =>
    Language.GetStringAsync(typeof(RootTopic), 15, "IEEE 1451.1.6 message data");
231
/// <summary>
/// Outputs the parsed data to the sniffer.
/// </summary>
/// <param name="Output">Communication layer receiving the sniffer output.</param>
public override void SnifferOutput(ICommunicationLayer Output) =>
    this.Information(Output, "Transducer data");
239
/// <summary>
/// Requests transducer data from an NCAP.
/// </summary>
/// <param name="SamplingMode">Sampling mode included in the serialized request.</param>
/// <param name="TimeoutMilliseconds">Timeout, in milliseconds. Converted to seconds
/// for the serialized request, and passed on as-is to the message switch.</param>
/// <param name="StaleLimitSeconds">Stale limit, in seconds, passed on to the message switch.
/// NOTE(review): presumably the maximum age of an already received message that may be
/// returned instead of a fresh one; confirm against MessageSwitch.WaitForMessage.</param>
/// <returns>The transducer access message delivered by the message switch.</returns>
public async Task<TransducerAccessMessage> RequestTransducerData(SamplingMode SamplingMode,
    int TimeoutMilliseconds, int StaleLimitSeconds)
{
    MqttBroker Connection = this.Topic.Broker;
    MqttBrokerNode Node = Connection.Node;

    // Sniffer text is only collected if someone is listening.
    StringBuilder SnifferText = null;
    if (Node.HasSniffers)
        SnifferText = new StringBuilder();

    double TimeoutSeconds = TimeoutMilliseconds * 1e-3;
    byte[] Payload = TransducerAccessMessage.SerializeRequest(this.ncapId, this.timId,
        this.channelId, SamplingMode, TimeoutSeconds, SnifferText);

    // Start waiting before publishing, so that the wait is in place when the request goes out.
    Task<TransducerAccessMessage> Pending = MessageSwitch.WaitForMessage<TransducerAccessMessage>(
        TimeoutMilliseconds, StaleLimitSeconds, this.ncapId, this.timId, this.channelId);

    // A message is already available: no need to publish the request.
    if (Pending.IsCompleted)
        return await Pending;

    if (SnifferText != null)
        await Node.Information(SnifferText.ToString());

    await Connection.Publish(this.communicationTopic, MqttQualityOfService.AtLeastOnce, false, Payload);

    return await Pending;
}
274
/// <summary>
/// Requests TEDS from an NCAP.
/// </summary>
/// <param name="TedsAccessCode">Access code identifying which TEDS to request.</param>
/// <param name="TimeoutMilliseconds">Timeout, in milliseconds. Converted to seconds
/// for the serialized request, and passed on as-is to the message switch.</param>
/// <param name="StaleLimitSeconds">Stale limit, in seconds, passed on to the message switch.
/// NOTE(review): presumably the maximum age of an already received message that may be
/// returned instead of a fresh one; confirm against MessageSwitch.WaitForMessage.</param>
/// <returns>The TEDS access message delivered by the message switch.</returns>
public async Task<TedsAccessMessage> RequestTEDS(TedsAccessCode TedsAccessCode,
    int TimeoutMilliseconds, int StaleLimitSeconds)
{
    MqttBroker Connection = this.Topic.Broker;
    MqttBrokerNode Node = Connection.Node;

    // Sniffer text is only collected if someone is listening.
    StringBuilder SnifferText = null;
    if (Node.HasSniffers)
        SnifferText = new StringBuilder();

    // The TEDS is always requested from offset 0.
    double TimeoutSeconds = TimeoutMilliseconds * 1e-3;
    byte[] Payload = TedsAccessMessage.SerializeRequest(this.ncapId, this.timId,
        this.channelId, TedsAccessCode, 0, TimeoutSeconds, SnifferText);

    // Start waiting before publishing, so that the wait is in place when the request goes out.
    Task<TedsAccessMessage> Pending = MessageSwitch.WaitForMessage<TedsAccessMessage>(
        TimeoutMilliseconds, StaleLimitSeconds, this.ncapId, this.timId, this.channelId);

    // A message is already available: no need to publish the request.
    if (Pending.IsCompleted)
        return await Pending;

    if (SnifferText != null)
        await Node.Information(SnifferText.ToString());

    await Connection.Publish(this.communicationTopic, MqttQualityOfService.AtLeastOnce, false, Payload);

    return await Pending;
}
309
/// <summary>
/// Gets a TEDS of the given kind. A locally cached copy is returned if it is younger than
/// the configured refresh interval; otherwise the TEDS is requested anew and, if it parses
/// without error code, stored in the cache.
/// </summary>
/// <param name="Code">Access code identifying which TEDS to get.</param>
/// <param name="ThingReference">Thing reference used when reporting errors.</param>
/// <param name="Request">Readout request to which errors are reported.</param>
/// <returns>
/// The TEDS and the timestamp at which it was requested. If the TEDS could not be parsed,
/// or a non-zero error code was returned, an error is reported on <paramref name="Request"/>
/// and (null, <see cref="DateTime.MinValue"/>) is returned.
/// </returns>
private async Task<(Teds, DateTime)> GetTeds(TedsAccessCode Code, ThingReference ThingReference, ISensorReadout Request)
{
    DateTime Now = DateTime.UtcNow;

    this.GetTimeouts(out int TimeoutMilliseconds, out _, out int RefreshTedsHours, out _);

    lock (this.teds)
    {
        if (this.teds.TryGetValue(Code, out KeyValuePair<DateTime, Teds> Cached) &&
            (Now - Cached.Key).TotalHours < RefreshTedsHours)
        {
            return (Cached.Value, Cached.Key);
        }
    }

    // Do not use cached message. Force readout.
    TedsAccessMessage Response = await this.RequestTEDS(Code, TimeoutMilliseconds, 0);

    (ushort ErrorCode, Teds Parsed) = await Response.TryParseTeds();

    if (Parsed is null)
    {
        await Request.ReportErrors(true, new ThingError(ThingReference, "Unable to parse transducer data."));
        return (null, DateTime.MinValue);
    }

    if (ErrorCode != 0)
    {
        // Note: reported without marking the readout as done.
        await Request.ReportErrors(false, new ThingError(ThingReference, "Transducer Error code: " + ErrorCode.ToString("X4")));
        return (null, DateTime.MinValue);
    }

    lock (this.teds)
    {
        this.teds[Code] = new KeyValuePair<DateTime, Teds>(Now, Parsed);
    }

    return (Parsed, Now);
}
344
/// <summary>
/// Gets timing settings and preferred unit for the topic. Values are taken from the topic
/// node if it is an <c>MqttNcapTopicNode</c>; otherwise defaults are used.
/// </summary>
/// <param name="TimeoutMilliseconds">Timeout, in milliseconds (default 10000).</param>
/// <param name="StaleSeconds">Stale limit, in seconds (default 60).</param>
/// <param name="RefreshTedsHours">TEDS refresh interval, in hours (default 24).</param>
/// <param name="PreferredUnit">Preferred unit, if the node is an <c>MqttChannelTopicNode</c>
/// with a non-empty preferred unit string that can be parsed; otherwise null.</param>
private void GetTimeouts(out int TimeoutMilliseconds, out int StaleSeconds, out int RefreshTedsHours, out Unit PreferredUnit)
{
    // Start from the defaults, and override with node settings if available.
    TimeoutMilliseconds = 10000;
    StaleSeconds = 60;
    RefreshTedsHours = 24;
    PreferredUnit = null;

    if (!(this.Topic.Node is MqttNcapTopicNode NcapNode))
        return;

    TimeoutMilliseconds = NcapNode.TimeoutMilliseconds;
    StaleSeconds = NcapNode.StaleSeconds;
    RefreshTedsHours = NcapNode.RefreshTedsHours;

    // Only channel nodes carry a preferred unit; an unparsable unit string leaves null.
    if (NcapNode is MqttChannelTopicNode ChannelNode &&
        !string.IsNullOrEmpty(ChannelNode.PreferredUnit) &&
        Unit.TryParse(ChannelNode.PreferredUnit, out Unit Parsed))
    {
        PreferredUnit = Parsed;
    }
}
368
/// <summary>
/// Starts a readout of the data.
/// For a channel (positive channel ID), the channel TEDS is fetched; if identity or status
/// fields are requested, the TEDS fields and any additional Meta-TEDS fields are reported;
/// finally transducer data is requested and reported. For TIM and NCAP nodes, nothing is
/// currently read out.
/// </summary>
/// <param name="ThingReference">Thing reference to use in reported fields and errors.</param>
/// <param name="Request">Readout request to which fields and errors are reported.</param>
/// <param name="Prefix">Field name prefix (not used by this implementation).</param>
/// <param name="Last">If this is the last readout (not used by this implementation).</param>
public override async Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
{
    try
    {
        this.GetTimeouts(out int TimeoutMilliseconds, out int StaleSeconds, out _, out Unit PreferredUnit);

        if (this.channelId > 0) // Channel
        {
            (Teds Teds, DateTime TedsTimestamp) = await this.GetTeds(TedsAccessCode.ChanTEDS, ThingReference, Request);

            if (Request.IsIncluded(FieldType.Identity) || Request.IsIncluded(FieldType.Status))
            {
                // GetTeds returns a null TEDS on failure. Terminate the readout with an
                // explicit error instead of letting a NullReferenceException do it.
                if (Teds is null)
                {
                    await Request.ReportErrors(true, new ThingError(ThingReference, "Unable to read channel TEDS."));
                    return;
                }

                Field[] Fields = Teds.GetFields(ThingReference, TedsTimestamp);
                await Request.ReportFields(false, Fields);

                (Teds MetaTeds, DateTime MetaTedsTimestamp) = await this.GetTeds(TedsAccessCode.MetaTEDS, ThingReference, Request);

                if (MetaTeds is null)
                {
                    await Request.ReportErrors(true, new ThingError(ThingReference, "Unable to read Meta-TEDS."));
                    return;
                }

                // Only report Meta-TEDS fields not already reported from the channel TEDS.
                Field[] Fields2 = MetaTeds.GetFields(ThingReference, MetaTedsTimestamp);
                Field[] Fields3 = RemoveDuplicates(Fields2, Fields);

                if (!(Fields3 is null))
                    await Request.ReportFields(false, Fields3);
            }

            TransducerAccessMessage TransducerMessage = await this.RequestTransducerData(SamplingMode.Immediate, TimeoutMilliseconds, StaleSeconds);

            // NOTE(review): Teds may be null here if the channel TEDS could not be read and no
            // identity/status fields were requested; how TryParseTransducerData treats a null
            // channel TEDS is not visible from this file - confirm.
            if (TransducerMessage.TryParseTransducerData(ThingReference, Teds, PreferredUnit, out ushort ErrorCode, out TransducerData TransducerData))
            {
                if (ErrorCode != 0)
                    await Request.ReportErrors(false, new ThingError(ThingReference, "Transducer Error code: " + ErrorCode.ToString("X4")));

                await Request.ReportFields(true, TransducerData.Fields);
            }
            else
                await Request.ReportErrors(true, new ThingError(ThingReference, "Unable to parse transducer data."));
        }
        else if (!MessageSwitch.IsZero(this.timId)) // TIM
        {
            // Nothing is read out for TIM nodes.
        }
        else // NCAP
        {
            // Nothing is read out for NCAP nodes.
        }
    }
    catch (Exception ex)
    {
        await Request.ReportErrors(true, new ThingError(ThingReference, ex.Message));
    }
}
423
/// <summary>
/// Removes fields that have already been reported, i.e. fields for which a field with the
/// same name and an equal value exists in <paramref name="AlreadyReported"/>.
/// </summary>
/// <param name="Fields">Candidate fields.</param>
/// <param name="AlreadyReported">Fields already reported. If several share a name,
/// the last one is used for comparison.</param>
/// <returns>
/// The fields from <paramref name="Fields"/> that are not duplicates, in their original
/// order, or null if no fields remain.
/// </returns>
private static Field[] RemoveDuplicates(Field[] Fields, Field[] AlreadyReported)
{
    Dictionary<string, Field> ByName = new Dictionary<string, Field>();

    foreach (Field F in AlreadyReported)
        ByName[F.Name] = F;

    // Allocated lazily, so that null can be returned when nothing remains.
    List<Field> Result = null;

    foreach (Field F in Fields)
    {
        // The static Equals is null-safe: a field whose value is null no longer throws,
        // and two null values are considered equal.
        if (ByName.TryGetValue(F.Name, out Field F2) && object.Equals(F.ObjectValue, F2.ObjectValue))
            continue;

        if (Result is null)
            Result = new List<Field>();

        Result.Add(F);
    }

    return Result?.ToArray();
}
446 }
447}
Information about content received from the MQTT server.
Definition: MqttContent.cs:9
ICommunicationLayer CommunicationLayer
Communication layer on which the message was received.
Definition: MqttContent.cs:50
string DataString
String representation of UTF-8 encoded binary data.
Definition: MqttContent.cs:56
Contains information about a language.
Definition: Language.cs:17
Task< string > GetStringAsync(Type Type, int Id, string Default)
Gets the string value of a string ID. If no such string exists, a string is created with the default ...
Definition: Language.cs:209
Represents a unit.
Definition: Unit.cs:15
static bool TryParse(string UnitString, out Unit Unit)
Tries to parse a string into a unit.
Definition: Unit.cs:137
Contains methods for simple hash calculations.
Definition: Hashes.cs:59
static byte[] StringToBinary(string s)
Parses a hex string.
Definition: Hashes.cs:102
static byte[] SerializeRequest(byte[] NcapId, byte[] TimId, ushort ChannelId, TedsAccessCode TedsAccessCode, uint TedsOffset, double TimeoutSeconds, StringBuilder SnifferOutput)
Serializes a request for TEDS.
Task<(ushort ErrorCode, Teds Teds)> TryParseTeds()
Tries to parse a TEDS from the message.
Field[] GetFields(ThingReference Thing, DateTime Timestamp)
Gets the information in the record, as an array of fields.
Definition: Teds.cs:77
bool TryParseTransducerData(ThingReference Thing, Teds ChannelTeds, Unit PreferredUnit, out ushort ErrorCode, out TransducerData Data)
Tries to parse Transducer Data from the message.
static byte[] SerializeRequest(byte[] NcapId, byte[] TimId, ushort ChannelId, SamplingMode SamplingMode, double TimeoutSeconds, StringBuilder SnifferOutput)
Serializes a request for transducer data.
Encapsulates messages from an IEEE1451.1.6 device.
Definition: MessageData.cs:44
async Task< TransducerAccessMessage > RequestTransducerData(SamplingMode SamplingMode, int TimeoutMilliseconds, int StaleLimitSeconds)
Requests transducer data from an NCAP.
Definition: MessageData.cs:251
override Task< string > GetTypeName(Language Language)
Type name representing data.
Definition: MessageData.cs:227
override Grade DefaultSupport
Default support.
Definition: MessageData.cs:152
async Task< TedsAccessMessage > RequestTEDS(TedsAccessCode TedsAccessCode, int TimeoutMilliseconds, int StaleLimitSeconds)
Requests TEDS from an NCAP.
Definition: MessageData.cs:286
byte[] TimId
TIM ID (can be null, if addressing the NCAP)
Definition: MessageData.cs:99
override async Task< DataProcessingResult > DataReported(MqttTopic Topic, MqttContent Content)
Called when new data has been published.
Definition: MessageData.cs:171
MessageData(MqttTopic Topic, Message Message, byte[] NcapId, byte[] TimId, ushort ChannelId)
Encapsulates messages from an IEEE1451.1.6 device.
Definition: MessageData.cs:85
bool DataReported(Message Message)
Called when new data has been received.
Definition: MessageData.cs:111
int ChannelId
Channel ID (can be 0, if addressing the TIM, or NCAP)
Definition: MessageData.cs:104
MessageData()
Encapsulates messages from an IEEE1451.1.6 device.
Definition: MessageData.cs:55
override async Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
Starts a readout of the data.
Definition: MessageData.cs:376
override IMqttData CreateNew(MqttTopic Topic, MqttContent Content)
Creates a new instance of the data.
Definition: MessageData.cs:160
MessageData(MqttTopic Topic, byte[] NcapId, byte[] TimId, ushort ChannelId)
Encapsulates messages from an IEEE1451.1.6 device.
Definition: MessageData.cs:67
override void SnifferOutput(ICommunicationLayer Output)
Outputs the parsed data to the sniffer.
Definition: MessageData.cs:235
Helps connect IEEE 1451.1.6 requests and responses across MQTT topics.
static bool DataReported(Message Message, byte[] NcapId, byte[] TimId, ushort ChannelId)
Called when new data has been received.
static bool IsZero(byte[] A)
Checks if an ID is "zero", i.e. contains only zero bytes.
Abstract base class for IEEE 1451.1.6 NCAPs.
Definition: Ncap.cs:22
static async Task< DataProcessingResult > MessageReceived(MqttData This, MqttTopic Topic, Message Message)
Processes an IEEE 1451.0 message.
Definition: Ncap.cs:77
IEEE 1451.1.6 root topic node
Definition: RootTopic.cs:13
Static class for IEEE 1451-related parsing tasks.
static Task< Message > TryParseMessage(byte[] Data)
Tries to parse IEEE 1451.0-encoded data.
Represents an MQTT topic with base64-encoded binary data.
Definition: Base64Data.cs:19
static readonly Regex RegEx
Parsed regular expression for BASE64-encoded data.
Definition: Base64Data.cs:28
Represents an MQTT topic with binary data encoded as hexadecimal strings.
static readonly Regex RegEx
Parsed regular expression for hexadecimal string data.
Abstract base class for MQTT data encapsulations.
Definition: MqttData.cs:18
void Information(ICommunicationLayer Output, string Info)
Outputs information to sniffer.
Definition: MqttData.cs:91
MQTT Broker connection object.
Definition: MqttBroker.cs:17
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes binary data to a topic.
Definition: MqttBroker.cs:147
MqttBrokerNode Node
Reference to broker node.
Definition: MqttBroker.cs:63
MQTT Topic information.
Definition: MqttTopic.cs:19
MqttBroker Broker
MQTT Broker
Definition: MqttTopic.cs:56
string FullTopic
Full topic name
Definition: MqttTopic.cs:66
IMqttTopicNode Node
Reference to the MQTT Topic Node
Definition: MqttTopic.cs:51
Node representing a connection to an MQTT broker.
async Task Information(string Comment)
Called to inform the viewer of something.
Base class for all sensor data fields.
Definition: Field.cs:20
Contains information about an error on a thing
Definition: ThingError.cs:10
Contains a reference to a thing
override string ToString()
Interface for observable classes implementing communication protocols.
Interface for classes managing sensor data readouts.
bool IsIncluded(string FieldName)
Checks if a field with the given parameters is included in the readout.
Task ReportErrors(bool Done, params ThingError[] Errors)
Report error states to the client.
Task ReportFields(bool Done, params Field[] Fields)
Report read fields to the client.
Interface for MQTT Data encapsulations
Definition: IMqttData.cs:38
MqttQualityOfService
MQTT Quality of Service level.
Grade
Grade enumeration
Definition: Grade.cs:7
Prefix
SI prefixes. http://physics.nist.gov/cuu/Units/prefixes.html
Definition: Prefixes.cs:11
DataProcessingResult
Results from processing an incoming message.
Definition: IMqttData.cs:17
FieldType
Field Type flags
Definition: FieldType.cs:10