Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
MqClient.cs
1using IBM.WMQ;
2using System;
3using System.Collections;
4using System.Collections.Generic;
5using System.Runtime.ExceptionServices;
6using System.Threading;
7using System.Threading.Tasks;
9using Waher.Events;
12
13namespace TAG.Simulator.MQ
14{
/// <summary>
/// IBM MQ client
/// </summary>
public class MqClient : CommunicationLayer, IDisposable
{
    /// <summary>
    /// Default port for IBM MQ is 1414.
    /// </summary>
    public const int DefaultPort = 1414;

    /// <summary>
    /// Reason code carried by an <see cref="MQException"/> when a get operation finds
    /// no message on the queue (MQRC_NO_MSG_AVAILABLE in the IBM MQ reason code list).
    /// </summary>
    private const int NoMessageAvailableReason = 2033;

    // Open queue handles, cached by queue name. Each dictionary is also used as the
    // lock object guarding its own contents.
    private readonly Dictionary<string, MQQueue> inputQueues = new Dictionary<string, MQQueue>();
    private readonly Dictionary<string, MQQueue> outputQueues = new Dictionary<string, MQQueue>();

    private readonly string queueManager;
    private readonly string channel;
    private readonly string cipher;
    private readonly string cipherSuite;
    private readonly string certificateStore;
    private readonly string host;
    private readonly int port;

    // Set by Connect, reset to null by Dispose.
    private MQQueueManager manager;

    /// <summary>
    /// IBM MQ client
    /// </summary>
    /// <param name="QueueManager">Name of the queue manager to connect to.</param>
    /// <param name="Channel">Channel used for the connection.</param>
    /// <param name="Host">Host name used for the connection.</param>
    /// <param name="Port">Port number used for the connection.</param>
    /// <param name="Sniffers">Sniffers registered with the communication layer.</param>
    public MqClient(string QueueManager, string Channel, string Host, int Port, params ISniffer[] Sniffers)
        : this(QueueManager, Channel, string.Empty, string.Empty, string.Empty, Host, Port, Sniffers)
    {
    }

    /// <summary>
    /// IBM MQ client
    /// </summary>
    /// <param name="QueueManager">Name of the queue manager to connect to.</param>
    /// <param name="Channel">Channel used for the connection.</param>
    /// <param name="Cipher">SSL cipher specification. Ignored if null or empty.</param>
    /// <param name="CipherSuite">SSL cipher suite. Ignored if null or empty.</param>
    /// <param name="CertificateStore">SSL certificate store. Ignored if null or empty.</param>
    /// <param name="Host">Host name used for the connection.</param>
    /// <param name="Port">Port number used for the connection.</param>
    /// <param name="Sniffers">Sniffers registered with the communication layer.</param>
    public MqClient(string QueueManager, string Channel, string Cipher, string CipherSuite, string CertificateStore,
        string Host, int Port, params ISniffer[] Sniffers)
        : base(false, Sniffers)
    {
        this.queueManager = QueueManager;
        this.channel = Channel;
        this.cipher = Cipher;
        this.cipherSuite = CipherSuite;
        this.certificateStore = CertificateStore;
        this.host = Host;
        this.port = Port;
    }

    /// <summary>
    /// IDisposable.Dispose
    ///
    /// Closes all cached queues and the queue manager. Failures are logged and never
    /// propagated to the caller.
    /// </summary>
    public void Dispose()
    {
        CloseQueues(this.inputQueues);
        CloseQueues(this.outputQueues);

        // Detach the manager first, so that the field is reset even if closing fails.
        MQQueueManager Manager = this.manager;
        this.manager = null;

        if (!(Manager is null))
        {
            try
            {
                this.Information("Closing...");
                Manager.Close();
            }
            catch (Exception ex)
            {
                Log.Exception(ex);
            }
        }
    }

    /// <summary>
    /// Closes every queue in a cache and empties the cache. Each queue is closed
    /// individually, so that one failing queue does not leave the others open.
    /// </summary>
    /// <param name="Queues">Queue cache to close and clear.</param>
    private static void CloseQueues(Dictionary<string, MQQueue> Queues)
    {
        lock (Queues)
        {
            foreach (MQQueue Queue in Queues.Values)
            {
                try
                {
                    Queue.Close();
                }
                catch (Exception ex)
                {
                    Log.Exception(ex);
                }
            }

            // Closed handles must not be handed out again, or closed a second time.
            Queues.Clear();
        }
    }

    /// <summary>
    /// Connects to the Queue Manager asynchronously.
    /// </summary>
    /// <param name="UserName">User name</param>
    /// <param name="Password">Password</param>
    /// <returns>Task exposed by the connection task that is handed to <see cref="MqTasks"/>.</returns>
    public Task ConnectAsync(string UserName, string Password)
    {
        ConnectionTask Item = new ConnectionTask(this, UserName, Password);
        MqTasks.ExecuteTask(Item);
        return Item.Completed;
    }

    /// <summary>
    /// Connects to the Queue Manager
    /// </summary>
    /// <param name="UserName">User name</param>
    /// <param name="Password">Password</param>
    /// <exception cref="Exception">Any exception raised while connecting is reported
    /// to the sniffers and then rethrown.</exception>
    public void Connect(string UserName, string Password)
    {
        this.Information("Connecting...");
        try
        {
            Hashtable ConnectionParameters = new Hashtable()
            {
                { MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED },
                { MQC.HOST_NAME_PROPERTY, this.host },
                { MQC.PORT_PROPERTY, this.port },
                { MQC.CHANNEL_PROPERTY, this.channel },
                { MQC.USER_ID_PROPERTY, UserName },
                { MQC.PASSWORD_PROPERTY, Password }
            };

            // Each optional SSL setting is guarded by its own value, so that a missing
            // cipher does not suppress the others, and empty values are never sent.
            if (!string.IsNullOrEmpty(this.cipher))
                ConnectionParameters[MQC.SSL_CIPHER_SPEC_PROPERTY] = this.cipher;

            if (!string.IsNullOrEmpty(this.cipherSuite))
                ConnectionParameters[MQC.SSL_CIPHER_SUITE_PROPERTY] = this.cipherSuite;

            if (!string.IsNullOrEmpty(this.certificateStore))
                ConnectionParameters[MQC.SSL_CERT_STORE_PROPERTY] = this.certificateStore;

            this.manager = new MQQueueManager(this.queueManager, ConnectionParameters);
        }
        catch (Exception ex)
        {
            this.Exception(ex);
            throw;
        }
    }

    /// <summary>
    /// Returns the cached handle for a queue, opening and caching it on first use.
    /// </summary>
    /// <param name="Cache">Cache to look in; also used as lock object.</param>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="OpenOptions">Open options used if the queue has to be opened.</param>
    /// <returns>Open queue handle.</returns>
    /// <exception cref="InvalidOperationException">If there is no queue manager,
    /// i.e. before a successful call to Connect, or after Dispose.</exception>
    private MQQueue GetOrOpenQueue(Dictionary<string, MQQueue> Cache, string QueueName, int OpenOptions)
    {
        lock (Cache)
        {
            if (!Cache.TryGetValue(QueueName, out MQQueue Queue))
            {
                MQQueueManager Manager = this.manager;
                if (Manager is null)
                    throw new InvalidOperationException("Not connected to a queue manager.");

                Queue = Manager.AccessQueue(QueueName, OpenOptions);
                Cache[QueueName] = Queue;
            }

            return Queue;
        }
    }

    /// <summary>
    /// Puts a message onto a queue.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="Message">Message to put.</param>
    /// <returns>Task exposed by the put task that is handed to <see cref="MqTasks"/>.</returns>
    public Task PutAsync(string QueueName, string Message)
    {
        PutTask Item = new PutTask(this, QueueName, Message);
        MqTasks.ExecuteTask(Item);
        return Item.Completed;
    }

    /// <summary>
    /// Puts a message onto a queue.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="Message">Message to put.</param>
    public void Put(string QueueName, string Message)
    {
        try
        {
            this.Information("Putting to " + QueueName + ":");

            MQQueue Queue = this.GetOrOpenQueue(this.outputQueues, QueueName,
                MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);

            MQMessage MqMessage = new MQMessage()
            {
                CharacterSet = 1208, // UTF-8
                Format = MQC.MQFMT_STRING
            };

            this.TransmitText(Message);

            MqMessage.WriteString(Message);

            Queue.Put(MqMessage);
        }
        catch (Exception ex)
        {
            // NOTE(review): unlike Connect and GetOne, a failure here is only reported
            // to the sniffers and not rethrown. Kept as is, since callers may depend on
            // Put not throwing - confirm whether this is intentional.
            this.Exception(ex);
        }
    }

    /// <summary>
    /// Gets one message from a queue. Waits without time limit (MQC.MQWI_UNLIMITED).
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <returns>Message read, as a string.</returns>
    public string GetOne(string QueueName)
    {
        return this.GetOne(QueueName, MQC.MQWI_UNLIMITED);
    }

    /// <summary>
    /// Gets one message from a queue.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="TimeoutMilliseconds">Wait interval passed to the get operation.</param>
    /// <returns>Message read, as a string, or null if the queue manager reported that
    /// no message was available.</returns>
    /// <exception cref="Exception">Any other failure is reported to the sniffers and
    /// then rethrown.</exception>
    public string GetOne(string QueueName, int TimeoutMilliseconds)
    {
        try
        {
            MQQueue Queue = this.GetOrOpenQueue(this.inputQueues, QueueName,
                MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);

            MQMessage Message = new MQMessage();
            MQGetMessageOptions Options = new MQGetMessageOptions()
            {
                Options = MQC.MQGMO_FAIL_IF_QUIESCING | MQC.MQGMO_WAIT,
                WaitInterval = TimeoutMilliseconds
            };

            Queue.Get(Message, Options);

            string Result = Message.ReadString(Message.MessageLength);
            this.ReceiveText(Result);

            return Result;
        }
        catch (MQException ex) when (ex.Reason == NoMessageAvailableReason)
        {
            // An empty queue is an expected outcome, not an error.
            return null;
        }
        catch (Exception ex)
        {
            this.Exception(ex);
            throw;
        }
    }

    /// <summary>
    /// Gets one message from a queue. Waits without time limit (MQC.MQWI_UNLIMITED).
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <returns>Task exposed by the get task that is handed to <see cref="MqTasks"/>.</returns>
    public Task<string> GetOneAsync(string QueueName)
    {
        return this.GetOneAsync(QueueName, MQC.MQWI_UNLIMITED);
    }

    /// <summary>
    /// Gets one message from a queue.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="TimeoutMilliseconds">Timeout, in milliseconds, passed to the get task.</param>
    /// <returns>Task exposed by the get task that is handed to <see cref="MqTasks"/>.</returns>
    public Task<string> GetOneAsync(string QueueName, int TimeoutMilliseconds)
    {
        GetTask Item = new GetTask(this, QueueName, TimeoutMilliseconds);
        MqTasks.ExecuteTask(Item);
        return Item.Completed;
    }

    /// <summary>
    /// Subscribes to incoming messages.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="Callback">Handler passed to the subscription task.</param>
    /// <param name="State">State object passed to the subscription task.</param>
    public void SubscribeIncoming(string QueueName, MqMessageEventHandler Callback, object State)
    {
        this.SubscribeIncoming(QueueName, null, null, Callback, State);
    }

    /// <summary>
    /// Subscribes to incoming messages.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="Cancel">Event passed to the subscription task; presumably signalled
    /// to cancel the subscription - confirm against SubscriptionTask. May be null.</param>
    /// <param name="Callback">Handler passed to the subscription task.</param>
    /// <param name="State">State object passed to the subscription task.</param>
    public void SubscribeIncoming(string QueueName, ManualResetEvent Cancel, MqMessageEventHandler Callback, object State)
    {
        this.SubscribeIncoming(QueueName, Cancel, null, Callback, State);
    }

    /// <summary>
    /// Subscribes to incoming messages.
    /// </summary>
    /// <param name="QueueName">Name of queue.</param>
    /// <param name="Cancel">Event passed to the subscription task; presumably signalled
    /// to cancel the subscription - confirm against SubscriptionTask. May be null.</param>
    /// <param name="Stopped">Completion source passed to the subscription task; presumably
    /// completed when the subscription has stopped - confirm against SubscriptionTask.
    /// May be null.</param>
    /// <param name="Callback">Handler passed to the subscription task.</param>
    /// <param name="State">State object passed to the subscription task.</param>
    public void SubscribeIncoming(string QueueName, ManualResetEvent Cancel, TaskCompletionSource<bool> Stopped,
        MqMessageEventHandler Callback, object State)
    {
        this.Information("Subscribing to messages from " + QueueName);
        SubscriptionTask Item = new SubscriptionTask(this, QueueName, Cancel, Stopped, Callback, State);
        MqTasks.ExecuteTask(Item);
    }
}
333}
string GetOne(string QueueName, int TimeoutMilliseconds)
Gets one message from a queue.
Definition: MqClient.cs:224
void Connect(string UserName, string Password)
Connects to the Queue Manager.
Definition: MqClient.cs:123
Task< string > GetOneAsync(string QueueName)
Gets one message from a queue.
Definition: MqClient.cs:275
MqClient(string QueueManager, string Channel, string Cipher, string CipherSuite, string CertificateStore, string Host, int Port, params ISniffer[] Sniffers)
IBM MQ client
Definition: MqClient.cs:60
MqClient(string QueueManager, string Channel, string Host, int Port, params ISniffer[] Sniffers)
IBM MQ client
Definition: MqClient.cs:44
Task< string > GetOneAsync(string QueueName, int TimeoutMilliseconds)
Gets one message from a queue.
Definition: MqClient.cs:286
void Dispose()
IDisposable.Dispose
Definition: MqClient.cs:76
const int DefaultPort
Default port for IBM MQ is 1414.
Definition: MqClient.cs:23
void SubscribeIncoming(string QueueName, ManualResetEvent Cancel, TaskCompletionSource< bool > Stopped, MqMessageEventHandler Callback, object State)
Subscribes to incoming messages.
Definition: MqClient.cs:324
void SubscribeIncoming(string QueueName, ManualResetEvent Cancel, MqMessageEventHandler Callback, object State)
Subscribes to incoming messages.
Definition: MqClient.cs:311
string GetOne(string QueueName)
Gets one message from a queue.
Definition: MqClient.cs:213
Task ConnectAsync(string UserName, string Password)
Connects to the Queue Manager asynchronously.
Definition: MqClient.cs:111
void SubscribeIncoming(string QueueName, MqMessageEventHandler Callback, object State)
Subscribes to incoming messages.
Definition: MqClient.cs:299
void Put(string QueueName, string Message)
Puts a message onto a queue.
Definition: MqClient.cs:173
Task PutAsync(string QueueName, string Message)
Puts a message onto a queue.
Definition: MqClient.cs:161
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
Simple base class for classes implementing communication protocols.
Task Exception(Exception Exception)
Called to inform the viewer of an exception state.
ISniffer[] Sniffers
Registered sniffers.
Task Information(string Comment)
Called to inform the viewer of something.
Task TransmitText(string Text)
Called when text has been transmitted.
Task ReceiveText(string Text)
Called when text has been received.
Interface for sniffers. Sniffers can be added to ICommunicationLayer classes to eavesdrop on communic...
Definition: ISniffer.cs:11
delegate Task MqMessageEventHandler(object Sender, MqMessageEventArgs e)
Delegate for MQ Message event handlers or callback methods.