2using System.Collections.Generic;
4using System.Threading.Tasks;
18 private static Scheduler scheduler =
null;
19 private readonly SortedDictionary<string, MqttTopic> topics =
new SortedDictionary<string, MqttTopic>();
22 private bool connectionOk =
false;
23 private readonly
string host;
24 private readonly
int port;
25 private readonly
bool tls;
26 private readonly
bool trustServer;
27 private readonly
string userName;
28 private readonly
string password;
29 private readonly
string connectionSubscription;
30 private string willTopic;
31 private string willData;
32 private bool willRetain;
34 private DateTime nextCheck;
40 string ConnectionSubscription,
string WillTopic,
string WillData,
bool WillRetain,
MqttQualityOfService WillQoS)
46 this.trustServer = TrustServer;
47 this.userName = UserName;
48 this.password = Password;
49 this.connectionSubscription = ConnectionSubscription;
50 this.willTopic = WillTopic;
51 this.willData = WillData;
52 this.willRetain = WillRetain;
53 this.willQoS = WillQoS;
67 this.mqttClient =
new MqttClient(this.host, this.port, this.tls, this.userName, this.password, this.willTopic,
68 this.willQoS, this.willRetain, Encoding.UTF8.GetBytes(
this.willData))
70 TrustServer = this.trustServer
73 this.mqttClient.OnConnectionError += this.MqttClient_OnConnectionError;
74 this.mqttClient.OnContentReceived += this.MqttClient_OnContentReceived;
75 this.mqttClient.OnStateChanged += this.MqttClient_OnStateChanged;
77 this.nextCheck =
Scheduler.
Add(DateTime.Now.AddMinutes(1),
this.CheckOnline,
null);
80 private async Task Close()
82 if (!(this.mqttClient is
null))
86 this.mqttClient.OnConnectionError -= this.MqttClient_OnConnectionError;
87 this.mqttClient.OnContentReceived -= this.MqttClient_OnContentReceived;
88 this.mqttClient.OnStateChanged -= this.MqttClient_OnStateChanged;
91 this.mqttClient =
null;
98 [Obsolete(
"Use the DisposeAsync() method.")]
119 private async
void CheckOnline(
object _)
136 this.nextCheck =
Scheduler.
Add(DateTime.Now.AddMinutes(1),
this.CheckOnline,
null);
149 return this.Client.
PUBLISH(Topic, QoS, Retain, Data);
161 return this.
Publish(Topic, QoS, Retain, Encoding.UTF8.GetBytes(Data));
170 if (!(Topic is
null))
179 if (this.willTopic != WillTopic || this.willData != WillData || this.willRetain != WillRetain || this.willQoS != WillQoS)
183 this.willTopic = WillTopic;
184 this.willData = WillData;
185 this.willRetain = WillRetain;
186 this.willQoS = WillQoS;
192 private async Task MqttClient_OnStateChanged(
object Sender,
MqttState NewState)
199 this.connectionOk =
true;
200 await this.node.RemoveErrorAsync(
"Offline");
201 await this.node.RemoveErrorAsync(
"Error");
203 if (!
string.IsNullOrEmpty(this.connectionSubscription))
205 string[] Parts = this.connectionSubscription.Split(
',');
206 int i, c = Parts.Length;
208 for (i = 0; i < c; i++)
209 Parts[i] = Parts[i].Trim();
216 await this.node.LogErrorAsync(
"Error",
"Connection to broker failed.");
219 await this.Reconnect();
223 await this.node.LogErrorAsync(
"Offline",
"Connection is closed.");
226 await this.Reconnect();
236 private Task MqttClient_OnContentReceived(
object Sender,
MqttContent Content)
242 this.queue.AddLast(Content);
243 return Task.CompletedTask;
246 this.processing =
true;
249 this.Process(Content);
251 return Task.CompletedTask;
254 private readonly LinkedList<MqttContent> queue =
new LinkedList<MqttContent>();
255 private bool processing =
false;
263 MqttTopic Topic = await this.
GetTopic(Content.
Topic,
true,
true);
264 if (!(Topic is
null))
265 await Topic.DataReported(Content);
269 if (this.queue.First is
null)
271 this.processing =
false;
276 Content = this.queue.First.Value;
277 this.queue.RemoveFirst();
285 this.processing =
false;
289 private async Task MqttClient_OnConnectionError(
object Sender, Exception Exception)
292 await this.Reconnect();
295 private async Task Reconnect()
297 if (this.connectionOk)
299 this.connectionOk =
false;
309 public async Task<MqttTopic>
GetTopic(
string TopicString,
bool CreateNew,
bool IgnoreGuids)
311 if (
string.IsNullOrEmpty(TopicString))
315 MqttTopic Topic = await this.GetLocalTopic(Representation, CreateNew);
319 else if (Representation.
MoveNext(Topic))
320 return await Topic.GetTopic(Representation, CreateNew, IgnoreGuids,
this);
332 if (this.topics.TryGetValue(CurrentSegment, out Topic))
336 if (Guid.TryParse(CurrentSegment.Replace(
'_',
'-'), out Guid _))
339 if (this.node.HasChildren)
347 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
351 Topic =
new MqttTopic(TopicNode, CurrentSegment, CurrentSegment,
null,
this);
352 this.topics[CurrentSegment] = Topic;
363 IMqttTopicNode AddNode =
Types.FindBest<IMqttTopicNode, MqttTopicRepresentation>(Representation);
367 AddNode = await AddNode.CreateNew(Representation);
368 Topic =
new MqttTopic(AddNode, AddNode.LocalTopic, AddNode.LocalTopic,
null,
this);
372 if (this.topics.TryGetValue(CurrentSegment, out Topic2))
375 this.topics[CurrentSegment] = Topic;
378 await this.node.AddAsync(AddNode);
390 if (!(LocalTopic is
null))
394 return this.topics.Remove(LocalTopic);
408 if (scheduler is
null)
416 Log.Terminating += (Sender, e) =>
Static class managing the application event log. Applications and services log events on this static ...
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Manages an MQTT connection. Implements MQTT v3.1.1, as defined in http://docs.oasis-open....
MqttState State
Current state of connection.
async Task Reconnect()
Reconnects a client after an error or if it's offline. Reconnecting, instead of creating a completely...
Task< ushort > PUBLISH(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes information on a topic.
Task< ushort > SUBSCRIBE(string Topic, MqttQualityOfService QoS)
Subscribes to information from a topic. Topics can include wildcards.
async Task DisposeAsync()
Closes the connection and disposes of all resources.
Information about content received from the MQTT server.
Module that controls the life cycle of communication.
static bool Stopping
If the system is stopping.
Static class that dynamically manages types and interfaces available in the runtime environment.
static bool TryGetModuleParameter(string Name, out object Value)
Tries to get a module parameter value.
Class that can be used to schedule events in time. It uses a timer to execute tasks at the appointed ...
bool Remove(DateTime When)
Removes an event scheduled for a given point in time.
void Dispose()
IDisposable.Dispose
DateTime Add(DateTime When, ScheduledEventCallback Callback, object State)
Adds an event.
MQTT Broker connection object.
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, string Data)
Publishes text data to a topic.
async Task DataReceived(MqttContent Content)
TODO
static Scheduler Scheduler
Scheduler for asynchronous tasks.
bool Remove(string LocalTopic)
Removes a child topic
Task DisposeAsync()
Closes the connection and disposes of all resources.
MqttBroker(MqttBrokerNode Node, string Host, int Port, bool Tls, bool TrustServer, string UserName, string Password, string ConnectionSubscription, string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
MQTT Broker connection object.
async Task SetWill(string WillTopic, string WillData, bool WillRetain, MqttQualityOfService WillQoS)
TODO
Task Publish(string Topic, MqttQualityOfService QoS, bool Retain, byte[] Data)
Publishes binary data to a topic.
MqttBrokerNode Node
Reference to broker node.
async Task< MqttTopic > GetTopic(string TopicString, bool CreateNew, bool IgnoreGuids)
Gets the Node responsible for managing a Topic
async Task DataReported(MqttContent Content)
Called when new data has been published.
Node representing a connection to an MQTT broker.
Contains information about an MQTT topic
bool MoveNext(MqttTopic NewParent)
Moves to the next segment.
string CurrentSegment
Current segment being processed.
Interface for nodes that are published through the concentrator interface.
Task< IEnumerable< INode > > ChildNodes
Child nodes. If no child nodes are available, null is returned.
Interface for MQTT Topic nodes.
string LocalTopic
Local Topic segment
MqttQualityOfService
MQTT Quality of Service level.
MqttState
State of MQTT connection.