Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
PipeEventSink.cs
1using System;
2using System.Collections.Generic;
3using System.IO;
4using System.IO.Pipes;
5using System.Security.Principal;
6using System.Text;
7using System.Threading.Tasks;
11
12namespace Waher.Events.Pipe
13{
/// <summary>
/// Delegate for methods that create object instances of <see cref="NamedPipeClientStream"/>.
/// </summary>
/// <param name="Name">Name of the pipe the created stream should use.</param>
/// <returns>A new pipe client stream. The sink calls ConnectAsync on it itself.</returns>
19 public delegate NamedPipeClientStream NamedPipeClientStreamFactory(string Name);
20
24 public class PipeEventSink : EventSink
25 {
// UTF-8 encoded messages that arrived while another Queue call was writing.
// The list instance is also the lock object protecting itself and the writing flag.
26 private readonly LinkedList<byte[]> pipeQueue = new LinkedList<byte[]>();
// Factory invoked with pipeName whenever a new pipe stream has to be created.
27 private readonly NamedPipeClientStreamFactory pipeStreamFactory;
// Name of the pipe, handed to the factory.
28 private readonly string pipeName;
// Current pipe stream; null until the first write, and again after Dispose.
29 private NamedPipeClientStream pipe;
// True while one Queue call owns the pipe and is draining pipeQueue.
30 private bool writing = false;
31
/// <summary>
/// Writes logged events to an operating system pipe, for inter-process communication.
/// Pipe streams are created with the built-in default factory.
/// </summary>
/// <param name="ObjectId">Object ID of the event sink.</param>
/// <param name="PipeName">Name of the pipe to write to.</param>
public PipeEventSink(string ObjectId, string PipeName)
	: this(ObjectId, PipeName, new NamedPipeClientStreamFactory(DefaultFactory))
{
}
41
/// <summary>
/// Writes logged events to an operating system pipe, for inter-process communication.
/// </summary>
/// <param name="ObjectId">Object ID of the event sink; forwarded to the base class.</param>
/// <param name="PipeName">Name of the pipe to write to.</param>
/// <param name="StreamFactory">Method used to create the pipe stream each time a
/// connection has to be established.</param>
public PipeEventSink(string ObjectId, string PipeName, NamedPipeClientStreamFactory StreamFactory)
	: base(ObjectId)
{
	this.pipeName = PipeName;
	this.pipeStreamFactory = StreamFactory;

	// No stream yet: it is created on the first write.
	this.pipe = null;
}
55
/// <summary>
/// Default pipe stream factory: an outbound, asynchronous, non-inheritable client stream
/// using anonymous impersonation. The stream is returned unconnected.
/// </summary>
/// <param name="Name">Name of the pipe.</param>
/// <returns>Newly created pipe client stream.</returns>
private static NamedPipeClientStream DefaultFactory(string Name)
{
	// "." is the server name the .NET API uses for the local computer.
	const string LocalComputer = ".";

	NamedPipeClientStream Result = new NamedPipeClientStream(
		LocalComputer,
		Name,
		PipeDirection.Out,
		PipeOptions.Asynchronous,
		TokenImpersonationLevel.Anonymous,
		HandleInheritability.None);

	return Result;
}
61
/// <summary>
/// Pipe object. Null when no pipe stream is currently held by the sink.
/// </summary>
public NamedPipeClientStream Pipe
{
	get { return this.pipe; }
}
66
/// <summary>
/// Disposes the base class first, then the pipe stream (if any), and clears the reference.
/// </summary>
public override void Dispose()
{
	base.Dispose();

	// Read the field once, exactly as the null-conditional call would.
	NamedPipeClientStream Current = this.pipe;

	if (!(Current is null))
		Current.Dispose();

	this.pipe = null;
}
77
/// <summary>
/// Queues an event to be output. The event is serialized with <c>Event.ToXML()</c>,
/// and that XML is passed on without validation.
/// </summary>
/// <param name="Event">Event to output.</param>
/// <returns>Task of the internal queue operation.</returns>
public override Task Queue(Event Event)
{
	string Xml = Event.ToXML();

	return this.Queue(Xml, false);
}
86
/// <summary>
/// Queues XML-encoded information to be output. The XML is validated first; invalid XML
/// faults the returned task with an <see cref="ArgumentException"/>.
/// </summary>
/// <param name="Xml">XML to output.</param>
/// <returns>Task of the internal queue operation.</returns>
public Task Queue(string Xml) => this.Queue(Xml, true);
95
/// <summary>
/// Encodes the XML as UTF-8 and writes it to the pipe. Only one call writes at a time:
/// if a write is already in progress the payload is appended to the internal queue and
/// the call returns; the writing call drains that queue before it finishes.
/// </summary>
/// <remarks>
/// Failures after validation never reach the caller. On any exception the pending queue
/// is discarded; timeouts and I/O errors are dropped silently, anything else is logged.
/// </remarks>
/// <param name="Xml">XML to output.</param>
/// <param name="ValidateXml">If the XML must be checked with <c>XML.IsValidXml</c> first.</param>
/// <exception cref="ArgumentException">If validation is requested and fails. As this is
/// an async method, it surfaces through the returned task.</exception>
private async Task Queue(string Xml, bool ValidateXml)
{
	if (ValidateXml)
	{
		if (!XML.IsValidXml(Xml))
			throw new ArgumentException("Invalid XML.", nameof(Xml));
	}

	try
	{
		byte[] Payload = Encoding.UTF8.GetBytes(Xml);

		lock (this.pipeQueue)
		{
			if (this.writing)
			{
				// Another call owns the pipe; it will pick this payload up.
				this.pipeQueue.AddLast(Payload);
				return;
			}

			this.writing = true;
		}

		// A previously used stream that has lost its connection is thrown away.
		if (this.pipe?.IsConnected == false)
		{
			this.pipe.Dispose();
			this.pipe = null;
		}

		if (this.pipe is null)
		{
			this.pipe = this.pipeStreamFactory(this.pipeName);

			await this.BeforeConnect.Raise(this, EventArgs.Empty);
			await this.pipe.ConnectAsync(5000);   // Timeout, in milliseconds.
			await this.AfterConnect.Raise(this, EventArgs.Empty);
		}

		// Payload is never null here, so the first write is unconditional.
		do
		{
			await this.pipe.WriteAsync(Payload, 0, Payload.Length);

			lock (this.pipeQueue)
			{
				LinkedListNode<byte[]> Next = this.pipeQueue.First;

				if (Next is null)
				{
					// Nothing left: release the writer role inside the lock.
					Payload = null;
					this.writing = false;
				}
				else
				{
					Payload = Next.Value;
					this.pipeQueue.RemoveFirst();
				}
			}
		}
		while (!(Payload is null));
	}
	catch (Exception ex)
	{
		this.EmptyPipeQueue();

		// Timeouts and I/O errors are expected when no reader is available; not logged.
		if (!(ex is TimeoutException) && !(ex is IOException))
			Log.Exception(ex);
	}
}
167
/// <summary>
/// Discards all pending payloads and releases the writer role, under the queue lock.
/// </summary>
private void EmptyPipeQueue()
{
	lock (this.pipeQueue)
	{
		this.writing = false;
		this.pipeQueue.Clear();
	}
}
176
181
186 }
187}
Helps with common XML-related tasks.
Definition: XML.cs:19
static bool IsValidXml(string Xml)
Checks if a string is valid XML
Definition: XML.cs:1223
Class representing an event.
Definition: Event.cs:10
Base class for event sinks.
Definition: EventSink.cs:9
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
Writes logged events to an operating system pipe, for inter-process communication.
PipeEventSink(string ObjectId, string PipeName)
Writes logged events to an operating system pipe, for inter-process communication.
override void Dispose()
IDisposable.Dispose()
EventHandlerAsync AfterConnect
Raised after connecting to the pipe stream.
override Task Queue(Event Event)
Queues an event to be output.
PipeEventSink(string ObjectId, string PipeName, NamedPipeClientStreamFactory StreamFactory)
Writes logged events to an operating system pipe, for inter-process communication.
NamedPipeClientStream Pipe
Pipe object.
EventHandlerAsync BeforeConnect
Raised before connecting to the pipe stream.
Task Queue(string Xml)
Queues XML-encoded information to be output.
delegate NamedPipeClientStream NamedPipeClientStreamFactory(string Name)
Delegate for methods that create object instances of NamedPipeClientStream.
delegate Task EventHandlerAsync(object Sender, EventArgs e)
Asynchronous version of EventHandler.