Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqActorTcp.cs
1using System;
2using System.Collections.Generic;
3using System.Threading;
4using System.Threading.Tasks;
5using System.Xml;
11using Waher.Script;
13
15{
19 public class MqActorTcp : Actor
20 {
24 public const string MqNamespace = "http://lab.tagroot.io/Schema/ComSim/MQ.xsd";
25
29 public const string MqSchema = "TAG.Simulator.MQ.Schema.ComSimMq.xsd";
30
31 private AccountCredentials credentials;
32 private MqClient client;
33 private ISniffer sniffer;
34 private Task connectionTask;
35 private string queueManager;
36 private string channel;
37 private string userName;
38 private string password;
39 private string cipher;
40 private string cipherSuite;
41 private string certificateStore;
42 private string host;
43 private int port;
44
51 : base(Parent, Model)
52 {
53 }
54
64 {
65 }
66
70 public override string LocalName => nameof(MqActorTcp);
71
75 public override string Namespace => MqNamespace;
76
80 public override string SchemaResource => MqSchema;
81
85 public MqClient Client => this.client;
86
94 {
95 return new MqActorTcp(Parent, Model);
96 }
97
102 public override Task FromXml(XmlElement Definition)
103 {
104 this.host = XML.Attribute(Definition, "host");
105 this.port = XML.Attribute(Definition, "port", 1414);
106 this.queueManager = XML.Attribute(Definition, "queueManager");
107 this.channel = XML.Attribute(Definition, "channel");
108 this.userName = XML.Attribute(Definition, "userName");
109 this.password = XML.Attribute(Definition, "password");
110 this.cipher = XML.Attribute(Definition, "cipher");
111 this.cipherSuite = XML.Attribute(Definition, "cipherSuite");
112 this.certificateStore = XML.Attribute(Definition, "certificateStore");
113
114 return base.FromXml(Definition);
115 }
116
125 public override Task<Actor> CreateInstanceAsync(int InstanceIndex, string InstanceId)
126 {
127 MqActorTcp Result = new MqActorTcp(this, this.Model, InstanceIndex, InstanceId)
128 {
129 host = this.host,
130 port = this.port,
131 queueManager = this.queueManager,
132 channel = this.channel,
133 userName = this.userName + InstanceIndex.ToString(),
134 password = this.password,
135 cipher = this.cipher,
136 cipherSuite = this.cipherSuite,
137 certificateStore = this.certificateStore
138 };
139
140 return Task.FromResult<Actor>(Result);
141 }
142
146 public override async Task InitializeInstance()
147 {
148 this.credentials = await Database.FindFirstIgnoreRest<AccountCredentials>(new FilterAnd(
149 new FilterFieldEqualTo("Host", this.host),
150 new FilterFieldEqualTo("UserName", this.userName)));
151
152 if (this.credentials is null)
153 {
154 this.credentials = new AccountCredentials()
155 {
156 Host = this.host,
157 UserName = this.userName,
158 Password = string.IsNullOrEmpty(this.password) ? string.Empty : await this.Model.GetKey(this.password, this.userName)
159 };
160 }
161
162 this.sniffer = this.Model.GetSniffer(this.InstanceId);
163
164 if (this.sniffer is null)
165 this.client = new MqClient(this.queueManager, this.channel, this.cipher, this.cipherSuite, this.certificateStore, this.host, this.port);
166 else
167 this.client = new MqClient(this.queueManager, this.channel, this.cipher, this.cipherSuite, this.certificateStore, this.host, this.port, this.sniffer);
168
169 this.connectionTask = this.client.ConnectAsync(this.credentials.UserName, this.credentials.Password);
170 }
171
175 public override async Task StartInstance()
176 {
177 await this.connectionTask;
178
179 if (string.IsNullOrEmpty(this.credentials.ObjectId))
180 await Database.Insert(this.credentials);
181
183 ObjectProperties Properties = new ObjectProperties(this, Variables);
184 List<SubscriptionState> Subscriptions = new List<SubscriptionState>();
185
187 {
188 foreach (ISimulationNode Node in Parent.Children)
189 {
190 if (Node is Subscribe Subscribe)
191 {
192 SubscriptionState Subscription = new SubscriptionState()
193 {
194 ExtEventName = await Expression.TransformAsync(Subscribe.ExtEvent, "{", "}", Properties),
195 Queue = await Expression.TransformAsync(Subscribe.Queue, "{", "}", Properties),
196 Actor = this,
197 Model = this.Model,
198 Cancel = new ManualResetEvent(false),
199 Stopped = new TaskCompletionSource<bool>()
200 };
201
202 Subscription.Subscribe(this.Client);
203 Subscriptions.Add(Subscription);
204 }
205 }
206 }
207
208 this.subscriptions = Subscriptions.ToArray();
209 }
210
211 private SubscriptionState[] subscriptions;
212
213 private class SubscriptionState : IDisposable
214 {
215 public string ExtEventName;
216 public string Queue;
217 public MqActorTcp Actor;
218 public Model Model;
219 public ManualResetEvent Cancel;
220 public TaskCompletionSource<bool> Stopped;
221
222 public void Dispose()
223 {
224 this.Cancel.Set();
225 }
226
227 public void Subscribe(MqClient Client)
228 {
229 Client.SubscribeIncoming(this.Queue, this.Cancel, this.Stopped, this.MessageReceived, null);
230 }
231
232 private Task MessageReceived(object Sender, MqMessageEventArgs e)
233 {
234 this.Model.ExternalEvent(this.Actor, this.ExtEventName,
235 new KeyValuePair<string, object>("Message", e.Message),
236 new KeyValuePair<string, object>("Client", this.Actor.client));
237
238 return Task.CompletedTask;
239 }
240 }
241
245 public override async Task FinalizeInstance()
246 {
247 foreach (SubscriptionState State in this.subscriptions)
248 State.Dispose();
249
250 foreach (SubscriptionState State in this.subscriptions)
251 await State.Stopped.Task;
252
253 this.client?.Dispose();
254 this.client = null;
255
256 if (!(this.sniffer is null))
257 {
258 if (this.sniffer is IDisposable Disposable)
259 Disposable.Dispose();
260
261 this.sniffer = null;
262 }
263 }
264
268 public override object ActivityObject
269 {
270 get
271 {
272 return new MqActivityObject()
273 {
274 Client = this.client,
275 UserName = this.userName,
276 InstanceId = this.InstanceId,
277 InstanceIndex = this.InstanceIndex
278 };
279 }
280 }
281
282 }
283}
Object used in simulation activities.
MQ Actor connecting to the MQ network using traditional TCP.
Definition: MqActorTcp.cs:20
override string Namespace
XML Namespace where the element is defined.
Definition: MqActorTcp.cs:75
override Task< Actor > CreateInstanceAsync(int InstanceIndex, string InstanceId)
Creates an instance of the actor.
Definition: MqActorTcp.cs:125
MqActorTcp(ISimulationNode Parent, Model Model, int InstanceIndex, string InstanceId)
Abstract base class for MQ actors.
Definition: MqActorTcp.cs:62
override string SchemaResource
Points to the embedded XML Schema resource defining the semantics of the XML namespace.
Definition: MqActorTcp.cs:80
override async Task FinalizeInstance()
Finalizes an instance of an actor.
Definition: MqActorTcp.cs:245
const string MqNamespace
http://lab.tagroot.io/Schema/ComSim/MQ.xsd
Definition: MqActorTcp.cs:24
override string LocalName
Local name of XML element defining contents of class.
Definition: MqActorTcp.cs:70
override async Task StartInstance()
Starts an instance of an actor.
Definition: MqActorTcp.cs:175
override ISimulationNode Create(ISimulationNode Parent, Model Model)
Creates a new instance of the node.
Definition: MqActorTcp.cs:93
override async Task InitializeInstance()
Initializes an instance of an actor.
Definition: MqActorTcp.cs:146
override object ActivityObject
Returns the object that will be used by the actor for actions during an activity.
Definition: MqActorTcp.cs:269
override Task FromXml(XmlElement Definition)
Sets properties and attributes of class in accordance with XML definition.
Definition: MqActorTcp.cs:102
const string MqSchema
TAG.Simulator.MQ.Schema.ComSimMq.xsd
Definition: MqActorTcp.cs:29
MqActorTcp(ISimulationNode Parent, Model Model)
MQ Actor connecting to the MQ network using traditional TCP.
Definition: MqActorTcp.cs:50
Represents a queue subscription
Definition: Subscribe.cs:12
string ExtEvent
External Event Name
Definition: Subscribe.cs:49
void Dispose()
IDisposable.Dispose
Definition: MqClient.cs:76
Task ConnectAsync(string UserName, string Password)
Connects to the Queue Manager asynchronously.
Definition: MqClient.cs:111
void SubscribeIncoming(string QueueName, MqMessageEventHandler Callback, object State)
Subscribes to incoming messages.
Definition: MqClient.cs:299
Root node of a simulation model
Definition: Model.cs:49
async Task< string > GetKey(string KeyName, string LookupValue)
Gets a key from the database. If it does not exist, it prompts the user for input.
Definition: Model.cs:817
ISniffer GetSniffer(string Actor)
Gets a sniffer, if sniffer output is desired.
Definition: Model.cs:863
bool ExternalEvent(IExternalEventsNode Source, string Name, params KeyValuePair< string, object >[] Arguments)
Method called when an external event has been received.
Definition: Model.cs:889
Abstract base class for actors
Definition: Actor.cs:15
string InstanceId
ID of actor instance.
Definition: Actor.cs:57
int InstanceIndex
Actor instance index.
Definition: Actor.cs:67
Variables Variables
Collection of actor-variables.
Definition: Actor.cs:72
Model Model
Model in which the node is defined.
Helps with common XML-related tasks.
Definition: XML.cs:19
static string Attribute(XmlElement E, string Name)
Gets the value of an XML attribute.
Definition: XML.cs:914
Static interface for database persistence. In order to work, a database provider has to be assigned t...
Definition: Database.cs:19
static async Task Insert(object Object)
Inserts an object into the default collection of the database.
Definition: Database.cs:95
This filter selects objects that conform to all child-filters provided.
Definition: FilterAnd.cs:10
This filter selects objects that have a named field equal to a given value.
Class managing a script expression.
Definition: Expression.cs:39
static Task< string > TransformAsync(string s, string StartDelimiter, string StopDelimiter, Variables Variables)
Transforms a string by executing embedded script.
Definition: Expression.cs:4441
Collection of variables.
Definition: Variables.cs:25
Basic interface for simulator nodes with child nodes.
Basic interface for simulator nodes. Implementing this interface allows classes with default contruct...
ISimulationNode Parent
Parent node in the simulation model.
Interface for sniffers. Sniffers can be added to ICommunicationLayer classes to eavesdrop on communic...
Definition: ISniffer.cs:11