Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqttBroker.cs
1using System;
2using System.Collections.Generic;
3using System.Text;
4using System.Threading.Tasks;
5using Waher.Events;
10
12{
16 public class MqttBroker : IDisposable
17 {
18 private static Scheduler scheduler = null;
19 private readonly SortedDictionary<string, MqttTopic> topics = new SortedDictionary<string, MqttTopic>();
20 private readonly MqttBrokerNode node;
21 private MqttClient mqttClient;
22 private bool connectionOk = false;
23 private readonly string host;
24 private readonly int port;
25 private readonly bool tls;
26 private readonly bool trustServer;
27 private readonly string userName;
28 private readonly string password;
29 private readonly string connectionSubscription;
30 private string willTopic;
31 private string willData;
32 private bool willRetain;
33 private MqttQualityOfService willQoS;
34 private DateTime nextCheck;
35
39 public MqttBroker(MqttBrokerNode Node, string Host, int Port, bool Tls, bool TrustServer, string UserName, string Password,
40 string ConnectionSubscription, string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
41 {
42 this.node = Node;
43 this.host = Host;
44 this.port = Port;
45 this.tls = Tls;
46 this.trustServer = TrustServer;
47 this.userName = UserName;
48 this.password = Password;
49 this.connectionSubscription = ConnectionSubscription;
50 this.willTopic = WillTopic;
51 this.willData = WillData;
52 this.willRetain = WillRetain;
53 this.willQoS = WillQoS;
54
55 this.Open();
56 }
57
58 internal MqttClient Client => this.mqttClient;
59
63 public MqttBrokerNode Node => this.node;
64
65 private void Open()
66 {
67 this.mqttClient = new MqttClient(this.host, this.port, this.tls, this.userName, this.password, this.willTopic,
68 this.willQoS, this.willRetain, Encoding.UTF8.GetBytes(this.willData))
69 {
70 TrustServer = this.trustServer
71 };
72
73 this.mqttClient.OnConnectionError += this.MqttClient_OnConnectionError;
74 this.mqttClient.OnContentReceived += this.MqttClient_OnContentReceived;
75 this.mqttClient.OnStateChanged += this.MqttClient_OnStateChanged;
76
77 this.nextCheck = Scheduler.Add(DateTime.Now.AddMinutes(1), this.CheckOnline, null);
78 }
79
80 private async Task Close()
81 {
82 if (!(this.mqttClient is null))
83 {
84 Scheduler.Remove(this.nextCheck);
85
86 this.mqttClient.OnConnectionError -= this.MqttClient_OnConnectionError;
87 this.mqttClient.OnContentReceived -= this.MqttClient_OnContentReceived;
88 this.mqttClient.OnStateChanged -= this.MqttClient_OnStateChanged;
89
90 await this.mqttClient.DisposeAsync();
91 this.mqttClient = null;
92 }
93 }
94
98 [Obsolete("Use the DisposeAsync() method.")]
99 public async void Dispose()
100 {
101 try
102 {
103 await this.DisposeAsync();
104 }
105 catch (Exception ex)
106 {
107 Log.Exception(ex);
108 }
109 }
110
114 public Task DisposeAsync()
115 {
116 return this.Close();
117 }
118
119 private async void CheckOnline(object _)
120 {
121 try
122 {
123 if (!(this.mqttClient is null) && !NetworkingModule.Stopping)
124 {
125 MqttState State = this.mqttClient.State;
126 if (State == MqttState.Offline || State == MqttState.Error || State == MqttState.Authenticating)
127 await this.mqttClient.Reconnect();
128 }
129 }
130 catch (Exception ex)
131 {
132 Log.Exception(ex);
133 }
134 finally
135 {
136 this.nextCheck = Scheduler.Add(DateTime.Now.AddMinutes(1), this.CheckOnline, null);
137 }
138 }
139
147 public Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
148 {
149 return this.Client.PUBLISH(Topic, QoS, Retain, Data);
150 }
151
159 public Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, string Data)
160 {
161 return this.Publish(Topic, QoS, Retain, Encoding.UTF8.GetBytes(Data));
162 }
163
167 public async Task DataReceived(MqttContent Content)
168 {
169 MqttTopic Topic = await this.GetTopic(Content.Topic, true, true);
170 if (!(Topic is null))
171 await Topic.DataReported(Content);
172 }
173
177 public async Task SetWill(string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
178 {
179 if (this.willTopic != WillTopic || this.willData != WillData || this.willRetain != WillRetain || this.willQoS != WillQoS)
180 {
181 await this.Close();
182
183 this.willTopic = WillTopic;
184 this.willData = WillData;
185 this.willRetain = WillRetain;
186 this.willQoS = WillQoS;
187
188 this.Open();
189 }
190 }
191
192 private async Task MqttClient_OnStateChanged(object Sender, MqttState NewState)
193 {
194 try
195 {
196 switch (NewState)
197 {
198 case MqttState.Connected:
199 this.connectionOk = true;
200 await this.node.RemoveErrorAsync("Offline");
201 await this.node.RemoveErrorAsync("Error");
202
203 if (!string.IsNullOrEmpty(this.connectionSubscription))
204 {
205 string[] Parts = this.connectionSubscription.Split(',');
206 int i, c = Parts.Length;
207
208 for (i = 0; i < c; i++)
209 Parts[i] = Parts[i].Trim();
210
211 await this.mqttClient.SUBSCRIBE(MqttQualityOfService.AtLeastOnce, Parts);
212 }
213 break;
214
215 case MqttState.Error:
216 await this.node.LogErrorAsync("Error", "Connection to broker failed.");
217
219 await this.Reconnect();
220 break;
221
222 case MqttState.Offline:
223 await this.node.LogErrorAsync("Offline", "Connection is closed.");
224
226 await this.Reconnect();
227 break;
228 }
229 }
230 catch (Exception ex)
231 {
232 Log.Exception(ex);
233 }
234 }
235
236 private Task MqttClient_OnContentReceived(object Sender, MqttContent Content)
237 {
238 lock (this.topics)
239 {
240 if (this.processing)
241 {
242 this.queue.AddLast(Content);
243 return Task.CompletedTask;
244 }
245 else
246 this.processing = true;
247 }
248
249 this.Process(Content);
250
251 return Task.CompletedTask;
252 }
253
254 private readonly LinkedList<MqttContent> queue = new LinkedList<MqttContent>();
255 private bool processing = false;
256
257 private async void Process(MqttContent Content)
258 {
259 try
260 {
261 while (true)
262 {
263 MqttTopic Topic = await this.GetTopic(Content.Topic, true, true);
264 if (!(Topic is null))
265 await Topic.DataReported(Content);
266
267 lock (this.topics)
268 {
269 if (this.queue.First is null)
270 {
271 this.processing = false;
272 break;
273 }
274 else
275 {
276 Content = this.queue.First.Value;
277 this.queue.RemoveFirst();
278 }
279 }
280 }
281 }
282 catch (Exception ex)
283 {
284 Log.Exception(ex);
285 this.processing = false;
286 }
287 }
288
289 private async Task MqttClient_OnConnectionError(object Sender, Exception Exception)
290 {
292 await this.Reconnect();
293 }
294
295 private async Task Reconnect()
296 {
297 if (this.connectionOk)
298 {
299 this.connectionOk = false;
300
301 if (!(this.mqttClient is null) && !NetworkingModule.Stopping)
302 await this.mqttClient.Reconnect();
303 }
304 }
305
309 public async Task<MqttTopic> GetTopic(string TopicString, bool CreateNew, bool IgnoreGuids)
310 {
311 if (string.IsNullOrEmpty(TopicString))
312 return null;
313
314 MqttTopicRepresentation Representation = new MqttTopicRepresentation(TopicString, TopicString.Split('/'), 0);
315 MqttTopic Topic = await this.GetLocalTopic(Representation, CreateNew);
316
317 if (Topic is null)
318 return null;
319 else if (Representation.MoveNext(Topic))
320 return await Topic.GetTopic(Representation, CreateNew, IgnoreGuids, this);
321 else
322 return Topic;
323 }
324
325 private async Task<MqttTopic> GetLocalTopic(MqttTopicRepresentation Representation, bool CreateNew)
326 {
327 string CurrentSegment = Representation.CurrentSegment;
328 MqttTopic Topic, Topic2;
329
330 lock (this.topics)
331 {
332 if (this.topics.TryGetValue(CurrentSegment, out Topic))
333 return Topic;
334 }
335
336 if (Guid.TryParse(CurrentSegment.Replace('_', '-'), out Guid _))
337 return null;
338
339 if (this.node.HasChildren)
340 {
341 foreach (INode Child in await this.node.ChildNodes)
342 {
343 if (Child is IMqttTopicNode TopicNode && TopicNode.LocalTopic == CurrentSegment)
344 {
345 lock (this.topics)
346 {
347 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
348 return Topic2;
349 else
350 {
351 Topic = new MqttTopic(TopicNode, CurrentSegment, CurrentSegment, null, this);
352 this.topics[CurrentSegment] = Topic;
353 return Topic;
354 }
355 }
356 }
357 }
358 }
359
360 if (!CreateNew)
361 return null;
362
363 IMqttTopicNode AddNode = Types.FindBest<IMqttTopicNode, MqttTopicRepresentation>(Representation);
364 if (AddNode is null)
365 return null;
366
367 AddNode = await AddNode.CreateNew(Representation);
368 Topic = new MqttTopic(AddNode, AddNode.LocalTopic, AddNode.LocalTopic, null, this);
369
370 lock (this.topics)
371 {
372 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
373 return Topic2;
374 else
375 this.topics[CurrentSegment] = Topic;
376 }
377
378 await this.node.AddAsync(AddNode);
379
380 return Topic;
381 }
382
388 public bool Remove(string LocalTopic)
389 {
390 if (!(LocalTopic is null))
391 {
392 lock (this.topics)
393 {
394 return this.topics.Remove(LocalTopic);
395 }
396 }
397 else
398 return false;
399 }
400
404 public static Scheduler Scheduler
405 {
406 get
407 {
408 if (scheduler is null)
409 {
410 if (Types.TryGetModuleParameter("Scheduler", out object Obj) && Obj is Scheduler Scheduler)
411 scheduler = Scheduler;
412 else
413 {
414 scheduler = new Scheduler();
415
416 Log.Terminating += (Sender, e) =>
417 {
418 scheduler?.Dispose();
419 scheduler = null;
420 };
421 }
422 }
423
424 return scheduler;
425 }
426 }
427
428 }
429}
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
Manages an MQTT connection. Implements MQTT v3.1.1, as defined in http://docs.oasis-open....
Definition: MqttClient.cs:26
MqttState State
Current state of connection.
Definition: MqttClient.cs:812
async Task Reconnect()
Reconnects a client after an error or if it's offline. Reconnecting, instead of creating a completely...
Definition: MqttClient.cs:210
Task< ushort > PUBLISH(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes information on a topic.
Definition: MqttClient.cs:851
Task< ushort > SUBSCRIBE(string Topic, MqttQualityOfService QoS)
Subscribes to information from a topic. Topics can include wildcards.
Definition: MqttClient.cs:988
async Task DisposeAsync()
Closes the connection and disposes of all resources.
Definition: MqttClient.cs:1174
Information about content received from the MQTT server.
Definition: MqttContent.cs:9
Module that controls the life cycle of communication.
static bool Stopping
If the system is stopping.
Static class that dynamically manages types and interfaces available in the runtime environment.
Definition: Types.cs:14
static bool TryGetModuleParameter(string Name, out object Value)
Tries to get a module parameter value.
Definition: Types.cs:583
Class that can be used to schedule events in time. It uses a timer to execute tasks at the appointed ...
Definition: Scheduler.cs:26
bool Remove(DateTime When)
Removes an event scheduled for a given point in time.
Definition: Scheduler.cs:182
void Dispose()
IDisposable.Dispose
Definition: Scheduler.cs:46
DateTime Add(DateTime When, ScheduledEventCallback Callback, object State)
Adds an event.
Definition: Scheduler.cs:66
MQTT Broker connection object.
Definition: MqttBroker.cs:17
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, string Data)
Publishes text data to a topic.
Definition: MqttBroker.cs:159
async Task DataReceived(MqttContent Content)
TODO
Definition: MqttBroker.cs:167
static Scheduler Scheduler
Scheduler for asynchronous tasks.
Definition: MqttBroker.cs:405
bool Remove(string LocalTopic)
Removes a child topic
Definition: MqttBroker.cs:388
Task DisposeAsync()
Closes the connection and disposes of all resources.
Definition: MqttBroker.cs:114
MqttBroker(MqttBrokerNode Node, string Host, int Port, bool Tls, bool TrustServer, string UserName, string Password, string ConnectionSubscription, string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
MQTT Broker connection object.
Definition: MqttBroker.cs:39
async Task SetWill(string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
TODO
Definition: MqttBroker.cs:177
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes binary data to a topic.
Definition: MqttBroker.cs:147
MqttBrokerNode Node
Reference to broker node.
Definition: MqttBroker.cs:63
async Task< MqttTopic > GetTopic(string TopicString, bool CreateNew, bool IgnoreGuids)
Gets the Node responsible for managing a Topic
Definition: MqttBroker.cs:309
MQTT Topic information.
Definition: MqttTopic.cs:19
async Task DataReported(MqttContent Content)
Called when new data has been published.
Definition: MqttTopic.cs:176
Node representing a connection to an MQTT broker.
Contains information about an MQTT topic
bool MoveNext(MqttTopic NewParent)
Moves to the next segment.
string CurrentSegment
Current segment being processed.
Interface for nodes that are published through the concentrator interface.
Definition: INode.cs:49
Task< IEnumerable< INode > > ChildNodes
Child nodes. If no child nodes are available, null is returned.
Definition: INode.cs:140
Interface for MQTT Topic nodes.
string LocalTopic
Local Topic segment
MqttQualityOfService
MQTT Quality of Service level.
MqttState
State of MQTT connection.
Definition: MqttState.cs:11