Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
PeerConnection.cs
1using System;
2using System.Collections.Generic;
3using System.Net;
4using System.Threading;
5using System.Threading.Tasks;
6using Waher.Events;
7
9{
13 public class PeerConnection : IDisposable
14 {
15 private byte[] packetBuffer = null;
16 private readonly PeerToPeerNetwork network;
17 private IPEndPoint remoteEndpoint;
18 private BinaryTcpClient tcpConnection;
19 private EventHandlerAsync resynchCallback;
20 private object stateObject = null;
21 private int readState = 0;
22 private int packetSize = 0;
23 private ushort outgoingPacketNumber = 0;
24 private int offset = 0;
25 private int packetPos = 0;
26 private bool closed = false;
27 private bool disposed = false;
28 private readonly bool encapsulatePackets;
29
30 internal PeerConnection(BinaryTcpClient TcpConnection, PeerToPeerNetwork Network, IPEndPoint RemoteEndpoint,
31 bool EncapsulatePackets)
32 {
33 this.network = Network;
34 this.remoteEndpoint = RemoteEndpoint;
35 this.tcpConnection = TcpConnection;
36 this.encapsulatePackets = EncapsulatePackets;
37
38 this.tcpConnection.OnDisconnected += this.TcpConnection_OnDisconnected;
39 this.tcpConnection.OnError += this.TcpConnection_OnError;
40 this.tcpConnection.OnReceived += this.TcpConnection_OnReceived;
41 this.tcpConnection.OnSent += this.TcpConnection_OnSent;
42 }
43
44 private async Task TcpConnection_OnSent(object Sender, byte[] Buffer, int Offset, int Count)
45 {
46 this.lastTcpPacket = DateTime.Now;
47
49 if (!(h is null))
50 {
51 try
52 {
53 await h(this, Buffer, Offset, Count);
54 }
55 catch (Exception ex)
56 {
57 Log.Exception(ex);
58 }
59 }
60 }
61
62 private async Task<bool> TcpConnection_OnReceived(object Sender, byte[] Buffer, int Offset, int Count)
63 {
64 bool Continue = true;
65
66 this.lastTcpPacket = DateTime.Now;
67 this.resynchCallback = null;
68
69 if (this.encapsulatePackets)
70 {
71 int NrLeft;
72 byte b;
73
74 while (Count-- > 0 && Continue && !this.disposed)
75 {
76 switch (this.readState)
77 {
78 case 0:
79 b = Buffer[Offset++];
80 this.packetSize |= (b & 127) << this.offset;
81 this.offset += 7;
82 if ((b & 128) == 0)
83 {
84 this.packetBuffer = new byte[this.packetSize];
85 this.packetPos = 0;
86 this.readState = 1;
87 }
88 break;
89
90 case 1:
91 NrLeft = Math.Min(Count, this.packetSize - this.packetPos);
92 Array.Copy(Buffer, Offset, this.packetBuffer, this.packetPos, NrLeft);
93 Offset += NrLeft;
94 this.packetPos += NrLeft;
95
96 if (this.packetPos >= this.packetSize)
97 {
98 Continue = await this.OnPacketReceived();
99
100 this.readState = 0;
101 this.packetSize = 0;
102 this.offset = 0;
103 this.packetBuffer = null;
104 }
105 break;
106
107 default:
108 Count = 0;
109 break;
110 }
111 }
112 }
113 else
114 {
115 this.packetSize = Count;
116 this.packetBuffer = new byte[Count];
117 Array.Copy(Buffer, Offset, this.packetBuffer, 0, Count);
118 Continue = await this.OnPacketReceived();
119 }
120
121 return Continue;
122 }
123
124 private async Task<bool> OnPacketReceived()
125 {
127 if (!(h is null))
128 {
129 try
130 {
131 return await h(this, this.packetBuffer, 0, this.packetSize);
132 }
133 catch (Exception ex)
134 {
135 Log.Exception(ex);
136 }
137 }
138
139 return true;
140 }
141
142 private Task TcpConnection_OnError(object _, Exception _2)
143 {
144 return this.Closed();
145 }
146
147 private Task TcpConnection_OnDisconnected(object Sender, EventArgs e)
148 {
149 return this.Closed();
150 }
151
155 public void Start()
156 {
157 this.Start(null);
158 }
159
165 public void Start(EventHandlerAsync ResynchCallback)
166 {
167 this.readState = 0;
168 this.packetSize = 0;
169 this.offset = 0;
170 this.resynchCallback = ResynchCallback;
171 this.tcpConnection?.Continue();
172 }
173
177 public BinaryTcpClient Tcp => this.tcpConnection;
178
182 public PeerToPeerNetwork Network => this.network;
183
187 public IPEndPoint RemoteEndpoint
188 {
189 get => this.remoteEndpoint;
190 internal set => this.remoteEndpoint = value;
191 }
192
196 [Obsolete("Use DisposeAsync()")]
197 public async void Dispose()
198 {
199 try
200 {
201 await this.DisposeAsync();
202 }
203 catch (Exception ex)
204 {
205 Log.Exception(ex);
206 }
207 }
208
212 public Task DisposeAsync()
213 {
214 this.disposed = true;
215
216 this.idleTimer?.Dispose();
217 this.idleTimer = null;
218
219 this.tcpConnection?.Dispose();
220 this.tcpConnection = null;
221
222 return this.Closed();
223 }
224
230 public Task SendTcp(byte[] Packet)
231 {
232 return this.SendTcp(Packet, null, null);
233 }
234
242 public Task SendTcp(byte[] Packet, EventHandlerAsync<DeliveryEventArgs> Callback, object State)
243 {
244 if (this.disposed)
245 return Task.CompletedTask;
246
247 byte[] EncodedPacket = this.EncodePacket(Packet, false);
248 return this.tcpConnection.SendAsync(EncodedPacket, Callback, State);
249 }
250
251 private byte[] EncodePacket(byte[] Packet, bool IncludePacketNumber)
252 {
253 if (!this.encapsulatePackets)
254 return Packet;
255
256 ushort PacketNr;
257 int i = Packet.Length;
258 int j = 0;
259 int c = 1;
260 byte b;
261
262 i >>= 7;
263 while (i > 0)
264 {
265 c++;
266 i >>= 7;
267 }
268
269 if (IncludePacketNumber)
270 c += 2;
271
272 i = Packet.Length;
273
274 byte[] Packet2 = new byte[c + i];
275 Array.Copy(Packet, 0, Packet2, c, i);
276
277 do
278 {
279 b = (byte)(i & 127);
280 i >>= 7;
281 if (i > 0)
282 b |= 128;
283
284 Packet2[j++] = b;
285 }
286 while (i > 0);
287
288 if (IncludePacketNumber)
289 {
290 PacketNr = ++this.outgoingPacketNumber;
291
292 Packet2[j++] = (byte)PacketNr;
293 Packet2[j++] = (byte)(PacketNr >> 8);
294 }
295
296 return Packet2;
297 }
298
308 public Task SendUdp(byte[] Packet, int IncludeNrPreviousPackets)
309 {
310 byte[] EncodedPacket = this.EncodePacket(Packet, true);
311
312 lock (this.historicPackets)
313 {
314 byte[] ToSend;
315 int j, i = 0;
316 int c = EncodedPacket.Length;
317
318 if (IncludeNrPreviousPackets == 0)
319 ToSend = EncodedPacket;
320 else
321 {
322 foreach (byte[] Packet2 in this.historicPackets)
323 {
324 c += Packet2.Length;
325 i++;
326 if (i >= IncludeNrPreviousPackets)
327 break;
328 }
329
330 ToSend = new byte[c];
331 j = EncodedPacket.Length;
332 Array.Copy(EncodedPacket, 0, ToSend, 0, j);
333
334 i = 0;
335 foreach (byte[] Packet2 in this.historicPackets)
336 {
337 Array.Copy(Packet2, 0, ToSend, j, Packet2.Length);
338 j += Packet2.Length;
339 i++;
340 if (i >= IncludeNrPreviousPackets)
341 break;
342 }
343 }
344
345 this.historicPackets.AddFirst(EncodedPacket);
346
347 if (this.nrHistoricPackets >= IncludeNrPreviousPackets)
348 this.historicPackets.RemoveLast(); // Doesn't reduce the size to INcludeNrPreviousPackets, but keeps list at the largest requested number, to date.
349 else
350 this.nrHistoricPackets++;
351
352 return this.network.SendUdp(this.remoteEndpoint, ToSend);
353 }
354 }
355
356 private int nrHistoricPackets = 0;
357 private readonly LinkedList<byte[]> historicPackets = new LinkedList<byte[]>();
358
363
367 public bool Paused => this.tcpConnection.Paused;
368
372 public void Continue()
373 {
374 this.tcpConnection.Continue();
375 }
376
381
382 private async Task Closed()
383 {
384 if (!this.closed)
385 {
386 this.closed = true;
387
388 if (!(this.resynchCallback is null))
389 {
390 await this.resynchCallback.Raise(this, EventArgs.Empty, false);
391 await this.DisposeAsync();
392 }
393 else
394 await this.RaiseOnClosed();
395 }
396 }
397
398 private Task RaiseOnClosed()
399 {
400 return this.OnClosed.Raise(this, EventArgs.Empty);
401 }
402
406 public event EventHandlerAsync OnClosed = null;
407
411 public object StateObject
412 {
413 get => this.stateObject;
414 set => this.stateObject = value;
415 }
416
417 internal async Task UdpDatagramReceived(object _, UdpDatagramEventArgs e)
418 {
419 if (this.encapsulatePackets)
420 {
421 LinkedList<KeyValuePair<ushort, byte[]>> LostPackets = null;
422 byte[] FirstPacket = null;
423 ushort FirstPacketNr = 0;
424 ushort PacketNr;
425 byte[] Packet;
426 byte[] Data = e.Data;
427 int Len = Data.Length;
428 int Pos = 0;
429 int PacketLen;
430 int Offset;
431 byte b;
432
433 lock (this.udpReceiveLock)
434 {
435 while (Pos < Len)
436 {
437 b = Data[Pos++];
438 PacketLen = (b & 127);
439 Offset = 7;
440 while (Pos < Len && (b & 128) != 0)
441 {
442 b = Data[Pos++];
443 PacketLen |= (b & 127) << Offset;
444 Offset += 7;
445 }
446
447 if (Pos + 2 > Len)
448 break;
449
450 PacketNr = Data[Pos++];
451 PacketNr |= (ushort)(Data[Pos++] << 8);
452
453 if (Pos + PacketLen > Len)
454 break;
455
456 Packet = new byte[PacketLen];
457 Array.Copy(Data, Pos, Packet, 0, PacketLen);
458 Pos += PacketLen;
459
460 if ((short)(PacketNr - this.lastReceivedPacket) > 0)
461 {
462 if (FirstPacket is null)
463 {
464 FirstPacket = Packet;
465 FirstPacketNr = PacketNr;
466 }
467 else
468 {
469 if (LostPackets is null)
470 LostPackets = new LinkedList<KeyValuePair<ushort, byte[]>>();
471
472 LostPackets.AddFirst(new KeyValuePair<ushort, byte[]>(PacketNr, Packet)); // Reverse order
473 }
474 }
475 }
476
477 if (!(FirstPacket is null))
478 this.lastReceivedPacket = FirstPacketNr;
479 }
480
482 if (!(h is null))
483 {
484 if (!(LostPackets is null))
485 {
486 foreach (KeyValuePair<ushort, byte[]> P in LostPackets)
487 {
488 try
489 {
490 await h(this, P.Value, 0, P.Value.Length);
491 }
492 catch (Exception ex)
493 {
494 Log.Exception(ex);
495 }
496 }
497 }
498
499 if (!(FirstPacket is null))
500 {
501 try
502 {
503 await h(this, FirstPacket, 0, FirstPacket.Length);
504 }
505 catch (Exception ex)
506 {
507 Log.Exception(ex);
508 }
509 }
510 }
511 }
512 else
513 {
514 byte[] Data = e.Data;
515 int Len = Data.Length;
516 byte[] Packet = new byte[Len];
517
518 Array.Copy(Data, 0, Packet, 0, Len);
519
521 if (!(h is null))
522 {
523 try
524 {
525 await h(this, Packet, 0, Packet.Length);
526 }
527 catch (Exception ex)
528 {
529 Log.Exception(ex);
530 }
531 }
532 }
533 }
534
535 private ushort lastReceivedPacket = 0;
536 private readonly object udpReceiveLock = new object();
537
538 internal void StartIdleTimer()
539 {
540 this.idleTimer = new Timer(this.IdleTimerCallback, null, 5000, 5000);
541 }
542
543 private async void IdleTimerCallback(object P)
544 {
545 try
546 {
547 if ((DateTime.Now - this.lastTcpPacket).TotalSeconds > 10)
548 {
549 try
550 {
551 await this.SendTcp(new byte[0]);
552 }
553 catch (Exception)
554 {
555 try
556 {
557 await this.Closed();
558 await this.DisposeAsync();
559 }
560 catch (Exception ex)
561 {
562 Log.Exception(ex);
563 }
564 }
565 }
566 }
567 catch (Exception ex)
568 {
569 Log.Exception(ex);
570 }
571 }
572
573 private Timer idleTimer = null;
574 private DateTime lastTcpPacket = DateTime.Now;
575
576 }
577}
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
Implements a binary TCP Client, by encapsulating a TcpClient. It also makes the use of TcpClient safe...
Task< bool > SendAsync(byte[] Packet)
Sends a binary packet.
void Continue()
Continues reading from the socket, if paused in an event handler.
bool Paused
If the reading is paused.
virtual void Dispose()
Disposes of the object. The underlying TcpClient is either disposed directly, or when asynchronous op...
BinaryTcpClient Tcp
Underlying TCP connection
PeerToPeerNetwork Network
Peer-to-peer network.
object StateObject
State object that applications can use to attach information to a connection.
BinaryDataWrittenEventHandler OnSent
Event raised when a packet has been sent.
void Start(EventHandlerAsync ResynchCallback)
Starts receiving on the connection.
IPEndPoint RemoteEndpoint
Remote endpoint.
void Start()
Starts receiving on the connection.
void Continue()
Continues a paused connection.
bool Paused
If reading has been paused.
Task SendTcp(byte[] Packet)
Sends a packet to the peer at the other side of the TCP connection. Transmission is done asynchronous...
async void Dispose()
IDisposable.Dispose
Task SendTcp(byte[] Packet, EventHandlerAsync< DeliveryEventArgs > Callback, object State)
Sends a packet to the peer at the other side of the TCP connection. Transmission is done asynchronous...
BinaryDataReadEventHandler OnReceived
Event received when binary data has been received.
Task SendUdp(byte[] Packet, int IncludeNrPreviousPackets)
Sends a packet to a peer using UDP. Transmission is done asynchronously and is buffered if a sending ...
EventHandlerAsync OnClosed
Event raised when a connection has been closed for some reason.
Manages a peer-to-peer network that can receive connections from outside of a NAT-enabled firewall.
Event arguments for UDP Datagram events.
delegate Task EventHandlerAsync(object Sender, EventArgs e)
Asynchronous version of EventArgs.
delegate Task BinaryDataWrittenEventHandler(object Sender, byte[] Buffer, int Offset, int Count)
Event handler for binary packet events.
delegate Task< bool > BinaryDataReadEventHandler(object Sender, byte[] Buffer, int Offset, int Count)
Event handler for binary packet events.