Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqttTopic.cs
1using System;
2using System.Collections.Generic;
3using System.Threading.Tasks;
12
14{
18 public class MqttTopic
19 {
20 private readonly SortedDictionary<string, MqttTopic> topics = new SortedDictionary<string, MqttTopic>();
21 private readonly IMqttTopicNode node;
22 private readonly ThingReference nodeReference;
23 private readonly MqttTopic parent;
24 private readonly MqttBroker broker;
25 private readonly string localTopic;
26 private readonly string fullTopic;
27 private long dataCount = 0;
28 private Exception ex = null;
29 private DateTime exTP = DateTime.MinValue;
30 private IMqttData data = null;
31
36 {
37 this.node = Node;
38 this.fullTopic = FullTopic;
39 this.localTopic = LocalTopic;
40 this.parent = Parent;
41 this.broker = Broker;
42
43 this.nodeReference = Node as ThingReference;
44 if (this.nodeReference is null && !(Node is null))
45 this.nodeReference = new ThingReference(Node.NodeId, Node.SourceId, Node.Partition);
46 }
47
51 public IMqttTopicNode Node => this.node;
52
56 public MqttBroker Broker => this.broker;
57
61 public string LocalTopic => this.localTopic;
62
66 public string FullTopic => this.fullTopic;
67
71 public IMqttData Data => this.data;
72
73 private MqttTopic[] GetChildNodes()
74 {
75 if (this.topics is null)
76 return new MqttTopic[0];
77 else
78 {
79 MqttTopic[] Result;
80
81 lock (this.topics)
82 {
83 Result = new MqttTopic[this.topics.Count];
84 this.topics.Values.CopyTo(Result, 0);
85 }
86
87 return Result;
88 }
89 }
90
91 internal async Task<MqttTopic> GetTopic(MqttTopicRepresentation Representation, bool CreateNew, bool IgnoreGuids, MqttBroker Broker)
92 {
93 MqttTopic Topic = await this.GetLocalTopic(Representation, CreateNew, IgnoreGuids, Broker);
94
95 if (Topic is null)
96 return null;
97 else if (Representation.MoveNext(Topic))
98 return await Topic.GetTopic(Representation, CreateNew, IgnoreGuids, Broker);
99 else
100 return Topic;
101 }
102
103 private async Task<MqttTopic> GetLocalTopic(MqttTopicRepresentation Representation, bool CreateNew, bool IgnoreGuids, MqttBroker Broker)
104 {
105 string CurrentSegment = Representation.CurrentSegment;
106 MqttTopic Topic, Topic2;
107
108 lock (this.topics)
109 {
110 if (this.topics.TryGetValue(CurrentSegment, out Topic))
111 return Topic;
112 }
113
114 if (IgnoreGuids && Guid.TryParse(CurrentSegment.Replace('_', '-'), out Guid _))
115 return null;
116
117 if (this.node.HasChildren)
118 {
119 foreach (INode Child in await this.node.ChildNodes)
120 {
121 if (Child is IMqttTopicNode TopicNode && TopicNode.LocalTopic == CurrentSegment)
122 {
123 lock (this.topics)
124 {
125 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
126 return Topic2;
127 else
128 {
129 Topic = new MqttTopic(TopicNode, Representation.ProcessedSegments, CurrentSegment, null, Broker);
130 this.topics[CurrentSegment] = Topic;
131 return Topic;
132 }
133 }
134 }
135 }
136 }
137
138 if (!CreateNew)
139 return null;
140
141 IMqttTopicNode AddNode = Types.FindBest<IMqttTopicNode, MqttTopicRepresentation>(Representation);
142 if (AddNode is null)
143 return null;
144
145 AddNode = await AddNode.CreateNew(Representation);
146 Topic = new MqttTopic(AddNode, Representation.ProcessedSegments, AddNode.LocalTopic, null, Broker);
147
148 lock (this.topics)
149 {
150 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
151 return Topic2;
152 else
153 this.topics[CurrentSegment] = Topic;
154 }
155
156 await this.node.AddAsync(AddNode);
157
158 return Topic;
159 }
160
166 public void SetData<T>(T Data)
167 where T : IMqttData
168 {
169 this.data = Data;
170 }
171
176 public async Task DataReported(MqttContent Content)
177 {
178 int Len = Content.Data.Length;
179 if (Len == 0)
180 {
181 this.data = null;
182 return;
183 }
184
185 this.dataCount += Len;
186
187 bool NewMomentaryValues;
188
189 try
190 {
191 if (this.data is null)
192 this.data = this.FindDataType(Content).CreateNew(this, Content);
193
194 switch (await this.data.DataReported(this, Content))
195 {
196 case DataProcessingResult.Incompatible:
197 default:
198 this.data = null;
199 this.data = this.FindDataType(Content).CreateNew(this, Content);
200 NewMomentaryValues = false;
201 break;
202
203 case DataProcessingResult.Processed:
204 NewMomentaryValues = false;
205 break;
206
207 case DataProcessingResult.ProcessedNewMomentaryValues:
208 NewMomentaryValues = true;
209 break;
210 }
211
212 await this.SetOk();
213 }
214 catch (Exception)
215 {
216 this.data = this.FindDataType(Content).CreateNew(this, Content);
217 NewMomentaryValues = false;
218 }
219
220 if (this.broker.Client?.HasSniffers ?? false)
221 this.data.SnifferOutput(this.broker.Client);
222
223 if (NewMomentaryValues)
224 {
225 try
226 {
227 InternalReadoutRequest Request = new InternalReadoutRequest(string.Empty,
228 new IThingReference[] { this.node }, FieldType.Momentary, null, DateTime.MinValue, DateTime.MaxValue,
229 (Sender, e) =>
230 {
231 this.node.NewMomentaryValues(e.Fields);
232
233 MqttTopic Current = this;
234 MqttTopic Parent = this.parent;
235
236 while (!(Parent is null))
237 {
238 foreach (Field F in e.Fields)
239 {
240 if (F.Name == "Value")
241 F.Name = Current.localTopic;
242 else
243 F.Name = Current.localTopic + ", " + F.Name;
244
245 Parent.node.NewMomentaryValues(F);
246 }
247
248 Current = Parent;
249 Parent = Parent.parent;
250 }
251
252 return Task.CompletedTask;
253 },
254 (Sender, e) =>
255 {
256 return Task.CompletedTask;
257 }, null);
258
259 await this.StartReadout(Request, true);
260 }
261 catch (Exception ex)
262 {
263 await this.Exception(ex);
264 }
265 }
266 }
267
274 {
275 try
276 {
277 IMqttData Data = Types.FindBest<IMqttData, MqttContent>(Content);
278 if (!(Data is null))
279 return Data;
280
281 return new BinaryData(this, Content.Data);
282 }
283 catch (Exception)
284 {
285 return new BinaryData(this, Content.Data);
286 }
287 }
288
289 private Task SetOk()
290 {
291 this.ex = null;
292 this.exTP = DateTime.MinValue;
293
294 return this.node.RemoveErrorAsync("Error");
295 }
296
297 private Task Exception(Exception ex)
298 {
299 this.ex = ex;
300 this.exTP = DateTime.UtcNow;
301
302 return this.node.LogErrorAsync("Error", ex.Message);
303 }
304
308 public override string ToString()
309 {
310 return this.fullTopic;
311 }
312
319 public Task StartReadout(ISensorReadout Request, bool DoneAfter)
320 {
321 return this.StartReadout(this.nodeReference, Request, string.Empty, DoneAfter);
322 }
323
327 public async Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
328 {
329 try
330 {
331 MqttTopic[] ChildNodes = this.GetChildNodes();
332
333 if (!(ChildNodes is null) && ChildNodes.Length > 0)
334 {
335 foreach (MqttTopic ChildTopic in ChildNodes)
336 {
337 await ChildTopic.StartReadout(ThingReference, Request,
338 string.IsNullOrEmpty(Prefix) ? ChildTopic.LocalTopic : Prefix + ", " + ChildTopic.LocalTopic, false);
339 }
340 }
341
342 if (!(this.ex is null))
343 await Request.ReportErrors(Last, new ThingError(ThingReference, this.exTP, this.ex.Message));
344 else if (this.data is null)
345 {
346 this.data = await this.node.GetDefaultDataObject();
347
348 if (this.data is null)
349 {
350 if (Last)
351 await Request.ReportFields(true);
352 }
353 else
354 await this.data.StartReadout(ThingReference, Request, Prefix, Last);
355 }
356 else
357 await this.data.StartReadout(ThingReference, Request, Prefix, Last);
358
359 await this.node.RemoveErrorAsync("Readout");
360 }
361 catch (Exception ex)
362 {
363 await Request.ReportErrors(Last, new ThingError(ThingReference, DateTime.UtcNow, ex.Message));
364 await this.node.LogErrorAsync("Readout", ex.Message);
365 }
366 }
367
372 {
373 if (this.data is null || !this.data.IsControllable)
374 return new ControlParameter[0];
375 else
376 return this.data.GetControlParameters();
377 }
378
382 public async Task<IEnumerable<Parameter>> GetDisplayableParametersAsync(LinkedList<Parameter> Parameters,
384 {
385 if (!(this.data is null))
386 {
387 Parameters.AddLast(new StringParameter("Type", await Language.GetStringAsync(typeof(MqttTopicNode), 25, "Type"),
388 await this.data.GetTypeName(Language)));
389 }
390
391 if (this.dataCount > 0)
392 {
393 Parameters.AddLast(new Int64Parameter("Data Count", await Language.GetStringAsync(typeof(MqttTopicNode), 26, "Data Count"),
394 this.dataCount));
395 }
396
397 return Parameters;
398 }
399
405 public bool Remove(string LocalTopic)
406 {
407 if (!(LocalTopic is null))
408 {
409 lock (this.topics)
410 {
411 return this.topics.Remove(LocalTopic);
412 }
413 }
414 else
415 return false;
416 }
417
421 public MqttClient MqttClient => this.broker?.Client;
422 }
423}
Manages an MQTT connection. Implements MQTT v3.1.1, as defined in http://docs.oasis-open....
Definition: MqttClient.cs:26
Information about content received from the MQTT server.
Definition: MqttContent.cs:9
Manages a chat sensor data readout request.
Static class that dynamically manages types and interfaces available in the runtime environment.
Definition: Types.cs:14
Contains information about a language.
Definition: Language.cs:17
Task< string > GetStringAsync(Type Type, int Id, string Default)
Gets the string value of a string ID. If no such string exists, a string is created with the default ...
Definition: Language.cs:209
Abstract base class for control parameters.
Represents an MQTT topic with binary data.
Definition: BinaryData.cs:17
MQTT Broker connection object.
Definition: MqttBroker.cs:17
MQTT Topic information.
Definition: MqttTopic.cs:19
async Task DataReported(MqttContent Content)
Called when new data has been published.
Definition: MqttTopic.cs:176
MqttBroker Broker
MQTT Broker
Definition: MqttTopic.cs:56
void SetData< T >(T Data)
Sets the parsed data of a topic.
Definition: MqttTopic.cs:166
string FullTopic
Full topic name
Definition: MqttTopic.cs:66
async Task< IEnumerable< Parameter > > GetDisplayableParametersAsync(LinkedList< Parameter > Parameters, Language Language, RequestOrigin _)
TODO
Definition: MqttTopic.cs:382
ControlParameter[] GetControlParameters()
TODO
Definition: MqttTopic.cs:371
async Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
TODO
Definition: MqttTopic.cs:327
MqttTopic(IMqttTopicNode Node, string FullTopic, string LocalTopic, MqttTopic Parent, MqttBroker Broker)
MQTT Topic information.
Definition: MqttTopic.cs:35
IMqttData FindDataType(MqttContent Content)
FInds best implementation to process binary data.
Definition: MqttTopic.cs:273
override string ToString()
TODO
Definition: MqttTopic.cs:308
string LocalTopic
Local topic name.
Definition: MqttTopic.cs:61
IMqttTopicNode Node
Reference to the MQTT Topic Node
Definition: MqttTopic.cs:51
bool Remove(string LocalTopic)
Removes a child topic
Definition: MqttTopic.cs:405
IMqttData Data
Current parsed data.
Definition: MqttTopic.cs:71
Task StartReadout(ISensorReadout Request, bool DoneAfter)
Starts the readout of the sensor.
Definition: MqttTopic.cs:319
A Metering node representing an MQTT topic
Contains information about an MQTT topic
bool MoveNext(MqttTopic NewParent)
Moves to the next segment.
string CurrentSegment
Current segment being processed.
Tokens available in request.
Definition: RequestOrigin.cs:9
Base class for all sensor data fields.
Definition: Field.cs:20
Contains information about an error on a thing
Definition: ThingError.cs:10
Contains a reference to a thing
Interface for nodes that are published through the concentrator interface.
Definition: INode.cs:49
Task< IEnumerable< INode > > ChildNodes
Child nodes. If no child nodes are available, null is returned.
Definition: INode.cs:140
Interface for classes managing sensor data readouts.
Task ReportErrors(bool Done, params ThingError[] Errors)
Report error states to the client.
Task ReportFields(bool Done, params Field[] Fields)
Report read fields to the client.
Interface for thing references.
string Partition
Optional partition in which the Node ID is unique.
string SourceId
Optional ID of source containing node.
void NewMomentaryValues(params Field[] Values)
Reports newly measured values.
Interface for MQTT Topic nodes.
string LocalTopic
Local Topic segment
Task< IMqttTopicNode > CreateNew(MqttTopicRepresentation Topic)
Creates a new node of the same type.
Interface for MQTT Data encapsulations
Definition: IMqttData.cs:38
Task< string > GetTypeName(Language Language)
Type name representing data.
ControlParameter[] GetControlParameters()
Gets an array of control parameters
bool IsControllable
If data can be controlled (written)
Definition: IMqttData.cs:62
Task StartReadout(ThingReference ThingReference, ISensorReadout Request, string Prefix, bool Last)
Starts a readout of the data.
Task< DataProcessingResult > DataReported(MqttTopic Topic, MqttContent Content)
Called when new data has been published.
IMqttData CreateNew(MqttTopic Topic, MqttContent Content)
Creates a new instance of the data.
void SnifferOutput(ICommunicationLayer Output)
Outputs the parsed data to the sniffer.
DataProcessingResult
Results from processing an incoming message.
Definition: IMqttData.cs:17
FieldType
Field Type flags
Definition: FieldType.cs:10