Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
PipeEventRecipient.cs
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.IO.Pipes;
5using System.Text;
6using System.Threading.Tasks;
7using System.Xml;
11
12namespace Waher.Events.Pipe
13{
19 public delegate NamedPipeServerStream NamedPipeServerStreamFactory(string Name);
20
24 public class PipeEventRecipient : IDisposable
25 {
26 private const int bufferSize = 65536;
27
28 private readonly StringBuilder fragment = new StringBuilder();
29 private readonly string pipeName;
30 private readonly byte[] inputBuffer = new byte[bufferSize];
31 private readonly bool logIncoming;
32 private readonly NamedPipeServerStreamFactory pipeStreamFactory;
33 private NamedPipeServerStream pipe = null;
34 private bool disposed = false;
35 private int inputState = 0;
36 private int inputDepth = 0;
37
42 public PipeEventRecipient(string PipeName)
43 : this(PipeName, true, DefaultFactory)
44 {
45 }
46
52 public PipeEventRecipient(string PipeName, bool LogIncomingEvents)
53 : this(PipeName, LogIncomingEvents, DefaultFactory)
54 {
55 }
56
63 public PipeEventRecipient(string PipeName, bool LogIncomingEvents, NamedPipeServerStreamFactory StreamFactory)
64 {
65 if (StreamFactory is null)
66 throw new ArgumentNullException(nameof(StreamFactory), "Pipe stream factory cannot be null.");
67
68 this.pipeStreamFactory = StreamFactory;
69 this.pipeName = PipeName;
70 this.logIncoming = LogIncomingEvents;
71 this.pipe = StreamFactory(this.pipeName);
72 this.pipe.BeginWaitForConnection(this.PipeClientConnected, null);
73 }
74
75 private static NamedPipeServerStream DefaultFactory(string Name)
76 {
77 return new NamedPipeServerStream(Name, PipeDirection.In, 1, PipeTransmissionMode.Message,
78 PipeOptions.Asynchronous, 65536, 65536);
79 }
80
84 public NamedPipeServerStream Pipe => this.pipe;
85
89 public void Dispose()
90 {
91 if (!this.disposed)
92 {
93 this.disposed = true;
94 this.pipe?.Dispose();
95 this.pipe = null;
96 }
97 }
98
99 private void RestartPipe()
100 {
101 if (this.pipe is null || this.disposed)
102 return;
103
104 try
105 {
106 this.pipe?.Disconnect();
107 }
108 catch (Exception ex)
109 {
110 Log.Exception(ex);
111 this.pipe?.Close();
112 this.pipe?.Dispose();
113 this.pipe = null;
114
115 this.pipe = this.pipeStreamFactory(this.pipeName);
116 }
117
118 this.inputState = 0;
119 this.inputDepth = 0;
120 this.fragment.Clear();
121
122 this.pipe.BeginWaitForConnection(this.PipeClientConnected, null);
123 }
124
125 private void PipeClientConnected(IAsyncResult ar)
126 {
127 try
128 {
129 if (this.pipe is null)
130 return;
131
132 this.pipe.EndWaitForConnection(ar);
133 this.pipe.BeginRead(this.inputBuffer, 0, bufferSize, this.PipeReadComplete, null);
134 }
135 catch (Exception ex)
136 {
137 Log.Exception(ex);
138 this.RestartPipe();
139 }
140 }
141
142 private async void PipeReadComplete(IAsyncResult ar)
143 {
144 try
145 {
146 if (this.pipe is null)
147 return;
148
149 int NrRead = this.pipe.EndRead(ar);
150 if (NrRead <= 0)
151 this.pipe.BeginWaitForConnection(this.PipeClientConnected, null);
152 else
153 {
154 string s = Encoding.UTF8.GetString(this.inputBuffer, 0, NrRead);
155 bool Continue;
156
157 try
158 {
159 Continue = await this.ParseIncoming(s);
160 }
161 catch (Exception ex)
162 {
163 Log.Exception(ex);
164 Continue = true;
165 }
166
167 if (Continue)
168 this.pipe.BeginRead(this.inputBuffer, 0, bufferSize, this.PipeReadComplete, null);
169 }
170 }
171 catch (IOException)
172 {
173 Log.Error("Pipe-connection down.", this.pipeName);
174 this.RestartPipe();
175 }
176 catch (Exception ex)
177 {
178 Log.Exception(ex, this.pipeName);
179 this.RestartPipe();
180 }
181 }
182
183 private async Task<bool> ParseIncoming(string s)
184 {
185 bool Result = true;
186
187 foreach (char ch in s)
188 {
189 switch (this.inputState)
190 {
191 case 0: // Waiting for <
192 if (ch == '<')
193 {
194 this.fragment.Append(ch);
195 this.inputState++;
196 }
197 else if (this.inputDepth > 0)
198 this.fragment.Append(ch);
199 else if (ch > ' ')
200 {
201 this.RestartPipe();
202 return false;
203 }
204 break;
205
206 case 1: // Second character in tag
207 this.fragment.Append(ch);
208 if (ch == '/')
209 this.inputState++;
210 else
211 this.inputState += 2;
212 break;
213
214 case 2: // Waiting for end of closing tag
215 this.fragment.Append(ch);
216 if (ch == '>')
217 {
218 this.inputDepth--;
219 if (this.inputDepth < 0)
220 {
221 this.RestartPipe();
222 return false;
223 }
224 else
225 {
226 if (this.inputDepth == 0)
227 {
228 if (!await this.ProcessFragment(this.fragment.ToString()))
229 Result = false;
230
231 this.fragment.Clear();
232 }
233
234 if (this.inputState > 0)
235 this.inputState = 0;
236 }
237 }
238 break;
239
240 case 3: // Wait for end of start tag
241 this.fragment.Append(ch);
242 if (ch == '>')
243 {
244 this.inputDepth++;
245 this.inputState = 0;
246 }
247 else if (ch == '/')
248 this.inputState++;
249 break;
250
251 case 4: // Check for end of childless tag.
252 this.fragment.Append(ch);
253 if (ch == '>')
254 {
255 if (this.inputDepth == 0)
256 {
257 if (!await this.ProcessFragment(this.fragment.ToString()))
258 Result = false;
259
260 this.fragment.Clear();
261 }
262
263 if (this.inputState != 0)
264 this.inputState = 0;
265 }
266 else
267 this.inputState--;
268 break;
269
270 default:
271 break;
272 }
273 }
274
275 return Result;
276 }
277
278 private async Task<bool> ProcessFragment(string Xml)
279 {
280 try
281 {
282 XmlDocument Doc = new XmlDocument();
283
284 Doc.LoadXml(Xml);
285
286 if (EventExtensions.TryParse(Doc.DocumentElement, out Event Event))
287 {
288 if (this.logIncoming)
289 Log.Event(Event);
290
291 await this.EventReceived.Raise(this, new EventEventArgs(Event));
292 }
293 else
294 await this.CustomFragmentReceived.Raise(this, new CustomFragmentEventArgs(Doc));
295 }
296 catch (Exception ex)
297 {
298 Log.Exception(ex);
299 }
300
301 return true;
302 }
303
307 public event EventHandlerAsync<EventEventArgs> EventReceived;
308
312 public event EventHandlerAsync<CustomFragmentEventArgs> CustomFragmentReceived;
313 }
314}
Class representing an event.
Definition: Event.cs:10
static bool TryParse(string Xml, out Event[] Parsed)
Tries to parse an event object (or collection of event objects) from an XML document.
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
static void Error(string Message, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, string StackTrace, params KeyValuePair< string, object >[] Tags)
Logs an error event.
Definition: Log.cs:682
static async void Event(Event Event)
Logs an event. It will be distributed to registered event sinks.
Definition: Log.cs:128
Receives events from an operating system pipe
EventHandlerAsync< CustomFragmentEventArgs > CustomFragmentReceived
Event raised when a custom XML fragment has been received.
EventHandlerAsync< EventEventArgs > EventReceived
Event raised when an event has been received.
PipeEventRecipient(string PipeName, bool LogIncomingEvents, NamedPipeServerStreamFactory StreamFactory)
Receives events from an operating system pipe
NamedPipeServerStream Pipe
Pipe object.
PipeEventRecipient(string PipeName, bool LogIncomingEvents)
Receives events from an operating system pipe
PipeEventRecipient(string PipeName)
Receives events from an operating system pipe
delegate NamedPipeServerStream NamedPipeServerStreamFactory(string Name)
Delegate for methods that create object instances of NamedPipeServerStream.