Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqttActorTcp.cs
1using System;
2using System.Collections.Generic;
3using System.Threading.Tasks;
4using System.Xml;
11using Waher.Script;
13
15{
19 public class MqttActorTcp : Actor
20 {
24 public const string MqttNamespace = "http://lab.tagroot.io/Schema/ComSim/MQTT.xsd";
25
29 public const string MqttSchema = "TAG.Simulator.MQTT.Schema.ComSimMqtt.xsd";
30
31 private KeyValuePair<string, MqttQualityOfService>[] subscriptions;
32 private AccountCredentials credentials;
33 private MqttClient client;
34 private ISniffer sniffer;
35 private string domain;
36 private string userName;
37 private string password;
38 private int port;
39 private bool encrypted;
40 private bool trustServer;
41 private bool isOnline = false;
42 private TaskCompletionSource<bool> connected;
43
50 : base(Parent, Model)
51 {
52 }
53
63 {
64 }
65
69 public override string LocalName => nameof(MqttActorTcp);
70
74 public override string Namespace => MqttNamespace;
75
79 public override string SchemaResource => MqttSchema;
80
84 public bool IsOnline => this.isOnline;
85
89 public bool TrustServer => this.trustServer;
90
94 public string Domain => this.domain;
95
99 public MqttClient Client => this.client;
100
108 {
109 return new MqttActorTcp(Parent, Model);
110 }
111
116 public override Task FromXml(XmlElement Definition)
117 {
118 this.domain = XML.Attribute(Definition, "domain");
119 this.encrypted = XML.Attribute(Definition, "encrypted", false);
120 this.port = XML.Attribute(Definition, "port", this.encrypted ? 8883 : 1883);
121 this.userName = XML.Attribute(Definition, "userName");
122 this.password = XML.Attribute(Definition, "password");
123 this.trustServer = XML.Attribute(Definition, "trustServer", false);
124
125 return base.FromXml(Definition);
126 }
127
136 public override async Task<Actor> CreateInstanceAsync(int InstanceIndex, string InstanceId)
137 {
138 MqttActorTcp Result = new MqttActorTcp(this, this.Model, InstanceIndex, InstanceId)
139 {
140 domain = this.domain,
141 encrypted = this.encrypted,
142 port = this.port,
143 userName = this.userName + InstanceIndex.ToString(),
144 password = this.password,
145 trustServer = this.trustServer
146 };
147
149 ObjectProperties Properties = new ObjectProperties(Result, Variables);
150 List<KeyValuePair<string, MqttQualityOfService>> Topics = new List<KeyValuePair<string, MqttQualityOfService>>();
151
152 foreach (ISimulationNode Node in this.Children)
153 {
154 if (Node is Subscribe Subscribe)
155 {
156 string Topic = await Expression.TransformAsync(Subscribe.Topic, "{", "}", Properties);
157 Topics.Add(new KeyValuePair<string, MqttQualityOfService>(Topic, Subscribe.QoS));
158 }
159 }
160
161 Result.subscriptions = Topics.ToArray();
162
163 return Result;
164 }
165
169 public override async Task InitializeInstance()
170 {
171 this.credentials = await Database.FindFirstIgnoreRest<AccountCredentials>(new FilterAnd(
172 new FilterFieldEqualTo("Domain", this.domain),
173 new FilterFieldEqualTo("UserName", this.userName)));
174
175 if (this.credentials is null)
176 {
177 this.credentials = new AccountCredentials()
178 {
179 Domain = this.domain,
180 UserName = this.userName,
181 Password = string.IsNullOrEmpty(this.password) ? string.Empty : await this.Model.GetKey(this.password, this.userName)
182 };
183 }
184
185 this.sniffer = this.Model.GetSniffer(this.userName);
186
187 if (this.sniffer is null)
188 this.client = new MqttClient(this.domain, this.port, this.encrypted, this.userName, this.credentials.Password);
189 else
190 this.client = new MqttClient(this.domain, this.port, this.encrypted, this.userName, this.credentials.Password, this.sniffer);
191
192 this.client.TrustServer = this.trustServer;
193
194 this.client.OnStateChanged += this.Client_OnStateChanged;
195 this.client.OnConnectionError += this.Client_OnConnectionError;
196 this.client.OnError += this.Client_OnError;
197 this.client.OnContentReceived += this.Client_OnContentReceived;
198 this.client.OnPing += this.Client_OnPing;
199 this.client.OnPingResponse += this.Client_OnPingResponse;
200 this.client.OnPublished += this.Client_OnPublished;
201 this.client.OnSubscribed += this.Client_OnSubscribed;
202 this.client.OnUnsubscribed += this.Client_OnUnsubscribed;
203
204 this.connected = new TaskCompletionSource<bool>();
205 }
206
207 private Task Client_OnUnsubscribed(object Sender, ushort PacketIdentifier)
208 {
209 this.Model.ExternalEvent(this, "OnUnsubscribed",
210 new KeyValuePair<string, object>("PacketIdentifier", PacketIdentifier),
211 new KeyValuePair<string, object>("Client", this.client));
212
213 return Task.CompletedTask;
214 }
215
216 private Task Client_OnSubscribed(object Sender, ushort PacketIdentifier)
217 {
218 this.Model.ExternalEvent(this, "OnSubscribed",
219 new KeyValuePair<string, object>("PacketIdentifier", PacketIdentifier),
220 new KeyValuePair<string, object>("Client", this.client));
221
222 return Task.CompletedTask;
223 }
224
225 private Task Client_OnPublished(object Sender, ushort PacketIdentifier)
226 {
227 this.Model.ExternalEvent(this, "OnPublished",
228 new KeyValuePair<string, object>("PacketIdentifier", PacketIdentifier),
229 new KeyValuePair<string, object>("Client", this.client));
230
231 return Task.CompletedTask;
232 }
233
234 private Task Client_OnPingResponse(object Sender, EventArgs e)
235 {
236 this.Model.ExternalEvent(this, "OnPingResponse",
237 new KeyValuePair<string, object>("e", e),
238 new KeyValuePair<string, object>("Client", this.client));
239
240 return Task.CompletedTask;
241 }
242
243 private Task Client_OnPing(object Sender, EventArgs e)
244 {
245 this.Model.ExternalEvent(this, "OnPing",
246 new KeyValuePair<string, object>("e", e),
247 new KeyValuePair<string, object>("Client", this.client));
248
249 return Task.CompletedTask;
250 }
251
252 private Task Client_OnContentReceived(object Sender, MqttContent Content)
253 {
254 this.Model.ExternalEvent(this, "OnContentReceived",
255 new KeyValuePair<string, object>("Content", Content),
256 new KeyValuePair<string, object>("Client", this.client));
257
258 return Task.CompletedTask;
259 }
260
261 private Task Client_OnError(object Sender, Exception Exception)
262 {
263 this.Model.ExternalEvent(this, "Error",
264 new KeyValuePair<string, object>("Exception", Exception),
265 new KeyValuePair<string, object>("Client", this.client));
266
267 return Task.CompletedTask;
268 }
269
270 private Task Client_OnConnectionError(object Sender, Exception Exception)
271 {
272 this.Model.ExternalEvent(this, "ConnectionError",
273 new KeyValuePair<string, object>("Exception", Exception),
274 new KeyValuePair<string, object>("Client", this.client));
275
276 return Task.CompletedTask;
277 }
278
279 private Task Client_OnStateChanged(object Sender, MqttState NewState)
280 {
281 switch (NewState)
282 {
283 case MqttState.Connected:
284 this.isOnline = true;
285
286 if (string.IsNullOrEmpty(this.credentials.ObjectId))
287 Database.Insert(this.credentials);
288
289 this.client.SUBSCRIBE(this.subscriptions);
290
291 this.connected?.TrySetResult(true);
292 break;
293
294 case MqttState.Error:
295 case MqttState.Offline:
296 this.isOnline = false;
297 this.connected?.TrySetResult(false);
298 break;
299 }
300
301 this.Model.ExternalEvent(this, "OnStateChanged",
302 new KeyValuePair<string, object>("NewState", NewState),
303 new KeyValuePair<string, object>("Client", this.client));
304
305 return Task.CompletedTask;
306 }
307
311 public override async Task StartInstance()
312 {
313 if (!(await this.connected.Task))
314 throw new Exception("Unable to connect " + this.userName + "@" + this.domain);
315
316 this.connected = null;
317 }
318
322 public override Task FinalizeInstance()
323 {
324 this.client?.Dispose();
325 this.client = null;
326
327 if (!(this.sniffer is null))
328 {
329 if (this.sniffer is IDisposable Disposable)
330 Disposable.Dispose();
331
332 this.sniffer = null;
333 }
334
335 return Task.CompletedTask;
336 }
337
341 public override object ActivityObject
342 {
343 get
344 {
345 return new MqttActivityObject()
346 {
347 Client = this.client,
348 UserName = this.userName,
349 InstanceId = this.InstanceId,
350 InstanceIndex = this.InstanceIndex
351 };
352 }
353 }
354
355 }
356}
Object used in simulation activities.
MQTT Actor connecting to the MQTT network using traditional TCP.
Definition: MqttActorTcp.cs:20
override async Task InitializeInstance()
Initializes an instance of an actor.
override Task FinalizeInstance()
Finalizes an instance of an actor.
override ISimulationNode Create(ISimulationNode Parent, Model Model)
Creates a new instance of the node.
MqttActorTcp(ISimulationNode Parent, Model Model, int InstanceIndex, string InstanceId)
Abstract base class for MQTT actors.
Definition: MqttActorTcp.cs:61
override string Namespace
XML Namespace where the element is defined.
Definition: MqttActorTcp.cs:74
override string SchemaResource
Points to the embedded XML Schema resource defining the semantics of the XML namespace.
Definition: MqttActorTcp.cs:79
override async Task< Actor > CreateInstanceAsync(int InstanceIndex, string InstanceId)
Creates an instance of the actor.
bool IsOnline
If instance is online.
Definition: MqttActorTcp.cs:84
const string MqttSchema
TAG.Simulator.MQTT.Schema.ComSimMqtt.xsd
Definition: MqttActorTcp.cs:29
MqttActorTcp(ISimulationNode Parent, Model Model)
MQTT Actor connecting to the MQTT network using traditional TCP.
Definition: MqttActorTcp.cs:49
const string MqttNamespace
http://lab.tagroot.io/Schema/ComSim/MQTT.xsd
Definition: MqttActorTcp.cs:24
bool TrustServer
If server is to be trusted, regardless of state of certificate.
Definition: MqttActorTcp.cs:89
override string LocalName
Local name of XML element defining contents of class.
Definition: MqttActorTcp.cs:69
override async Task StartInstance()
Starts an instance of an actor.
override Task FromXml(XmlElement Definition)
Sets properties and attributes of class in accordance with XML definition.
override object ActivityObject
Returns the object that will be used by the actor for actions during an activity.
Represents a topic subscription
Definition: Subscribe.cs:13
MqttQualityOfService QoS
Quality of Service
Definition: Subscribe.cs:50
Root node of a simulation model
Definition: Model.cs:49
async Task< string > GetKey(string KeyName, string LookupValue)
Gets a key from the database. If it does not exist, it prompts the user for input.
Definition: Model.cs:817
ISniffer GetSniffer(string Actor)
Gets a sniffer, if sniffer output is desired.
Definition: Model.cs:863
bool ExternalEvent(IExternalEventsNode Source, string Name, params KeyValuePair< string, object >[] Arguments)
Method called when an external event has been received.
Definition: Model.cs:889
Abstract base class for actors
Definition: Actor.cs:15
string InstanceId
ID of actor instance.
Definition: Actor.cs:57
int InstanceIndex
Actor instance index.
Definition: Actor.cs:67
Variables Variables
Collection of actor-variables.
Definition: Actor.cs:72
Helps with common XML-related tasks.
Definition: XML.cs:19
static string Attribute(XmlElement E, string Name)
Gets the value of an XML attribute.
Definition: XML.cs:914
Manages an MQTT connection. Implements MQTT v3.1.1, as defined in http://docs.oasis-open....
Definition: MqttClient.cs:26
async void Dispose()
Closes the connection and disposes of all resources.
Definition: MqttClient.cs:1159
Task< ushort > SUBSCRIBE(string Topic, MqttQualityOfService QoS)
Subscribes to information from a topic. Topics can include wildcards.
Definition: MqttClient.cs:988
Information about content received from the MQTT server.
Definition: MqttContent.cs:9
Static interface for database persistence. In order to work, a database provider has to be assigned t...
Definition: Database.cs:19
static async Task Insert(object Object)
Inserts an object into the default collection of the database.
Definition: Database.cs:95
This filter selects objects that conform to all child-filters provided.
Definition: FilterAnd.cs:10
This filter selects objects that have a named field equal to a given value.
Class managing a script expression.
Definition: Expression.cs:39
static Task< string > TransformAsync(string s, string StartDelimiter, string StopDelimiter, Variables Variables)
Transforms a string by executing embedded script.
Definition: Expression.cs:4441
Collection of variables.
Definition: Variables.cs:25
ISimulationNode[] Children
Child nodes.
Basic interface for simulator nodes. Implementing this interface allows classes with default contruct...
ISimulationNode Parent
Parent node in the simulation model.
Interface for sniffers. Sniffers can be added to ICommunicationLayer classes to eavesdrop on communic...
Definition: ISniffer.cs:11
MqttState
State of MQTT connection.
Definition: MqttState.cs:11