Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
ClusterUdpClient.cs
1using System;
2using System.Collections.Generic;
3using System.Net;
4using System.Net.Sockets;
5using System.Security.Cryptography;
6using Waher.Events;
7
9{
10 internal class ClusterUdpClient : IDisposable
11 {
// Owning endpoint: provides the AES instance and key, and receives decoded data and diagnostics.
private readonly ClusterEndpoint endpoint;

// Messages waiting for the running transmit loop; also used as the lock object guarding isWriting.
private readonly LinkedList<byte[]> outputQueue = new LinkedList<byte[]>();

// Address passed to the constructor; may be null.
private readonly IPAddress localAddress;

// Transmit IV: bytes 0-7 timestamp ticks, 8-11 local address bytes, 12-13 fragment number
// (bit 7 of byte 13 marks the last fragment), high nibble of byte 14 padding size, byte 15 counter.
private readonly byte[] ivTx = new byte[16];

// Receive IV, rebuilt for every datagram from its 12-byte header and the sender address.
private readonly byte[] ivRx = new byte[16];

// Keyed with the local address bytes; null when no local address was given, or after Dispose.
private HMACSHA1 hmac;

// Underlying UDP client; set to null by Dispose.
private UdpClient client;

// True while a BeginTransmit call is draining the output queue.
private bool isWriting;

// Set by Dispose; checked by the receive and transmit loops.
private bool disposed;
21
/// <summary>
/// Creates a wrapper around a UDP client belonging to a cluster endpoint.
/// </summary>
/// <param name="Endpoint">Cluster endpoint owning this client.</param>
/// <param name="Client">UDP client used for sending and receiving datagrams.</param>
/// <param name="LocalAddress">Local address; if null, no HMAC is created and the
/// address part of the transmit IV is left zeroed.</param>
internal ClusterUdpClient(ClusterEndpoint Endpoint, UdpClient Client, IPAddress LocalAddress)
{
    this.endpoint = Endpoint;
    this.client = Client;
    this.localAddress = LocalAddress;
    this.hmac = null;

    if (!(LocalAddress is null))
    {
        byte[] AddressBytes = LocalAddress.GetAddressBytes();

        // The first four address bytes occupy bytes 8-11 of the transmit IV.
        Array.Copy(AddressBytes, 0, this.ivTx, 8, 4);

        // The full address is used as the HMAC key for outgoing fragments.
        this.hmac = new HMACSHA1(AddressBytes);
    }
}
38
/// <summary>
/// Local address given when the client was created. May be null.
/// </summary>
internal IPAddress Address => this.localAddress;

/// <summary>
/// Local endpoint of the underlying socket, or null if it is not an
/// <see cref="IPEndPoint"/> or if the client has been disposed.
/// </summary>
// Dispose sets the client field to null, so it is dereferenced null-conditionally
// instead of throwing a NullReferenceException after disposal.
internal IPEndPoint EndPoint => this.client?.Client?.LocalEndPoint as IPEndPoint;
41
/// <summary>
/// Checks if the client is bound to a given local address and port.
/// </summary>
/// <param name="AddressString">String representation of the IP address to compare with.</param>
/// <param name="Port">Port number to compare with.</param>
/// <returns>true if the local endpoint matches; false if it does not match, is not an
/// <see cref="IPEndPoint"/>, or the client has been disposed.</returns>
internal bool IsEndpoint(string AddressString, int Port)
{
    // Dispose sets the client field to null; treat a disposed client as "no match"
    // instead of throwing a NullReferenceException.
    if (!(this.client?.Client?.LocalEndPoint is IPEndPoint LocalEndPoint))
        return false;

    return (LocalEndPoint.Address.ToString() == AddressString && LocalEndPoint.Port == Port);
}
49
/// <summary>
/// Marks the client as disposed and releases the UDP client and the HMAC, if present.
/// Calling it more than once is harmless, since both fields are null after the first call.
/// </summary>
public void Dispose()
{
    // Set first, so the receive and transmit loops stop at their next check.
    this.disposed = true;

    if (!(this.client is null))
    {
        this.client.Dispose();
        this.client = null;
    }

    if (!(this.hmac is null))
    {
        this.hmac.Dispose();
        this.hmac = null;
    }
}
60
/// <summary>
/// Runs the receive loop until the client is disposed. Each datagram is validated,
/// decrypted and authenticated; complete messages (single datagrams or reassembled
/// fragments) are passed to the endpoint.
///
/// Datagram layout, as read below: bytes 0-7 sender timestamp (ticks), bytes 8-11 are copied
/// to bytes 12-15 of the IV (fragment number, last-fragment flag, padding size, counter),
/// and the remainder is AES-encrypted: a 20-byte HMAC-SHA1, followed by payload and padding.
/// </summary>
internal async void BeginReceive() // Starts parallel task
{
    try
    {
        while (!this.disposed)
        {
            UdpReceiveResult Data = await this.client.ReceiveAsync();
            if (this.disposed)
                return;

            await this.endpoint.Information(Data.Buffer.Length.ToString() + " bytes received. (" + DateTime.Now.TimeOfDay.ToString() + ")");

            try
            {
                byte[] Datagram = Data.Buffer;
                int i, c = Datagram.Length - 12;

                // The encrypted part must be a positive number of whole 16-byte blocks.
                if (c <= 0 || (c & 15) != 0)
                    continue;

                // The timestamp comes from the network. Drop values DateTime cannot represent,
                // instead of letting the constructor throw and flood the exception log.
                long Ticks = BitConverter.ToInt64(Datagram, 0);
                if (Ticks < DateTime.MinValue.Ticks || Ticks > DateTime.MaxValue.Ticks)
                    continue;

                // NOTE(review): only old timestamps are rejected; timestamps in the future
                // pass this check. Confirm whether that is intended.
                DateTime TP = new DateTime(Ticks, DateTimeKind.Utc);
                if ((DateTime.UtcNow - TP).TotalSeconds >= 10) // Margin for unsynchronized clocks.
                    continue;

                // Rebuild the IV: timestamp, first four sender address bytes, header bytes 8-11.
                Array.Copy(Datagram, 0, this.ivRx, 0, 8);
                Array.Copy(Datagram, 8, this.ivRx, 12, 4);

                byte[] A = Data.RemoteEndPoint.Address.GetAddressBytes();
                Array.Copy(A, 0, this.ivRx, 8, 4);

                // 15-bit fragment number; the top bit of the high byte marks the last fragment.
                int FragmentNr = this.ivRx[13];
                bool LastFragment = (FragmentNr & 0x80) != 0;
                FragmentNr &= 0x7f;
                FragmentNr <<= 8;
                FragmentNr |= this.ivRx[12];

                // Number of padding bytes appended to the payload (0-15).
                int Padding = this.ivRx[14] >> 4;

                using (ICryptoTransform Decryptor = this.endpoint.aes.CreateDecryptor(this.endpoint.key, this.ivRx))
                {
                    byte[] Decrypted = Decryptor.TransformFinalBlock(Datagram, 12, Datagram.Length - 12);

                    // The HMAC key is the sender's address bytes.
                    using (HMACSHA1 HMAC = new HMACSHA1(A))
                    {
                        c = Decrypted.Length - 20 - Padding;

                        // The padding size is read before authentication; a bogus value
                        // could make the payload length negative. Drop such datagrams.
                        if (c < 0)
                            continue;

                        byte[] MAC = HMAC.ComputeHash(Decrypted, 20, c);

                        // Compare all 20 bytes without exiting early on the first mismatch.
                        int Diff = 0;

                        for (i = 0; i < 20; i++)
                            Diff |= MAC[i] ^ Decrypted[i];

                        if (Diff != 0)
                            continue;

                        byte[] Received = new byte[c];
                        Array.Copy(Decrypted, 20, Received, 0, c);

                        if (LastFragment && FragmentNr == 0)
                            this.endpoint.DataReceived(Received, Data.RemoteEndPoint);
                        else
                        {
                            // Fragments of one message share sender and timestamp.
                            string Key = Data.RemoteEndPoint.ToString() + " " + Ticks.ToString();

                            if (!this.endpoint.currentStatus.TryGetValue(Key, out object Obj) ||
                                !(Obj is Fragments Fragments))
                            {
                                Fragments = new Fragments()
                                {
                                    Source = Data.RemoteEndPoint,
                                    Timestamp = Ticks
                                };

                                this.endpoint.currentStatus[Key] = Fragments;
                            }

                            Fragments.Parts[FragmentNr] = Received;

                            if (LastFragment)
                            {
                                Fragments.Done = true;
                                Fragments.NrParts = FragmentNr + 1;
                            }

                            // Deliver once the last fragment is known and all parts are present.
                            if (Fragments.Done &&
                                Fragments.NrParts == Fragments.Parts.Count)
                            {
                                this.endpoint.currentStatus.Remove(Key);
                                this.endpoint.DataReceived(Fragments.ToByteArray(), Fragments.Source);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // A failure in one datagram must not stop the receive loop.
                await this.endpoint.Exception(ex);
                Log.Exception(ex);
            }
        }
    }
    catch (ObjectDisposedException)
    {
        // Closed.
    }
    catch (Exception ex)
    {
        await this.endpoint.Exception(ex);
    }
}
174
/// <summary>
/// Sends a message, split into fragments of at most 32768 bytes. Each fragment is
/// prefixed with a 12-byte header (timestamp and IV bytes 12-15), followed by the
/// AES-encrypted HMAC-SHA1, payload and padding. If a transmission is already in
/// progress, the message is queued and sent by the running transmit loop.
/// </summary>
/// <param name="Message">Message to send. Null or empty messages are ignored.</param>
/// <param name="Destination">Destination endpoint.</param>
// NOTE(review): queued messages are stored without their destination, so they are sent
// to the Destination of the call that drains the queue. Confirm all callers use the
// same destination.
internal async void BeginTransmit(byte[] Message, IPEndPoint Destination) // Starts parallel task
{
    try
    {
        // A null message previously set isWriting without ever clearing it, which
        // blocked all later transmissions. Reject it before taking the flag.
        if (this.disposed || Message is null)
            return;

        lock (this.outputQueue)
        {
            if (this.isWriting)
            {
                this.outputQueue.AddLast(Message);
                return;
            }
            else
                this.isWriting = true;
        }

        while (!(Message is null))
        {
            int Len = Message.Length;
            // Computed in 64 bits, so that very large messages cannot overflow
            // and slip past the size check below.
            int NrFragments = (int)(((long)Len + 32767) >> 15);
            int FragmentNr;
            int Pos = 0;

            if (NrFragments >= 32768)
                throw new ArgumentOutOfRangeException(nameof(Message), "Message too big.");

            // An empty message yields zero fragments: nothing is sent, and execution
            // falls through to the dequeue step below, so isWriting is always
            // released or handed on to the next queued message.

            byte[] TP = BitConverter.GetBytes(DateTime.UtcNow.Ticks);
            Array.Copy(TP, 0, this.ivTx, 0, 8);

            for (FragmentNr = 0; FragmentNr < NrFragments; FragmentNr++, Pos += 32768)
            {
                int FragmentSize = Math.Min(32768, Len - (FragmentNr << 15));
                // Pad MAC + payload up to a multiple of the 16-byte block size.
                int Padding = (-(20 + FragmentSize)) & 15;
                byte[] Datagram = new byte[32 + FragmentSize + Padding];

                // Fragment number (15 bits); the top bit marks the last fragment.
                this.ivTx[12] = (byte)FragmentNr;
                this.ivTx[13] = (byte)(FragmentNr >> 8);

                if (FragmentNr == NrFragments - 1)
                    this.ivTx[13] |= 0x80;

                // The high nibble carries the padding size; the low nibble is kept.
                this.ivTx[14] = (byte)((this.ivTx[14] & 0x0f) | (Padding << 4));

                // Header: timestamp (8 bytes) followed by IV bytes 12-15.
                Array.Copy(this.ivTx, 0, Datagram, 0, 8);
                Array.Copy(this.ivTx, 12, Datagram, 8, 4);

                byte[] MAC = this.hmac.ComputeHash(Message, Pos, FragmentSize);

                Array.Copy(MAC, 0, Datagram, 12, 20);
                Array.Copy(Message, Pos, Datagram, 32, FragmentSize);

                using (ICryptoTransform Encryptor = this.endpoint.aes.CreateEncryptor(this.endpoint.key, this.ivTx))
                {
                    byte[] Encrypted = Encryptor.TransformFinalBlock(Datagram, 12, Datagram.Length - 12);
                    Array.Copy(Encrypted, 0, Datagram, 12, Encrypted.Length);
                }

                // Advance the counter in the last IV bytes for the next fragment.
                if (++this.ivTx[15] == 0)
                    ++this.ivTx[14];

                await this.client.SendAsync(Datagram, Datagram.Length, Destination);

                if (this.disposed)
                    return;
            }

            lock (this.outputQueue)
            {
                if (this.outputQueue.First is null)
                {
                    this.isWriting = false;
                    Message = null;
                }
                else
                {
                    Message = this.outputQueue.First.Value;
                    this.outputQueue.RemoveFirst();
                }
            }
        }
    }
    catch (Exception ex)
    {
        await this.endpoint.Exception(ex);

        // Drop pending messages and release the flag, so later calls can transmit again.
        lock (this.outputQueue)
        {
            this.outputQueue.Clear();
            this.isWriting = false;
        }
    }
    finally
    {
        if (this.endpoint.shuttingDown)
            this.endpoint.Dispose2();
    }
}
277 }
278}
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647