Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
Ncap.cs
1using System;
2using System.Collections.Generic;
3using System.Text;
4using System.Threading.Tasks;
15
17{
21 public abstract class Ncap : MqttData
22 {
23 private byte[] value;
24 private bool firstMessage = true;
25
29 public Ncap()
30 : base()
31 {
32 }
33
39 public Ncap(MqttTopic Topic, byte[] Value)
40 : base(Topic)
41 {
42 this.value = Value;
43 }
44
52 public async Task<DataProcessingResult> DataReported(MqttTopic Topic, MqttContent Content, byte[] Data)
53 {
54 Message Message = await Ieee1451Parser.TryParseMessage(Data, Topic is null ? null : Content.CommunicationLayer);
55 if (Message is null)
56 return this.firstMessage ? DataProcessingResult.Incompatible : DataProcessingResult.Processed;
57
58 this.firstMessage = false;
59 this.value = Data;
60 this.Timestamp = DateTime.UtcNow;
61 this.QoS = Content.Header.QualityOfService;
62 this.Retain = Content.Header.Retain;
63
64 if (Topic is null)
65 return DataProcessingResult.Processed;
66
67 return await MessageReceived(this, Topic, Message);
68 }
69
77 public static async Task<DataProcessingResult> MessageReceived(MqttData This, MqttTopic Topic, Message Message)
78 {
80
81 try
82 {
83 switch (Message.MessageType)
84 {
85 case MessageType.Reply:
86 Result = await ProcessReply(This, Topic, Message);
87 break;
88
89 case MessageType.Command:
90 await ProcessRequest(This, Message);
91 break;
92
93 case MessageType.Announcement:
94 case MessageType.Notification:
95 case MessageType.Callback:
96 default:
97 break;
98 }
99
101 }
102 catch (Exception ex)
103 {
104 await LogErrorAsync(This, string.Empty, ex.Message);
105 }
106
107 return Result;
108 }
109
110 internal static async Task<DataProcessingResult> ProcessReply(MqttData This, MqttTopic Topic, Message Message)
111 {
112 MqttTopic SubTopic;
113 bool ContainsMomentary = false;
114
116 {
117 ThingReference Ref = new ThingReference(This.Topic.Node);
118 if (TransducerAccessMessage.TryParseTransducerData(Ref, null, null, out ushort ErrorCode, out TransducerData Data))
119 {
120 await RemoveErrorAsync(This, "TransducerResponseError");
121
122 SubTopic = await This.Topic.Broker.GetTopic(Data.ChannelInfo.GetTopic(This.Topic.FullTopic), true, false);
123
124 if (ErrorCode == 0)
125 {
126 await SubTopic.Node.RemoveErrorAsync("TranducerError");
127 ContainsMomentary = true;
128 }
129 else
130 await SubTopic.Node.LogErrorAsync("TranducerError", "Transducer error: " + ErrorCode.ToString("X4"));
131 }
132 else
133 {
134 await LogErrorAsync(This, "TransducerResponseError", "Unable to parse Transducer response.");
135 return DataProcessingResult.Processed;
136 }
137 }
139 {
140 (ushort ErrorCode, Teds Teds) = await TedsAccessMessage.TryParseTeds(true);
141 if (!(Teds is null))
142 {
143 await RemoveErrorAsync(This, "TedsResponseError");
144
145 SubTopic = await This.Topic.Broker.GetTopic(Teds.ChannelInfo.GetTopic(This.Topic.FullTopic), true, false);
146
147 if (ErrorCode == 0)
148 await SubTopic.Node.RemoveErrorAsync("TedsError");
149 else
150 await SubTopic.Node.LogErrorAsync("TedsError", "TEDS error: " + ErrorCode.ToString("X4"));
151 }
152 else
153 {
154 await LogErrorAsync(This, "TedsResponseError", "Unable to parse TEDS response.");
155 return DataProcessingResult.Processed;
156 }
157 }
159 {
160 if (DiscoveryMessage.TryParseMessage(out ushort ErrorCode, out DiscoveryData Data))
161 {
162 await RemoveErrorAsync(This, "DiscoveryResponseError");
163
164 string TopicString;
165 bool Created;
166
168 {
169 case DiscoveryService.NCAPDiscovery:
170 if (Data is DiscoveryDataEntity NcapEntity)
171 {
172 TopicString = NcapEntity.Channel.GetTopic(This.Topic.FullTopic);
173
174 SubTopic = await This.Topic.Broker.GetTopic(TopicString, false, false);
175 if (Created = SubTopic is null)
176 SubTopic = await This.Topic.Broker.GetTopic(TopicString, true, false);
177
178 if (SubTopic.Node is MqttNcapTopicNode TopicNode)
179 await TopicNode.NameReceived(NcapEntity.Name);
180
181 if (ErrorCode == 0)
182 await SubTopic.Node.RemoveErrorAsync("DiscoveryError");
183 else
184 await SubTopic.Node.LogErrorAsync("DiscoveryError", "Discovery error: " + ErrorCode.ToString("X4"));
185
186 if (Created)
187 {
188 MqttBroker Broker = Topic.Broker;
189 MqttBrokerNode BrokerNode = Broker.Node;
190 StringBuilder ToSniffer = BrokerNode.HasSniffers ? new StringBuilder() : null;
191 byte[] Request = DiscoveryMessage.SerializeRequest(NcapEntity.Channel.NcapId, ToSniffer);
192
193 if (!(ToSniffer is null))
194 await BrokerNode.Information(ToSniffer.ToString());
195
196 await Broker.Publish(This.Topic.FullTopic, MqttQualityOfService.AtLeastOnce, false, Request);
197 }
198 }
199 break;
200
201 case DiscoveryService.NCAPTIMDiscovery:
202 if (Data is DiscoveryDataEntities TimEntities)
203 {
204 int i, c = Math.Min(TimEntities.Names.Length, TimEntities.Identities.Length);
205
206 for (i = 0; i < c; i++)
207 {
208 ChannelAddress TimAddress = new ChannelAddress()
209 {
210 ApplicationId = TimEntities.Channel.ApplicationId,
211 NcapId = TimEntities.Channel.NcapId,
212 TimId = TimEntities.Identities[i],
213 ChannelId = 0
214 };
215
216 TopicString = TimAddress.GetTopic(This.Topic.FullTopic);
217 SubTopic = await This.Topic.Broker.GetTopic(TopicString, false, false);
218 if (Created = SubTopic is null)
219 SubTopic = await This.Topic.Broker.GetTopic(TopicString, true, false);
220
221 if (SubTopic.Node is MqttNcapTopicNode TopicNode)
222 await TopicNode.NameReceived(TimEntities.Names[i]);
223
224 if (ErrorCode == 0)
225 await SubTopic.Node.RemoveErrorAsync("DiscoveryError");
226 else
227 await SubTopic.Node.LogErrorAsync("DiscoveryError", "Discovery error: " + ErrorCode.ToString("X4"));
228
229 if (Created)
230 {
231 MqttBroker Broker = Topic.Broker;
232 MqttBrokerNode BrokerNode = Broker.Node;
233 StringBuilder ToSniffer = BrokerNode.HasSniffers ? new StringBuilder() : null;
234 byte[] Request = DiscoveryMessage.SerializeRequest(TimEntities.Channel.NcapId, TimEntities.Identities[i], ToSniffer);
235
236 if (!(ToSniffer is null))
237 await BrokerNode.Information(ToSniffer.ToString());
238
239 await Broker.Publish(This.Topic.FullTopic, MqttQualityOfService.AtLeastOnce, false, Request);
240 }
241 }
242 }
243 break;
244
245 case DiscoveryService.NCAPTIMTransducerDiscovery:
246 if (Data is DiscoveryDataChannels Channels)
247 {
248 int i, c = Math.Min(Channels.Names.Length, Channels.Channels.Length);
249
250 for (i = 0; i < c; i++)
251 {
252 ChannelAddress TimAddress = new ChannelAddress()
253 {
254 ApplicationId = Channels.Channel.ApplicationId,
255 NcapId = Channels.Channel.NcapId,
256 TimId = Channels.Channel.TimId,
257 ChannelId = Channels.Channels[i],
258 };
259
260 TopicString = TimAddress.GetTopic(This.Topic.FullTopic);
261 SubTopic = await This.Topic.Broker.GetTopic(TopicString, true, false);
262 if (SubTopic.Node is MqttNcapTopicNode TopicNode)
263 await TopicNode.NameReceived(Channels.Names[i]);
264
265 if (ErrorCode == 0)
266 await SubTopic.Node.RemoveErrorAsync("DiscoveryError");
267 else
268 await SubTopic.Node.LogErrorAsync("DiscoveryError", "Discovery error: " + ErrorCode.ToString("X4"));
269 }
270 }
271 break;
272 }
273 }
274 else
275 await LogErrorAsync(This, "DiscoveryResponseError", "Unable to parse Discovery response.");
276
277 return DataProcessingResult.Processed;
278 }
279 else
280 return DataProcessingResult.Processed;
281
282 if (!(SubTopic?.Node is MqttNcapTopicNode NcapTopicNode))
283 return DataProcessingResult.Processed;
284
285 if (!NcapTopicNode.ResponseReceived(Topic, Message) && ContainsMomentary)
286 {
287 // TODO: Report new momentary values on node.
288 }
289
290 return DataProcessingResult.Processed;
291 }
292
293 private static async Task ProcessRequest(MqttData This, Message Message)
294 {
295 MqttTopic SubTopic;
296
298 {
300 {
301 case TransducerAccessService.SyncReadTransducerSampleDataFromAChannelOfATIM:
303 out SamplingMode SamplingMode, out double TimeoutSeconds))
304 {
305 await RemoveErrorAsync(This, "TransducerRequestError");
306
307 StringBuilder sb = new StringBuilder();
308
309 sb.Append(This.Topic.FullTopic);
310 sb.Append('/');
311 sb.Append(Hashes.BinaryToString(Address.NcapId));
312
313 if (!MessageSwitch.IsZero(Address.TimId))
314 {
315 sb.Append('/');
316 sb.Append(Hashes.BinaryToString(Address.TimId));
317
318 if (Address.ChannelId != 0)
319 {
320 sb.Append('/');
321 sb.Append(Address.ChannelId.ToString());
322 }
323 }
324
325 SubTopic = await This.Topic.Broker.GetTopic(sb.ToString(), true, false);
326
327 if (!(SubTopic?.Node is ITransducerNode TransducerNode))
328 return;
329
330 await TransducerNode.TransducerDataRequest(TransducerAccessMessage, SamplingMode, TimeoutSeconds);
331 }
332 else
333 {
334 await LogErrorAsync(This, "TransducerRequestError", "Unable to parse Transducer request.");
335 return;
336 }
337 break;
338
339 case TransducerAccessService.SyncReadTransducerBlockDataFromAChannelOfATIM:
340 case TransducerAccessService.SyncReadTransducerSampleDataFromMulitipleChannelsOfATIM:
341 case TransducerAccessService.SyncReadTransducerBlockDataFromMulitipleChannelsOfATIM:
342 case TransducerAccessService.SyncReadTransducerSampleDataFromMultipleChannelsOfMultipleTIMs:
343 case TransducerAccessService.SyncReadTransducerBlockDataFromMultipleChannelsOfMultipleTIMs:
344 case TransducerAccessService.SyncWriteTransducerSampleDataFromAChannelOfATIM:
345 case TransducerAccessService.SyncWriteTransducerBlockDataFromAChannelOfATIM:
346 case TransducerAccessService.SyncWriteTransducerSampleDataFromMulitipleChannelsOfATIM:
347 case TransducerAccessService.SyncWriteTransducerBlockDataFromMulitipleChannelsOfATIM:
348 case TransducerAccessService.SyncWriteTransducerSampleDataFromMultipleChannelsOfMultipleTIMs:
349 case TransducerAccessService.SyncWriteTransducerBlockDataFromMultipleChannelsOfMultipleTIMs:
350 case TransducerAccessService.AsyncReadTransducerBlockDataFromAChannelOfATIM:
351 case TransducerAccessService.CallbackAsyncReadTransducerBlockDataFromAChannelOfATIM:
352 case TransducerAccessService.AsyncReadTransducerStreamDataFromAChannelOfATIM:
353 case TransducerAccessService.CallbackAsyncReadTransducerStreamDataFromAChannelOfATIM:
354 case TransducerAccessService.AsyncReadTransducerBlockDataFromMultipleChannelsOfATIM:
355 case TransducerAccessService.Callback:
356 case TransducerAccessService.AsyncReadTransducerBlockDataFromMultipleChannelOfMultipleTIMs:
357 case TransducerAccessService.CallbackAsyncReadTransducerBlockDataFromMultipleChannelOfMultipleTIMs:
358 return; // TODO
359 }
360 }
362 {
364 out TedsAccessCode TedsAccessCode, out uint TedsOffset,
365 out double TimeoutSeconds))
366 {
367 await RemoveErrorAsync(This, "TedsRequestError");
368
369 StringBuilder sb = new StringBuilder();
370
371 sb.Append(This.Topic.FullTopic);
372 sb.Append('/');
373 sb.Append(Hashes.BinaryToString(Address.NcapId));
374
375 if (!MessageSwitch.IsZero(Address.TimId))
376 {
377 sb.Append('/');
378 sb.Append(Hashes.BinaryToString(Address.TimId));
379
380 if (Address.ChannelId != 0)
381 {
382 sb.Append('/');
383 sb.Append(Address.ChannelId.ToString());
384 }
385 }
386
387 SubTopic = await This.Topic.Broker.GetTopic(sb.ToString(), true, false);
388
389 if (!(SubTopic?.Node is ITedsNode TedsNode))
390 return;
391
392 await TedsNode.TedsRequest(TedsAccessMessage, TedsAccessCode, TedsOffset, TimeoutSeconds);
393 }
394 else
395 {
396 await LogErrorAsync(This, "TedsRequestError", "Unable to parse TEDS request.");
397 return;
398 }
399 }
401 {
402 if (DiscoveryMessage.TryParseMessage(out ushort _, out DiscoveryData Data))
403 {
404 await RemoveErrorAsync(This, "DiscoveryRequestError");
405
406 StringBuilder sb = new StringBuilder();
407
408 sb.Append(This.Topic.FullTopic);
409 if (!MessageSwitch.IsZero(Data.Channel.TimId))
410 {
411 sb.Append('/');
412 sb.Append(Hashes.BinaryToString(Data.Channel.NcapId));
413
414 if (!MessageSwitch.IsZero(Data.Channel.TimId))
415 {
416 sb.Append('/');
417 sb.Append(Hashes.BinaryToString(Data.Channel.TimId));
418
419 if (Data.Channel.ChannelId != 0)
420 {
421 sb.Append('/');
422 sb.Append(Data.Channel.ChannelId.ToString());
423 }
424 }
425 }
426
427 SubTopic = await This.Topic.Broker.GetTopic(sb.ToString(), true, false);
428
429 if (SubTopic?.Node is MeteringNode Node)
430 {
431 bool Broadcast = SubTopic.LocalTopic == "D0" &&
432 await Node.GetParent() is RootTopic;
433
434 LinkedList<INode> ToProcess = new LinkedList<INode>();
435 IEnumerable<INode> ChildNodes;
436
437 if (Broadcast)
438 {
439 ChildNodes = await Node.ChildNodes;
440 if (!(ChildNodes is null))
441 {
442 foreach (INode ChildNode in ChildNodes)
443 ToProcess.AddLast(ChildNode);
444 }
445 }
446 else
447 ToProcess.AddLast(Node);
448
449 while (!(ToProcess.First is null))
450 {
451 INode ChildNode = ToProcess.First.Value;
452 ToProcess.RemoveFirst();
453
454 bool CheckChildren = Broadcast;
455
456 if (ChildNode is IDiscoverableNode DiscoverableNode)
457 await DiscoverableNode.DiscoveryRequest(DiscoveryMessage);
458 else
459 CheckChildren = ChildNode is DiscoverableTopicNode;
460
461 if (CheckChildren)
462 {
463 ChildNodes = await ChildNode.ChildNodes;
464 if (!(ChildNodes is null))
465 {
466 foreach (INode ChildNode2 in ChildNodes)
467 ToProcess.AddLast(ChildNode2);
468 }
469 }
470 }
471 }
472 }
473 else
474 {
475 await LogErrorAsync(This, "DiscoveryRequestError", "Unable to parse TEDS request.");
476 return;
477 }
478 }
479 }
480
481 private static Task LogErrorAsync(MqttData This, string EventId, string Message)
482 {
483 return This.Topic?.Node?.LogErrorAsync(EventId, Message) ?? Task.CompletedTask;
484 }
485
486 private static Task RemoveErrorAsync(MqttData This, string EventId)
487 {
488 return This.Topic?.Node?.RemoveErrorAsync(EventId) ?? Task.CompletedTask;
489 }
490
494 public override Grade DefaultSupport => Grade.Perfect;
495
503 public override Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
504 {
505 List<Field> Data = new List<Field>()
506 {
507 new Int32Field(ThingReference, this.Timestamp, this.Append(Prefix, "#Bytes"),
508 this.value?.Length ?? 0, FieldType.Momentary, FieldQoS.AutomaticReadout)
509 };
510
511 if (!(this.value is null) && this.value.Length <= 256)
512 {
513 Data.Add(new StringField(ThingReference, this.Timestamp, "Raw",
514 Convert.ToBase64String(this.value), FieldType.Momentary, FieldQoS.AutomaticReadout));
515 }
516
517 Request.ReportFields(Last, Data);
518
519 return Task.CompletedTask;
520 }
521 }
522}
Information about content received from the MQTT server.
Definition: MqttContent.cs:9
ICommunicationLayer CommunicationLayer
Communication layer on which the message was received.
Definition: MqttContent.cs:50
MqttHeader Header
MQTT Header
Definition: MqttContent.cs:35
Contains methods for simple hash calculations.
Definition: Hashes.cs:59
static string BinaryToString(byte[] Data)
Converts an array of bytes to a string with their hexadecimal representations (in lower case).
Definition: Hashes.cs:65
Task LogInformationToSniffer()
Logs accumulated sniffer output to associated sniffable interface.
Definition: Binary.cs:104
string GetTopic(string BaseTopic)
Gets the topic name of the corresponding node.
static byte[] SerializeRequest(StringBuilder SnifferOutput)
Serializes an NCAP Discovery request.
bool TryParseMessage(out ushort ErrorCode, out DiscoveryData Data)
Tries to parse a Discovery message.
bool TryParseRequest(out ChannelAddress Channel, out TedsAccessCode TedsAccessCode, out uint TedsOffset, out double TimeoutSeconds)
Tries to parse a TEDS request from the message.
Task<(ushort ErrorCode, Teds Teds)> TryParseTeds()
Tries to parse a TEDS from the message.
ChannelAddress ChannelInfo
Address information.
Definition: Teds.cs:54
bool TryParseTransducerData(ThingReference Thing, Teds ChannelTeds, Unit PreferredUnit, out ushort ErrorCode, out TransducerData Data)
Tries to parse Transducer Data from the message.
TransducerAccessService TransducerAccessService
Transducer Access Service
bool TryParseRequest(out ChannelAddress Channel, out SamplingMode SamplingMode, out double TimeoutSeconds)
Tries to parse a Transfucer Access request from the message.
Abstract base class for IEEE 1451.1.6 NCAPs.
Definition: Ncap.cs:22
override Grade DefaultSupport
Default support.
Definition: Ncap.cs:494
Ncap()
Abstract base class for IEEE 1451.1.6 NCAPs.
Definition: Ncap.cs:29
override Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
Starts a readout of the data.
Definition: Ncap.cs:503
Ncap(MqttTopic Topic, byte[] Value)
Abstract base class for IEEE 1451.1.6 NCAPs.
Definition: Ncap.cs:39
async Task< DataProcessingResult > DataReported(MqttTopic Topic, MqttContent Content, byte[] Data)
Called when new data has been published.
Definition: Ncap.cs:52
static async Task< DataProcessingResult > MessageReceived(MqttData This, MqttTopic Topic, Message Message)
Processes an IEEE 1451.0 message.
Definition: Ncap.cs:77
Static class for IEEE 1451-related parsing tasks.
static Task< Message > TryParseMessage(byte[] Data)
Tries to parse an IEEE 1451.0-encoded data.
Base class for all metering nodes.
Definition: MeteringNode.cs:28
Abstract base class for MQTT data encapsulations.
Definition: MqttData.cs:18
DateTime Timestamp
Timestamp of data reception.
Definition: MqttData.cs:39
string Append(string Prefix, string Name)
Appends a name to a topic name.
Definition: MqttData.cs:100
MQTT Broker connection object.
Definition: MqttBroker.cs:17
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes binary data to a topic.
Definition: MqttBroker.cs:147
MqttBrokerNode Node
Reference to broker node.
Definition: MqttBroker.cs:63
async Task< MqttTopic > GetTopic(string TopicString, bool CreateNew, bool IgnoreGuids)
Gets the Node responsible for managing a Topic
Definition: MqttBroker.cs:309
MQTT Topic information.
Definition: MqttTopic.cs:19
MqttBroker Broker
MQTT Broker
Definition: MqttTopic.cs:56
string FullTopic
Full topic name
Definition: MqttTopic.cs:66
IMqttTopicNode Node
Reference to the MQTT Topic Node
Definition: MqttTopic.cs:51
Node representing a connection to an MQTT broker.
async Task Information(string Comment)
Called to inform the viewer of something.
Represents a 32-bit integer value.
Definition: Int32Field.cs:10
Represents a string value.
Definition: StringField.cs:10
Contains a reference to a thing
Interface for nodes that are published through the concentrator interface.
Definition: INode.cs:49
Task< IEnumerable< INode > > ChildNodes
Child nodes. If no child nodes are available, null is returned.
Definition: INode.cs:140
Interface for classes managing sensor data readouts.
Task ReportFields(bool Done, params Field[] Fields)
Report read fields to the client.
Interface for nodes that can be discovered on an IEEE 1451.0 network.
Interface for nodes that can return TEDS.
Definition: ITedsNode.cs:10
Interface for nodes that can return transducer information.
Task< bool > RemoveErrorAsync()
Removes error messages with an empty event ID from the node.
Task LogErrorAsync(string Body)
Logs an error message on the node.
MqttQualityOfService
MQTT Quality of Service level.
Grade
Grade enumeration
Definition: Grade.cs:7
MessageType
Network Service Message Type
Definition: MessageType.cs:7
TransducerAccessService
Transducer access service
DataProcessingResult
Results from processing an incoming message.
Definition: IMqttData.cs:17
FieldQoS
Field Quality of Service flags
Definition: FieldQoS.cs:10
FieldType
Field Type flags
Definition: FieldType.cs:10
MqttQualityOfService QualityOfService
Quality of Service level.
Definition: MqttHeader.cs:16