Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqttBrokerNode.cs
1using System;
2using System.Collections;
3using System.Collections.Generic;
4using System.Threading.Tasks;
12using Waher.Things.Ip;
14
15namespace Waher.Things.Mqtt
16{
21 {
22 private MqttQualityOfService willQoS = MqttQualityOfService.AtLeastOnce;
23 private string userName = string.Empty;
24 private string password = string.Empty;
25 private string willTopic = string.Empty;
26 private string willData = string.Empty;
27 private string brokerKey = null;
28 private string connectionSubscription = "#";
29 private bool willRetain = false;
30 private bool trustServer = false;
31
36 : base()
37 {
38 this.Port = 8883;
39 this.Tls = true;
40 }
41
45 [Page(1, "IP")]
46 [Header(44, "Trust Server", 80)]
47 [ToolTip(45, "If the remote server certificate should be trusted even if it is not valid.")]
48 public bool TrustServer
49 {
50 get => this.trustServer;
51 set => this.trustServer = value;
52 }
53
57 [Page(2, "MQTT")]
58 [Header(3, "User Name:")]
59 [ToolTip(4, "User name used during authentication process.")]
60 [DefaultValueStringEmpty]
61 public string UserName
62 {
63 get => this.userName;
64 set => this.userName = value;
65 }
66
70 [Page(2, "MQTT")]
71 [Header(5, "Password:")]
72 [ToolTip(6, "Password used during authentication process. NOTE: Will be sent in clear text. Don't reuse passwords.")]
73 [Masked]
74 [DefaultValueStringEmpty]
75 public string Password
76 {
77 get => this.password;
78 set => this.password = value;
79 }
80
84 [Page(2, "MQTT")]
85 [Header(46, "Connection Subscription:")]
86 [ToolTip(47, "Subscription topic executed when connecting. Empty means no subscription will be performed. Multiple subjects can be comma-separated.")]
87 [DefaultValue("#")]
89 {
90 get => this.connectionSubscription;
91 set => this.connectionSubscription = value;
92 }
93
97 [Page(10, "Last Will and Testament")]
98 [Header(11, "Will Topic:")]
99 [ToolTip(12, "When the connection is lost, a Last Will and Testament can be published on this topic to alert subscribers you've lost connection.")]
100 [DefaultValueStringEmpty]
101 public string WillTopic
102 {
103 get => this.willTopic;
104 set => this.willTopic = value;
105 }
106
110 [Page(10, "Last Will and Testament")]
111 [Header(13, "Will Data:")]
112 [ToolTip(14, "When the connection is lost, this content will be published on the topic defined above.")]
113 [DefaultValueStringEmpty]
114 public string WillData
115 {
116 get => this.willData;
117 set => this.willData = value;
118 }
119
123 [Page(10, "Last Will and Testament")]
124 [Header(15, "Retain Will on topic.")]
125 [ToolTip(16, "If the content published on the will should be retained on the topic.")]
126 [DefaultValue(false)]
127 public bool WillRetain
128 {
129 get => this.willRetain;
130 set => this.willRetain = value;
131 }
132
136 [Page(10, "Last Will and Testament")]
137 [Header(17, "Quality of Service:")]
138 [ToolTip(18, "The quality of service used when sending the last will and testament.")]
139 [DefaultValue(MqttQualityOfService.AtLeastOnce)]
140 [Option(MqttQualityOfService.AtMostOnce, 19, "At most once")]
141 [Option(MqttQualityOfService.AtLeastOnce, 20, "At least once")]
142 [Option(MqttQualityOfService.ExactlyOnce, 19, "Exactly once")]
144 {
145 get => this.willQoS;
146 set => this.willQoS = value;
147 }
148
152 public override Task<string> GetTypeNameAsync(Language Language)
153 {
154 return Language.GetStringAsync(typeof(MqttBrokerNode), 1, "MQTT Broker");
155 }
156
160 public override Task<bool> AcceptsChildAsync(INode Child)
161 {
162 return Task.FromResult(Child is MqttTopicNode);
163 }
164
168 public override Task DestroyAsync()
169 {
170 if (!string.IsNullOrEmpty(this.brokerKey))
171 MqttBrokers.DestroyBroker(this.brokerKey);
172
173 return base.DestroyAsync();
174 }
175
179 [IgnoreMember]
180 public string Key
181 {
182 get
183 {
184 string PrevKey = this.brokerKey;
185 this.brokerKey = MqttBrokers.GetKey(this.Host, this.Port, this.Tls, this.trustServer, this.userName, this.password,
186 this.connectionSubscription);
187
188 if (PrevKey != this.brokerKey && !string.IsNullOrEmpty(PrevKey))
189 MqttBrokers.DestroyBroker(PrevKey);
190
191 return this.brokerKey;
192 }
193 }
194
198 protected override Task NodeUpdated()
199 {
200 this.GetBroker();
201
202 return base.NodeUpdated();
203 }
204
209 public Task<MqttBroker> GetBroker()
210 {
211 return MqttBrokers.GetBroker(this, this.Key, this.Host, this.Port, this.Tls, this.TrustServer, this.userName, this.password,
212 this.connectionSubscription, this.willTopic, this.willData, this.willRetain, this.willQoS);
213 }
214
218 public override async Task<bool> RemoveAsync(INode Child)
219 {
220 if (Child is MqttTopicNode Topic)
221 (await this.GetBroker()).Remove(Topic.LocalTopic);
222
223 return await base.RemoveAsync(Child);
224 }
225
226 #region ICommunicationLayer
227
232 public bool DecoupledEvents => true;
233
237 public void Add(ISniffer Sniffer)
238 {
239 this.GetBroker().Result.Client?.Add(Sniffer); // TODO: Avoid blocking call
240 }
241
245 public void AddRange(IEnumerable<ISniffer> Sniffers)
246 {
247 this.GetBroker().Result.Client?.AddRange(Sniffers); // TODO: Avoid blocking call
248 }
249
253 public bool Remove(ISniffer Sniffer)
254 {
255 return this.GetBroker().Result.Client?.Remove(Sniffer) ?? false; // TODO: Avoid blocking call
256 }
257
262 {
263 get { return this.GetBroker().Result.Client?.Sniffers ?? new ISniffer[0]; } // TODO: Avoid blocking call
264 }
265
269 public bool HasSniffers
270 {
271 get { return this.GetBroker().Result.Client?.HasSniffers ?? false; } // TODO: Avoid blocking call
272 }
273
277 public IEnumerator<ISniffer> GetEnumerator()
278 {
279 return new SnifferEnumerator(this.Sniffers);
280 }
281
282 IEnumerator IEnumerable.GetEnumerator()
283 {
284 return this.GetBroker().Result.Client?.GetEnumerator() ?? new ISniffer[0].GetEnumerator(); // TODO: Avoid blocking call
285 }
286
291 public async Task ReceiveBinary(byte[] Data)
292 {
293 MqttBroker Broker = await this.GetBroker();
294 MqttClient Client = Broker.Client;
295 if (!(Client is null))
296 await Client.ReceiveBinary(Data);
297 }
298
303 public async Task TransmitBinary(byte[] Data)
304 {
305 MqttBroker Broker = await this.GetBroker();
306 MqttClient Client = Broker.Client;
307 if (!(Client is null))
308 await Client.TransmitBinary(Data);
309 }
310
315 public async Task ReceiveText(string Text)
316 {
317 MqttBroker Broker = await this.GetBroker();
318 MqttClient Client = Broker.Client;
319 if (!(Client is null))
320 await Client.ReceiveText(Text);
321 }
322
327 public async Task TransmitText(string Text)
328 {
329 MqttBroker Broker = await this.GetBroker();
330 MqttClient Client = Broker.Client;
331 if (!(Client is null))
332 await Client.TransmitText(Text);
333 }
334
339 public async Task Information(string Comment)
340 {
341 MqttBroker Broker = await this.GetBroker();
342 MqttClient Client = Broker.Client;
343 if (!(Client is null))
344 await Client.Information(Comment);
345 }
346
351 public async Task Warning(string Warning)
352 {
353 MqttBroker Broker = await this.GetBroker();
354 MqttClient Client = Broker.Client;
355 if (!(Client is null))
356 await Client.Warning(Warning);
357 }
358
363 public async Task Error(string Error)
364 {
365 MqttBroker Broker = await this.GetBroker();
366 MqttClient Client = Broker.Client;
367 if (!(Client is null))
368 await Client.Error(Error);
369 }
370
375 public async Task Exception(Exception Exception)
376 {
377 MqttBroker Broker = await this.GetBroker();
378 MqttClient Client = Broker.Client;
379 if (!(Client is null))
380 await Client.Exception(Exception);
381 }
382
387 public async Task Exception(string Exception)
388 {
389 MqttBroker Broker = await this.GetBroker();
390 MqttClient Client = Broker.Client;
391 if (!(Client is null))
392 await Client.Exception(Exception);
393 }
394
400 public async Task ReceiveBinary(DateTime Timestamp, byte[] Data)
401 {
402 MqttBroker Broker = await this.GetBroker();
403 MqttClient Client = Broker.Client;
404 if (!(Client is null))
405 await Client.ReceiveBinary(Timestamp, Data);
406 }
407
413 public async Task TransmitBinary(DateTime Timestamp, byte[] Data)
414 {
415 MqttBroker Broker = await this.GetBroker();
416 MqttClient Client = Broker.Client;
417 if (!(Client is null))
418 await Client.TransmitBinary(Timestamp, Data);
419 }
420
426 public async Task ReceiveText(DateTime Timestamp, string Text)
427 {
428 MqttBroker Broker = await this.GetBroker();
429 MqttClient Client = Broker.Client;
430 if (!(Client is null))
431 await Client.ReceiveText(Timestamp, Text);
432 }
433
439 public async Task TransmitText(DateTime Timestamp, string Text)
440 {
441 MqttBroker Broker = await this.GetBroker();
442 MqttClient Client = Broker.Client;
443 if (!(Client is null))
444 await Client.TransmitText(Timestamp, Text);
445 }
446
452 public async Task Information(DateTime Timestamp, string Comment)
453 {
454 MqttBroker Broker = await this.GetBroker();
455 MqttClient Client = Broker.Client;
456 if (!(Client is null))
457 await Client.Information(Timestamp, Comment);
458 }
459
465 public async Task Warning(DateTime Timestamp, string Warning)
466 {
467 MqttBroker Broker = await this.GetBroker();
468 MqttClient Client = Broker.Client;
469 if (!(Client is null))
470 await Client.Warning(Timestamp, Warning);
471 }
472
478 public async Task Error(DateTime Timestamp, string Error)
479 {
480 MqttBroker Broker = await this.GetBroker();
481 MqttClient Client = Broker.Client;
482 if (!(Client is null))
483 await Client.Error(Timestamp, Error);
484 }
485
491 public async Task Exception(DateTime Timestamp, string Exception)
492 {
493 MqttBroker Broker = await this.GetBroker();
494 MqttClient Client = Broker.Client;
495 if (!(Client is null))
496 await Client.Exception(Timestamp, Exception);
497 }
498
504 public async Task Exception(DateTime Timestamp, Exception Exception)
505 {
506 MqttBroker Broker = await this.GetBroker();
507 MqttClient Client = Broker.Client;
508 if (!(Client is null))
509 await Client.Exception(Timestamp, Exception);
510 }
511
512 #endregion
513
517 public override Task<IEnumerable<ICommand>> Commands => this.GetCommands();
518
522 public async Task<IEnumerable<ICommand>> GetCommands()
523 {
524 List<ICommand> Result = new List<ICommand>();
525
526 Result.AddRange(await base.Commands);
527 Result.Add(new ReconnectCommand((await this.GetBroker()).Client));
528
529 return Result;
530 }
531
538 public async override Task<IEnumerable<Parameter>> GetDisplayableParametersAsync(Language Language, RequestOrigin Caller)
539 {
540 LinkedList<Parameter> Result = await base.GetDisplayableParametersAsync(Language, Caller) as LinkedList<Parameter>;
541 MqttBroker Broker = await this.GetBroker();
542
543 Result.AddLast(new StringParameter("State", await Language.GetStringAsync(typeof(MqttBrokerNode), 30, "State"),
544 Broker.Client.State.ToString() ?? string.Empty));
545
546 return Result;
547 }
548
549 }
550}
Task TransmitBinary(byte[] Data)
Called when binary data has been transmitted.
Task Exception(Exception Exception)
Called to inform the viewer of an exception state.
Task Information(string Comment)
Called to inform the viewer of something.
Task TransmitText(string Text)
Called when text has been transmitted.
Task ReceiveText(string Text)
Called when text has been received.
Task ReceiveBinary(byte[] Data)
Called when binary data has been received.
Task Warning(string Warning)
Called to inform the viewer of a warning state.
Manages an MQTT connection. Implements MQTT v3.1.1, as defined in http://docs.oasis-open....
Definition: MqttClient.cs:26
MqttState State
Current state of connection.
Definition: MqttClient.cs:812
Contains information about a language.
Definition: Language.cs:17
Task< string > GetStringAsync(Type Type, int Id, string Default)
Gets the string value of a string ID. If no such string exists, a string is created with the default ...
Definition: Language.cs:209
string Host
Host name or IP address.
Definition: IpHost.cs:63
Node representing a port on an IP Host machine.
Definition: IpHostPort.cs:21
bool Tls
If connection is encrypted using TLS or not.
Definition: IpHostPort.cs:67
int Port
Port number.
Definition: IpHostPort.cs:55
MQTT Broker connection object.
Definition: MqttBroker.cs:17
Static class managing connections to MQTT brokers.
Definition: MqttBrokers.cs:13
static async Task< MqttBroker > GetBroker(MqttBrokerNode Node, string Key, string Host, int Port, bool Tls, bool TrustServer, string UserName, string Password, string ConnectionSubscription, string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
TODO
Definition: MqttBrokers.cs:45
static string GetKey(string Host, int Port, bool Tls, bool TrustServer, string UserName, string Password, string ConnectionSubscription)
Gets sort key for MQTT broker
Definition: MqttBrokers.cs:27
static Task DestroyBroker(string Key)
TODO
Definition: MqttBrokers.cs:91
Node representing a connection to an MQTT broker.
override Task DestroyAsync()
TODO
string ConnectionSubscription
Startup subscription
ISniffer[] Sniffers
Registered sniffers.
async Task ReceiveBinary(DateTime Timestamp, byte[] Data)
Called when binary data has been received.
override async Task< bool > RemoveAsync(INode Child)
TODO
async Task Warning(string Warning)
Called to inform the viewer of a warning state.
async Task ReceiveText(string Text)
Called when text has been received.
async Task Exception(string Exception)
Called to inform the viewer of an exception state.
async override Task< IEnumerable< Parameter > > GetDisplayableParametersAsync(Language Language, RequestOrigin Caller)
Gets displayable parameters.
async Task Error(string Error)
Called to inform the viewer of an error state.
void Add(ISniffer Sniffer)
ICommunicationLayer.Add
async Task Exception(DateTime Timestamp, string Exception)
Called to inform the viewer of an exception state.
void AddRange(IEnumerable< ISniffer > Sniffers)
ICommunicationLayer.AddRange
override Task< bool > AcceptsChildAsync(INode Child)
TODO
async Task ReceiveText(DateTime Timestamp, string Text)
Called when text has been received.
Task< MqttBroker > GetBroker()
Gets the corresponding broker node.
IEnumerator< ISniffer > GetEnumerator()
IEnumerable<T>.GetEnumerator()
async Task TransmitBinary(DateTime Timestamp, byte[] Data)
Called when binary data has been transmitted.
async Task Information(DateTime Timestamp, string Comment)
Called to inform the viewer of something.
bool HasSniffers
If there are sniffers registered on the object.
async Task< IEnumerable< ICommand > > GetCommands()
TODO
async Task TransmitText(DateTime Timestamp, string Text)
Called when text has been transmitted.
bool TrustServer
If connection is encrypted using TLS or not.
async Task TransmitBinary(byte[] Data)
Called when binary data has been transmitted.
async Task Warning(DateTime Timestamp, string Warning)
Called to inform the viewer of a warning state.
bool Remove(ISniffer Sniffer)
ICommunicationLayer.Remove
async Task Exception(Exception Exception)
Called to inform the viewer of an exception state.
override Task< string > GetTypeNameAsync(Language Language)
Type name representing data.
async Task ReceiveBinary(byte[] Data)
Called when binary data has been received.
override Task< IEnumerable< ICommand > > Commands
TODO
MqttQualityOfService WillQoS
TODO
bool DecoupledEvents
If events raised from the communication layer are decoupled, i.e. executed in parallel with the sourc...
async Task TransmitText(string Text)
Called when text has been transmitted.
async Task Error(DateTime Timestamp, string Error)
Called to inform the viewer of an error state.
async Task Information(string Comment)
Called to inform the viewer of something.
MqttBrokerNode()
Node representing a connection to an MQTT broker.
async Task Exception(DateTime Timestamp, Exception Exception)
Called to inform the viewer of an exception state.
A Metering node representing an MQTT topic
Tokens available in request.
Definition: RequestOrigin.cs:9
Interface for observable classes implementing communication protocols.
Interface for sniffers. Sniffers can be added to ICommunicationLayer classes to eavesdrop on communic...
Definition: ISniffer.cs:11
Interface for nodes that are published through the concentrator interface.
Definition: INode.cs:49
MqttQualityOfService
MQTT Quality of Service level.