Neuron®
The Neuron® is the basis for the creation of open and secure federated networks for smart societies.
Loading...
Searching...
No Matches
ClusterEndpoint.cs
1using System;
2using System.Collections.Generic;
3using System.Net;
4using System.Net.NetworkInformation;
5using System.Net.Sockets;
6using System.Reflection;
7using System.Security.Cryptography;
8using System.Text;
9using System.Threading;
10using System.Threading.Tasks;
11using Waher.Events;
20using Waher.Security;
21#if WINDOWS_UWP
22using Windows.Networking;
23using Windows.Networking.Connectivity;
24#endif
25
27{
31 public class ClusterEndpoint : CommunicationLayer, IDisposable
32 {
// Serialization metadata per type; read under lock(objectInfo) in GetObjectInfo.
33 private static readonly Dictionary<Type, ObjectInfo> objectInfo = new Dictionary<Type, ObjectInfo>();
// Property handlers keyed by the type they handle; built by Init(), null until the first endpoint is created.
34 private static Dictionary<Type, IProperty> propertyTypes = null;
// Object information for typeof(object); entry point for all serialization and deserialization.
35 private static ObjectInfo rootObject;
36
// UDP clients bound to port 0 on each interface; Transmit sends through these.
37 private readonly LinkedList<ClusterUdpClient> outgoing = new LinkedList<ClusterUdpClient>();
// UDP clients bound to the cluster port; the Endpoints property is built from these.
38 private readonly LinkedList<ClusterUdpClient> incoming = new LinkedList<ClusterUdpClient>();
// Multicast address and port that messages are sent to.
39 private readonly IPEndPoint destination;
// Result of Hashes.ComputeSHA256Hash over the shared secret.
40 internal readonly byte[] key;
// AES instance configured in the constructor (128-bit blocks, 256-bit key, CBC, no padding).
41 internal Aes aes;
// Cache of in-flight message/command state and results (keyed by GUID string) and of known
// remote endpoints (keyed by endpoint string).
42 internal Cache<string, object> currentStatus;
// Scheduler used for resend and lock timeouts; either shared (module parameter) or owned.
43 private Scheduler scheduler;
// NOTE(review): not referenced in the visible part of the file.
44 private readonly Random rnd = new Random();
// Last reported status per remote endpoint; accessed under lock(remoteStatus).
45 private readonly Dictionary<IPEndPoint, object> remoteStatus;
// Lock bookkeeping per resource name; accessed under lock(lockedResources).
46 internal readonly Dictionary<string, LockInfo> lockedResources = new Dictionary<string, LockInfo>();
// Status object obtained from the GetStatus event in SendAlive.
47 private object localStatus;
// Fires SendAlive immediately and then every 5000 ms.
48 private Timer aliveTimer;
// Set by DisposeAsync just before the ShuttingDown message is sent.
49 internal bool shuttingDown = false;
// True if this instance created the scheduler and therefore disposes it in Dispose2.
50 private readonly bool internalScheduler;
// Signalled by Dispose2; DisposeAsync waits up to 2000 ms on it.
51 private ManualResetEvent shutDown = new ManualResetEvent(false);
52
/// <summary>
/// Creates a cluster endpoint from a textual shared secret. The secret is
/// UTF-8 encoded and handed to the byte-array constructor.
/// </summary>
/// <param name="MulticastAddress">Multicast address of the cluster.</param>
/// <param name="Port">Port used for cluster communication.</param>
/// <param name="SharedSecret">Shared secret, as text.</param>
/// <param name="Sniffers">Optional sniffers.</param>
public ClusterEndpoint(IPAddress MulticastAddress, int Port, string SharedSecret, params ISniffer[] Sniffers)
    : this(MulticastAddress, Port, Encoding.UTF8.GetBytes(SharedSecret), Sniffers)
{
    // All work is done by the chained constructor.
}
65
/// <summary>
/// Creates a cluster endpoint communicating over IPv4 UDP multicast.
/// For every usable local address, one client bound to port 0 is added to the
/// outgoing list and one client bound to <paramref name="Port"/> is added to the
/// incoming list. A final client listening on <paramref name="Port"/> on any
/// address is added on a best-effort basis. An alive timer is then started.
/// </summary>
/// <param name="MulticastAddress">IPv4 multicast address of the cluster.</param>
/// <param name="Port">Port used for cluster communication.</param>
/// <param name="SharedSecret">Shared secret; it is passed through Hashes.ComputeSHA256Hash to form the key.</param>
/// <param name="Sniffers">Optional sniffers, forwarded to the base class.</param>
/// <exception cref="ArgumentException">If <paramref name="MulticastAddress"/> is not an IPv4 address.</exception>
public ClusterEndpoint(IPAddress MulticastAddress, int Port, byte[] SharedSecret, params ISniffer[] Sniffers)
    : base(true, Sniffers)
{
    ClusterUdpClient ClusterUdpClient;
    UdpClient Client;

    if (MulticastAddress.AddressFamily != AddressFamily.InterNetwork)
        throw new ArgumentException("Cluster communication must be done using IPv4.", nameof(MulticastAddress));

    // First endpoint in the process: build the property-type table and root object info.
    if (propertyTypes is null)
    {
        Init();
        rootObject = GetObjectInfo(typeof(object));
    }

    this.aes = Aes.Create();
    this.aes.BlockSize = 128;
    this.aes.KeySize = 256;
    this.aes.Mode = CipherMode.CBC;
    this.aes.Padding = PaddingMode.None;

    this.key = Hashes.ComputeSHA256Hash(SharedSecret);
    this.destination = new IPEndPoint(MulticastAddress, Port);

    this.localStatus = null;
    this.remoteStatus = new Dictionary<IPEndPoint, object>();
    this.currentStatus = new Cache<string, object>(int.MaxValue, TimeSpan.MaxValue, TimeSpan.FromSeconds(30), true);
    this.currentStatus.Removed += this.CurrentStatus_Removed;

    // Reuse a scheduler published as a module parameter if there is one; otherwise own one.
    if (Types.TryGetModuleParameter("Scheduler", out object Obj) && Obj is Scheduler Scheduler)
    {
        this.scheduler = Scheduler;
        this.internalScheduler = false;
    }
    else
    {
        this.scheduler = new Scheduler();
        this.internalScheduler = true;
    }

#if WINDOWS_UWP
    foreach (HostName HostName in NetworkInformation.GetHostNames())
    {
        if (HostName.IPInformation is null)
            continue;

        foreach (ConnectionProfile Profile in NetworkInformation.GetConnectionProfiles())
        {
            if (Profile.GetNetworkConnectivityLevel() == NetworkConnectivityLevel.None)
                continue;

            if (Profile.NetworkAdapter.NetworkAdapterId != HostName.IPInformation.NetworkAdapter.NetworkAdapterId)
                continue;

            if (!IPAddress.TryParse(HostName.CanonicalName, out IPAddress Address))
                continue;

            AddressFamily AddressFamily = Address.AddressFamily;
            bool IsLoopback = IPAddress.IsLoopback(Address);
#else
    foreach (NetworkInterface Interface in NetworkInterface.GetAllNetworkInterfaces())
    {
        if (Interface.OperationalStatus != OperationalStatus.Up)
            continue;

        switch (Interface.NetworkInterfaceType)
        {
            case NetworkInterfaceType.Loopback:
                continue;
        }

        IPInterfaceProperties Properties = Interface.GetIPProperties();

        foreach (UnicastIPAddressInformation UnicastAddress in Properties.UnicastAddresses)
        {
            IPAddress Address = UnicastAddress.Address;
            AddressFamily AddressFamily = Address.AddressFamily;
            bool IsLoopback = Interface.NetworkInterfaceType == NetworkInterfaceType.Loopback;
#endif
            if (Address.AddressFamily != MulticastAddress.AddressFamily)
                continue;

            if (!IsLoopback)
            {
                Client = null;

                try
                {
                    // The reference is stored before any property is set. With an object
                    // initializer a throwing setter would leave Client null, and the
                    // catch blocks below could not dispose the socket.
                    Client = new UdpClient(AddressFamily);
                    //Client.DontFragment = true;
                    Client.ExclusiveAddressUse = true;
                    Client.MulticastLoopback = false;
                    Client.EnableBroadcast = true;
                    Client.Ttl = 30;

                    Client.Client.Bind(new IPEndPoint(Address, 0));

                    try
                    {
                        Client.JoinMulticastGroup(MulticastAddress);
                    }
                    catch (Exception ex)
                    {
                        // Failing to join is logged, but the client is still used.
                        Log.Exception(ex);
                    }
                }
                catch (NotSupportedException)
                {
                    Client?.Dispose();
                    continue;
                }
                catch (Exception ex)
                {
                    Client?.Dispose();
                    Log.Exception(ex);
                    continue;
                }

                ClusterUdpClient = new ClusterUdpClient(this, Client, Address);
                ClusterUdpClient.BeginReceive();

                this.outgoing.AddLast(ClusterUdpClient);

                Client = null;

                try
                {
                    // Same reasoning as above: store the reference first, configure afterwards.
                    Client = new UdpClient(AddressFamily);
                    Client.ExclusiveAddressUse = false;

                    Client.Client.Bind(new IPEndPoint(Address, Port));
                }
                catch (NotSupportedException)
                {
                    Client?.Dispose();
                    continue;
                }
                catch (Exception ex)
                {
                    Client?.Dispose();
                    Log.Exception(ex);
                    continue;
                }

                ClusterUdpClient = new ClusterUdpClient(this, Client, Address);
                ClusterUdpClient.BeginReceive();

                this.incoming.AddLast(ClusterUdpClient);
            }
        }
    }

    // Best-effort listener on the cluster port, not tied to a specific local address.
    Client = null;

    try
    {
        Client = new UdpClient(Port, MulticastAddress.AddressFamily);
        Client.MulticastLoopback = false;

        Client.JoinMulticastGroup(MulticastAddress);

        ClusterUdpClient = new ClusterUdpClient(this, Client, null);
        ClusterUdpClient.BeginReceive();

        this.incoming.AddLast(ClusterUdpClient);
    }
    catch (Exception)
    {
        // Failure is still ignored, but the socket is released. AddLast is the last
        // statement of the try block, so reaching this point means the client was
        // never registered in the incoming list.
        Client?.Dispose();
    }

    // First alive message is sent immediately, then every 5 seconds.
    this.aliveTimer = new Timer(this.SendAlive, null, 0, 5000);
}
250
/// <summary>
/// Snapshot of the endpoints of all incoming UDP clients.
/// </summary>
public IEnumerable<IPEndPoint> Endpoints
{
    get
    {
        LinkedList<IPEndPoint> Snapshot = new LinkedList<IPEndPoint>();

        foreach (ClusterUdpClient Listener in this.incoming)
        {
            IPEndPoint Bound = Listener.EndPoint;
            Snapshot.AddLast(Bound);
        }

        return Snapshot;
    }
}
266
/// <summary>
/// Fire-and-forget wrapper around <see cref="DisposeAsync"/>. Any exception
/// thrown while disposing is logged instead of propagated.
/// </summary>
[Obsolete("Use DisposeAsync")]
public async void Dispose()
{
    try
    {
        Task Pending = this.DisposeAsync();
        await Pending;
    }
    catch (Exception Failure)
    {
        Log.Exception(Failure);
    }
}
282
/// <summary>
/// Shuts the endpoint down: sends a Release message for every resource that is
/// currently locked, sends a ShuttingDown message, waits up to two seconds for
/// the shut-down event and finally disposes the event and the AES instance.
/// Does nothing if shutdown has already begun.
/// </summary>
public async Task DisposeAsync()
{
    if (this.shuttingDown)
        return;

    // Collect names of held locks under the lock; messages are sent after leaving it.
    List<string> Held = new List<string>();

    lock (this.lockedResources)
    {
        foreach (LockInfo Entry in this.lockedResources.Values)
        {
            if (Entry.Locked)
                Held.Add(Entry.Resource);
        }

        this.lockedResources.Clear();
    }

    foreach (string Resource in Held)
    {
        await this.SendMessageAcknowledged(new Release()
        {
            Resource = Resource
        }, null, null);
    }

    this.shuttingDown = true;
    await this.SendMessageUnacknowledged(new ShuttingDown());

    // Dispose2 signals this event; give it two seconds at most.
    this.shutDown?.WaitOne(2000);
    this.shutDown?.Dispose();
    this.shutDown = null;

    this.aes?.Dispose();
    this.aes = null;
}
330
/// <summary>
/// Second disposal stage: releases the scheduler (if owned), the alive timer,
/// all UDP clients, the status cache and any disposable sniffers, then signals
/// the shut-down event.
/// </summary>
internal void Dispose2()
{
    // A shared scheduler is only forgotten, never disposed.
    if (this.internalScheduler)
        this.scheduler?.Dispose();

    this.scheduler = null;

    this.aliveTimer?.Dispose();
    this.aliveTimer = null;

    this.Clear(this.outgoing);
    this.Clear(this.incoming);

    this.currentStatus?.Dispose();
    this.currentStatus = null;

    foreach (ISniffer Candidate in this.Sniffers)
    {
        if (!(Candidate is IDisposable Owned))
            continue;

        try
        {
            Owned.Dispose();
        }
        catch (Exception Failure)
        {
            Log.Exception(Failure);
        }
    }

    this.shutDown?.Set();
}
364
/// <summary>
/// Disposes every client in the list, ignoring disposal errors, and then
/// empties the list.
/// </summary>
/// <param name="Clients">List of clients to dispose and clear.</param>
private void Clear(LinkedList<ClusterUdpClient> Clients)
{
    foreach (ClusterUdpClient Item in Clients)
    {
        try
        {
            Item.Dispose();
        }
        catch (Exception)
        {
            // A failing client must not prevent the remaining ones from being disposed.
        }
    }

    Clients.Clear();
}
381
/// <summary>
/// Gets serialization information for a type: returns the cached entry if
/// there is one, otherwise builds a new <see cref="ObjectInfo"/> from the
/// type's readable and writable runtime properties.
/// </summary>
/// <param name="T">Type to describe.</param>
/// <returns>Object information for <paramref name="T"/>.</returns>
382 internal static ObjectInfo GetObjectInfo(Type T)
383 {
384 lock (objectInfo)
385 {
386 if (objectInfo.TryGetValue(T, out ObjectInfo Result))
387 return Result;
388 }
389
390 List<PropertyReference> Properties = new List<PropertyReference>();
391
392 foreach (PropertyInfo PI in T.GetRuntimeProperties())
393 {
// Only properties with both a getter and a setter are included.
394 if (PI.GetMethod is null || PI.SetMethod is null)
395 continue;
396
397 Type PT = PI.PropertyType;
398 IProperty Property = GetProperty(PT);
399
// GetProperty returns null for value types without a registered handler; such properties are skipped.
400 if (Property is null)
401 continue;
402
// NOTE(review): the listing jumps from line 406 to 408 here; one initializer line appears
// to be missing from this view (the local Property is otherwise unused) - confirm against the real file.
403 Properties.Add(new PropertyReference()
404 {
405 Name = PI.Name,
406 Info = PI,
408 });
409 }
410
// NOTE(review): nothing visible in this method adds the result to objectInfo, so the
// lookup above only hits if another part of the code populates the dictionary - confirm.
411 return new ObjectInfo()
412 {
413 Type = T,
414 Properties = Properties.ToArray()
415 };
416 }
417
/// <summary>
/// Finds the property handler for a type. Checked in order: arrays, enums,
/// Nullable&lt;T&gt;, registered handlers, and finally reference types, which
/// are handled as nested objects.
/// </summary>
/// <param name="PT">Property type.</param>
/// <returns>Handler, or null for a value type that has no registered handler.</returns>
internal static IProperty GetProperty(Type PT)
{
    if (PT.IsArray)
    {
        Type ItemType = PT.GetElementType();
        IProperty ItemHandler = GetProperty(ItemType);

        return new ArrayProperty(PT, ItemType, ItemHandler);
    }

    TypeInfo Details = PT.GetTypeInfo();

    if (Details.IsEnum)
        return new EnumProperty(PT);

    if (Details.IsGenericType && PT.GetGenericTypeDefinition() == typeof(Nullable<>))
    {
        Type Underlying = PT.GenericTypeArguments[0];
        IProperty UnderlyingHandler = GetProperty(Underlying);

        return new NullableProperty(PT, Underlying, UnderlyingHandler);
    }

    if (propertyTypes.TryGetValue(PT, out IProperty Registered))
        return Registered;

    if (Details.IsValueType)
        return null;

    return new ObjectProperty(PT, GetObjectInfo(PT));
}
444
/// <summary>
/// Builds the table of property handlers by instantiating every type that
/// implements <see cref="IProperty"/> and has a default constructor. On the
/// first run the method also subscribes itself to Types.OnInvalidated, so the
/// table is rebuilt whenever that event fires.
/// </summary>
private static void Init()
{
    Dictionary<Type, IProperty> Found = new Dictionary<Type, IProperty>();

    foreach (Type Candidate in Types.GetTypesImplementingInterface(typeof(IProperty)))
    {
        ConstructorInfo Ctor = Types.GetDefaultConstructor(Candidate);
        if (Ctor is null)
            continue;

        try
        {
            IProperty Handler = (IProperty)Ctor.Invoke(Types.NoParameters);
            Type Handled = Handler.PropertyType;

            if (!(Handled is null))
            {
                // First handler registered for a type wins; later ones are reported.
                if (Found.ContainsKey(Handled))
                    Log.Error("Multiple classes available for property type " + Handled.FullName + ".");
                else
                    Found[Handled] = Handler;
            }
        }
        catch (Exception)
        {
            // Handlers that cannot be instantiated are left out.
        }
    }

    bool FirstRun = propertyTypes is null;

    if (FirstRun)
        Types.OnInvalidated += (Sender, e) => Init();

    propertyTypes = Found;
}
481
/// <summary>
/// Serializes an object to binary form using the root object information.
/// </summary>
/// <param name="Object">Object to serialize.</param>
/// <returns>Binary representation.</returns>
public byte[] Serialize(object Object)
{
    using (Serializer Writer = new Serializer())
    {
        rootObject.Serialize(Writer, Object);

        byte[] Encoded = Writer.ToArray();
        return Encoded;
    }
}
495
/// <summary>
/// Deserializes an object from binary form using the root object information.
/// </summary>
/// <param name="Data">Binary representation.</param>
/// <returns>Deserialized object.</returns>
public object Deserialize(byte[] Data)
{
    using (Deserializer Reader = new Deserializer(Data))
    {
        object Decoded = rootObject.Deserialize(Reader, typeof(object));
        return Decoded;
    }
}
509
/// <summary>
/// Processes one received datagram. The first byte selects the handling:
/// 0 = unacknowledged message, 1 = acknowledged message, 2 = ACK, 3 = NACK,
/// 4 = command, 5 = command response, 6 = command exception. Any other value
/// is ignored. All exceptions are reported and logged, never propagated.
/// </summary>
/// <param name="Data">Received bytes; empty input is ignored.</param>
/// <param name="From">Endpoint the datagram was received from.</param>
510 internal async void DataReceived(byte[] Data, IPEndPoint From)
511 {
512 try
513 {
514 if (Data.Length == 0)
515 return;
516
517 await this.ReceiveBinary(Data);
518
519 using (Deserializer Input = new Deserializer(Data))
520 {
521 byte Command = Input.ReadByte();
522
523 switch (Command)
524 {
525 case 0: // Unacknowledged message
526 object Object = rootObject.Deserialize(Input, typeof(object));
527 if (Object is IClusterMessage Message)
528 {
529 await Message.MessageReceived(this, From);
530
531 await this.OnMessageReceived.Raise(this, new ClusterMessageEventArgs(Message));
532 }
533 else
534 await this.Error("Non-message object received in message: " + Object?.GetType()?.FullName);
535 break;
536
537 case 1: // Acknowledged message
538 Guid Id = Input.ReadGuid();
539 string s = Id.ToString();
540 ulong Len = Input.ReadVarUInt64();
541 bool Skip = false;
542
// The message carries a list of endpoints to skip. If any of them is one of this
// endpoint's own clients, the message is dropped without a response.
543 while (Len > 0)
544 {
545 string Address = new IPAddress(Input.ReadBinary()).ToString();
546 int Port = Input.ReadUInt16();
547
548 if (this.IsEndpoint(this.outgoing, Address, Port) ||
549 this.IsEndpoint(this.incoming, Address, Port))
550 {
551 Skip = true;
552 break;
553 }
554
555 Len--;
556 }
557
558 if (Skip)
559 break;
560
// A boolean cached under the message id is reused as the answer, so the message
// is not processed again in that case.
561 if (!this.currentStatus.TryGetValue(s, out object Obj) ||
562 !(Obj is bool Ack))
563 {
564 Object = rootObject.Deserialize(Input, typeof(object));
565 if (!((Message = Object as IClusterMessage) is null))
566 {
567 try
568 {
569 Ack = await Message.MessageReceived(this, From);
570
571 await this.OnMessageReceived.Raise(this, new ClusterMessageEventArgs(Message));
572 }
573 catch (Exception ex)
574 {
575 await this.Exception(ex);
576 Log.Exception(ex);
577 Ack = false;
578 }
579
580 this.currentStatus[s] = Ack;
581 }
582 else
583 {
584 await this.Error("Non-message object received in message: " + Object?.GetType()?.FullName);
585 Ack = false;
586 }
587 }
588
// Reply to the sender: command byte 2 (ACK) or 3 (NACK) followed by the message id.
589 using (Serializer Output = new Serializer())
590 {
591 Output.WriteByte(Ack ? (byte)2 : (byte)3);
592 Output.WriteGuid(Id);
593
594 await this.Transmit(Output.ToArray(), From);
595 }
596 break;
597
598 case 2: // ACK
599 case 3: // NACK
600 Id = Input.ReadGuid();
601 s = Id.ToString();
602
// Only acted upon if a MessageStatus is still stored under this id.
603 if (this.currentStatus.TryGetValue(s, out Obj) &&
604 Obj is MessageStatus MessageStatus)
605 {
606 lock (MessageStatus.Acknowledged)
607 {
608 MessageStatus.Acknowledged[From] = (Command == 2);
609 }
610
611 EndpointStatus[] CurrentStatus = this.GetRemoteStatuses();
612
613 if (MessageStatus.IsComplete(CurrentStatus))
614 {
615 this.currentStatus.Remove(s);
616 this.scheduler.Remove(MessageStatus.Timeout);
617
618 await MessageStatus.Callback.Raise(this, new ClusterMessageAckEventArgs(
619 MessageStatus.Message, MessageStatus.GetResponses(CurrentStatus),
620 MessageStatus.State));
621 }
622 }
623 break;
624
625 case 4: // Command
626 Id = Input.ReadGuid();
627 s = Id.ToString();
628 Len = Input.ReadVarUInt64();
629 Skip = false;
630
// Same skip-list handling as for acknowledged messages.
631 while (Len > 0)
632 {
633 string Address = new IPAddress(Input.ReadBinary()).ToString();
634 int Port = Input.ReadUInt16();
635
636 if (this.IsEndpoint(this.outgoing, Address, Port) ||
637 this.IsEndpoint(this.incoming, Address, Port))
638 {
639 Skip = true;
640 break;
641 }
642
643 Len--;
644 }
645
646 if (Skip)
647 break;
648
// A result cached under the command id is returned again without re-executing.
// Results are cached only when execution succeeds; exceptions are not cached.
649 if (!this.currentStatus.TryGetValue(s, out Obj))
650 {
651 Object = rootObject.Deserialize(Input, typeof(object));
652 if (Object is IClusterCommand ClusterCommand)
653 {
654 try
655 {
656 Obj = await ClusterCommand.Execute(this, From);
657 this.currentStatus[s] = Obj;
658 }
659 catch (Exception ex)
660 {
661 Obj = ex;
662 }
663 }
664 else
665 Obj = new Exception("Non-command object received in command: " + Object?.GetType()?.FullName);
666 }
667
// Reply: command byte 6 with exception message and type name, or 5 with the serialized result.
668 using (Serializer Output = new Serializer())
669 {
670 if (Obj is Exception ex)
671 {
672 ex = Log.UnnestException(ex);
673
674 Output.WriteByte(6);
675 Output.WriteGuid(Id);
676 Output.WriteString(ex.Message);
677 Output.WriteString(ex.GetType().FullName);
678 }
679 else
680 {
681 Output.WriteByte(5);
682 Output.WriteGuid(Id);
683 rootObject.Serialize(Output, Obj);
684 }
685
686 await this.Transmit(Output.ToArray(), From);
687 }
688 break;
689
690 case 5: // Command Response
691 Id = Input.ReadGuid();
692 s = Id.ToString();
693
694 if (this.currentStatus.TryGetValue(s, out Obj) &&
695 Obj is CommandStatusBase CommandStatus)
696 {
697 Object = rootObject.Deserialize(Input, typeof(object));
698
699 CommandStatus.AddResponse(From, Object);
700
701 EndpointStatus[] CurrentStatus = this.GetRemoteStatuses();
702
703 if (CommandStatus.IsComplete(CurrentStatus))
704 {
705 this.currentStatus.Remove(s);
706 this.scheduler.Remove(CommandStatus.Timeout);
707
708 await CommandStatus.RaiseResponseEvent(CurrentStatus);
709 }
710 }
711 break;
712
713 case 6: // Command Exception
714 Id = Input.ReadGuid();
715 s = Id.ToString();
716
717 if (this.currentStatus.TryGetValue(s, out Obj) &&
718 Obj is CommandStatusBase CommandStatus2)
719 {
720 string ExceptionMessage = Input.ReadString();
721 string ExceptionType = Input.ReadString();
722 Exception ex;
723
// Try to recreate the remote exception type from its name and message; fall back to a
// plain Exception if the type is unknown or cannot be constructed.
724 try
725 {
726 Type T = Types.GetType(ExceptionType);
727 if (T is null)
728 ex = new Exception(ExceptionMessage);
729 else
730 ex = (Exception)Activator.CreateInstance(T, ExceptionMessage);
731 }
732 catch (Exception)
733 {
734 ex = new Exception(ExceptionMessage);
735 }
736
737 CommandStatus2.AddError(From, ex);
738
739 EndpointStatus[] CurrentStatus = this.GetRemoteStatuses();
740
741 if (CommandStatus2.IsComplete(CurrentStatus))
742 {
743 this.currentStatus.Remove(s);
744 this.scheduler.Remove(CommandStatus2.Timeout);
745
746 await CommandStatus2.RaiseResponseEvent(CurrentStatus);
747 }
748 }
749 break;
750 }
751 }
752 }
753 catch (Exception ex)
754 {
755 await this.Exception(ex);
756 Log.Exception(ex);
757 }
758 }
759
/// <summary>
/// Checks whether any client in the list reports the given address and port
/// as its own endpoint.
/// </summary>
/// <param name="Clients">Clients to check.</param>
/// <param name="Address">Address, as text.</param>
/// <param name="Port">Port number.</param>
/// <returns>true if at least one client matches.</returns>
private bool IsEndpoint(LinkedList<ClusterUdpClient> Clients, string Address, int Port)
{
    bool Matched = false;

    foreach (ClusterUdpClient Candidate in Clients)
    {
        if (Candidate.IsEndpoint(Address, Port))
        {
            Matched = true;
            break;
        }
    }

    return Matched;
}
770
/// <summary>
/// Raised after a received cluster message has been processed by its own
/// MessageReceived method (see DataReceived and AssuredDelivery).
/// </summary>
774 public event EventHandlerAsync<ClusterMessageEventArgs> OnMessageReceived = null;
775
/// <summary>
/// Reports the binary message through TransmitBinary and then starts
/// transmission on every outgoing client.
/// </summary>
/// <param name="Message">Binary message.</param>
/// <param name="Destination">Destination endpoint.</param>
private async Task Transmit(byte[] Message, IPEndPoint Destination)
{
    await this.TransmitBinary(Message);

    foreach (ClusterUdpClient Sender in this.outgoing)
    {
        Sender.BeginTransmit(Message, Destination);
    }
}
783
789 {
790 Serializer Output = new Serializer();
791
792 try
793 {
794 Output.WriteByte(0); // Unacknowledged message.
795 rootObject.Serialize(Output, Message);
796 await this.Transmit(Output.ToArray(), this.destination);
797 }
798 catch (Exception ex)
799 {
800 Log.Exception(ex);
801
802 if (this.shuttingDown)
803 this.Dispose2();
804 }
805 finally
806 {
807 Output.Dispose();
808 }
809 }
810
/// <summary>
/// Sends a message that remote endpoints are expected to acknowledge. A status
/// record is stored under a new message id, a resend is scheduled two seconds
/// ahead, and the message is transmitted. If the record is already complete
/// after transmission, the callback is raised immediately.
/// Exceptions are logged; while shutting down they also trigger Dispose2.
/// </summary>
/// <param name="Message">Message to send.</param>
/// <param name="Callback">Callback raised with the collected responses.</param>
/// <param name="State">State object passed on to the callback.</param>
public async Task SendMessageAcknowledged(IClusterMessage Message,
    EventHandlerAsync<ClusterMessageAckEventArgs> Callback, object State)
{
    using (Serializer Output = new Serializer())
    {
        Guid Id = Guid.NewGuid();
        string Key = Id.ToString();

        try
        {
            Output.WriteByte(1); // Acknowledged message.
            Output.WriteGuid(Id);
            Output.WriteVarUInt64(0); // Endpoints to skip
            rootObject.Serialize(Output, Message);

            byte[] Payload = Output.ToArray();
            DateTime Started = DateTime.Now;

            MessageStatus Status = new MessageStatus()
            {
                Id = Id,
                Message = Message,
                MessageBinary = Payload,
                Callback = Callback,
                State = State,
                TimeLimit = Started.AddSeconds(30)
            };

            this.currentStatus[Key] = Status;

            Status.Timeout = this.scheduler.Add(Started.AddSeconds(2), this.ResendAcknowledgedMessage, Status);

            await this.Transmit(Payload, this.destination);

            EndpointStatus[] Remotes = this.GetRemoteStatuses();

            if (Status.IsComplete(Remotes))
            {
                this.currentStatus.Remove(Status.Id.ToString());
                this.scheduler.Remove(Status.Timeout);

                ClusterMessageAckEventArgs Args = new ClusterMessageAckEventArgs(
                    Status.Message, Status.GetResponses(Remotes), Status.State);

                await Status.Callback.Raise(this, Args);
            }
        }
        catch (Exception Failure)
        {
            Log.Exception(Failure);

            if (this.shuttingDown)
                this.Dispose2();
        }
    }
}
874
/// <summary>
/// Scheduler callback for an acknowledged message. If the message is complete
/// or its time limit has passed, the record is removed and the callback raised.
/// Otherwise the message is retransmitted with the list of endpoints that have
/// already answered, and another resend is scheduled two seconds ahead.
/// </summary>
/// <param name="P">The <see cref="MessageStatus"/> record of the message.</param>
private async Task ResendAcknowledgedMessage(object P)
{
    MessageStatus Status = (MessageStatus)P;

    try
    {
        EndpointStatus[] Remotes = this.GetRemoteStatuses();
        DateTime Current = DateTime.Now;
        bool Finished = Status.IsComplete(Remotes) || Current >= Status.TimeLimit;

        if (!Finished)
        {
            using (Serializer Output = new Serializer())
            {
                Output.WriteByte(1); // Acknowledged message.
                Output.WriteGuid(Status.Id);

                lock (Status.Acknowledged)
                {
                    Output.WriteVarUInt64((uint)Status.Acknowledged.Count); // Endpoints to skip

                    foreach (IPEndPoint Answered in Status.Acknowledged.Keys)
                    {
                        Output.WriteBinary(Answered.Address.GetAddressBytes());
                        Output.WriteUInt16((ushort)Answered.Port);
                    }
                }

                // Reuse the original payload, leaving out its first 18 bytes.
                byte[] Original = Status.MessageBinary;
                Output.WriteRaw(Original, 18, Original.Length - 18);

                Status.Timeout = this.scheduler.Add(Current.AddSeconds(2), this.ResendAcknowledgedMessage, Status);
                await this.Transmit(Output.ToArray(), this.destination);
            }

            return;
        }

        this.currentStatus.Remove(Status.Id.ToString());
        this.scheduler.Remove(Status.Timeout);

        await Status.Callback.Raise(this, new ClusterMessageAckEventArgs(
            Status.Message, Status.GetResponses(Remotes), Status.State));
    }
    catch (Exception Failure)
    {
        Log.Exception(Failure);
    }
}
922
/// <summary>
/// Sends an acknowledged message and waits until the acknowledgement callback
/// has reported the responses.
/// </summary>
/// <param name="Message">Message to send.</param>
/// <returns>Responses reported by the acknowledgement callback.</returns>
public async Task<EndpointAcknowledgement[]> SendMessageAcknowledgedAsync(IClusterMessage Message)
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointAcknowledgement[]> Result =
        new TaskCompletionSource<EndpointAcknowledgement[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.SendMessageAcknowledged(Message, (Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
939
/// <summary>
/// Sends a message in two acknowledged steps: first a Transport message that
/// carries the message under a new id, then, once that step has reported, a
/// Deliver message for the same id. The callback is raised with the responses
/// of the Deliver step.
/// </summary>
/// <param name="Message">Message to send.</param>
/// <param name="Callback">Callback raised when the Deliver step has reported.</param>
/// <param name="State">State object passed on to the callback.</param>
public async Task SendMessageAssured(IClusterMessage Message,
    EventHandlerAsync<ClusterMessageAckEventArgs> Callback, object State)
{
    Guid TransferId = Guid.NewGuid();

    Transport First = new Transport()
    {
        MessageID = TransferId,
        Message = Message
    };

    await this.SendMessageAcknowledged(First, async (Sender, e) =>
    {
        Deliver Second = new Deliver()
        {
            MessageID = TransferId
        };

        await this.SendMessageAcknowledged(Second, async (sender2, e2) =>
        {
            ClusterMessageAckEventArgs Args = new ClusterMessageAckEventArgs(Message, e2.Responses, State);
            await Callback.Raise(this, Args);
        }, null);
    }, null);
}
966
/// <summary>
/// Sends a message using assured delivery and waits until the callback has
/// reported the responses.
/// </summary>
/// <param name="Message">Message to send.</param>
/// <returns>Responses reported by the callback.</returns>
public async Task<EndpointAcknowledgement[]> SendMessageAssuredAsync(IClusterMessage Message)
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointAcknowledgement[]> Result =
        new TaskCompletionSource<EndpointAcknowledgement[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.SendMessageAssured(Message, (Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
983
/// <summary>
/// Timer callback: raises the GetStatus event, stores the reported status as
/// the local status and sends it in an unacknowledged Alive message.
/// Exceptions are logged, never propagated.
/// </summary>
/// <param name="_">Timer state, not used.</param>
984 private async void SendAlive(object _)
985 {
986 try
987 {
// NOTE(review): the listing skips line 988 here; the declaration of "e" used below
// is not visible in this view - confirm against the real file.
989
990 await this.GetStatus.Raise(this, e);
991
992 this.localStatus = e.Status;
993
994 await this.SendMessageUnacknowledged(new Alive()
995 {
996 Status = this.localStatus
997 });
998 }
999 catch (Exception ex)
1000 {
1001 Log.Exception(ex);
1002 }
1003 }
1004
/// <summary>
/// Raised from SendAlive before each Alive message is sent; the Status of the
/// event arguments becomes the local status.
/// </summary>
1008 public event EventHandlerAsync<ClusterGetStatusEventArgs> GetStatus = null;
1009
/// <summary>
/// Status most recently stored by SendAlive; null until the first alive round.
/// </summary>
1013 public object LocalStatus => this.localStatus;
1014
1021 {
1022 EndpointStatus[] Result;
1023 int i, c;
1024
1025 lock (this.remoteStatus)
1026 {
1027 Result = new EndpointStatus[c = this.remoteStatus.Count];
1028
1029 i = 0;
1030 foreach (KeyValuePair<IPEndPoint, object> P in this.remoteStatus)
1031 Result[i++] = new EndpointStatus(P.Key, P.Value);
1032 }
1033
1034 return Result;
1035 }
1036
/// <summary>
/// Records the status of a remote endpoint. Raises EndpointOnline if the
/// endpoint was not previously known, and always raises EndpointStatus.
/// Exceptions are logged, never propagated.
/// </summary>
/// <param name="Endpoint">Remote endpoint.</param>
/// <param name="Status">Status reported by the endpoint.</param>
public async void AddRemoteStatus(IPEndPoint Endpoint, object Status)
{
    // The method is async void, so an exception escaping it cannot be observed by the
    // caller. Guard and log it, as the other async void members of this class do.
    try
    {
        bool New;

        lock (this.remoteStatus)
        {
            New = !this.remoteStatus.ContainsKey(Endpoint);
            this.remoteStatus[Endpoint] = Status;
        }

        if (New)
            await this.EndpointOnline.Raise(this, new ClusterEndpointEventArgs(Endpoint));

        await this.EndpointStatus.Raise(this, new ClusterEndpointStatusEventArgs(Endpoint, Status));
    }
    catch (Exception ex)
    {
        Log.Exception(ex);
    }
}
1059
/// <summary>
/// Forgets the status of a remote endpoint. Raises EndpointOffline if the
/// endpoint was known.
/// </summary>
/// <param name="Endpoint">Remote endpoint.</param>
/// <returns>true if the endpoint was known and has been removed.</returns>
public async Task<bool> RemoveRemoteStatus(IPEndPoint Endpoint)
{
    bool WasKnown;

    lock (this.remoteStatus)
    {
        WasKnown = this.remoteStatus.Remove(Endpoint);
    }

    if (!WasKnown)
        return false;

    await this.EndpointOffline.Raise(this, new ClusterEndpointEventArgs(Endpoint));

    return true;
}
1079
/// <summary>
/// Raised by AddRemoteStatus when a status is recorded for an endpoint that
/// was not previously known.
/// </summary>
1083 public event EventHandlerAsync<ClusterEndpointEventArgs> EndpointOnline = null;
1084
/// <summary>
/// Raised by RemoveRemoteStatus when a known endpoint has been removed.
/// </summary>
1088 public event EventHandlerAsync<ClusterEndpointEventArgs> EndpointOffline = null;
1089
/// <summary>
/// Raised by AddRemoteStatus every time a status is recorded for an endpoint.
/// </summary>
1093 public event EventHandlerAsync<ClusterEndpointStatusEventArgs> EndpointStatus = null;
1094
/// <summary>
/// Handles a status report from a remote endpoint: records the status and
/// makes sure the endpoint has an entry in the status cache.
/// </summary>
/// <param name="Status">Reported status.</param>
/// <param name="RemoteEndpoint">Endpoint that reported the status.</param>
internal void StatusReported(object Status, IPEndPoint RemoteEndpoint)
{
    this.AddRemoteStatus(RemoteEndpoint, Status);

    string Key = RemoteEndpoint.ToString();

    // An existing entry is left untouched.
    if (this.currentStatus.ContainsKey(Key))
        return;

    this.currentStatus[Key] = RemoteEndpoint;
}
1104
/// <summary>
/// Handles a remote endpoint shutting down by removing its recorded status.
/// </summary>
/// <param name="RemoteEndpoint">Endpoint that is shutting down.</param>
internal Task EndpointShutDown(IPEndPoint RemoteEndpoint) => this.RemoveRemoteStatus(RemoteEndpoint);
1109
/// <summary>
/// Stores a transported message in the status cache under its message id,
/// where AssuredDelivery later looks it up.
/// </summary>
/// <param name="MessageId">Message id.</param>
/// <param name="Message">Transported message.</param>
internal void AssuredTransport(Guid MessageId, IClusterMessage Message)
{
    string Key = MessageId.ToString();
    this.currentStatus[Key] = Message;
}
1114
/// <summary>
/// Delivers a previously transported message. If a result is already stored
/// for the id, it is returned without processing the message again. If the
/// message itself is stored, it is processed, its result stored in its place,
/// and OnMessageReceived raised.
/// </summary>
/// <param name="MessageId">Message id.</param>
/// <param name="Endpoint">Cluster endpoint passed to the message.</param>
/// <param name="RemoteEndpoint">Remote endpoint passed to the message.</param>
/// <returns>Result of processing; false if nothing usable is stored for the id.</returns>
internal async Task<bool> AssuredDelivery(Guid MessageId, ClusterEndpoint Endpoint, IPEndPoint RemoteEndpoint)
{
    string Key = MessageId.ToString();

    if (!this.currentStatus.TryGetValue(Key, out object Stored))
        return false;

    if (Stored is bool Known)
        return Known;

    if (!(Stored is IClusterMessage Pending))
        return false;

    // Store false while the message is being processed, then the actual outcome.
    this.currentStatus[Key] = false;
    bool Outcome = await Pending.MessageReceived(Endpoint, RemoteEndpoint);
    this.currentStatus[Key] = Outcome;

    await this.OnMessageReceived.Raise(this, new ClusterMessageEventArgs(Pending));

    return Outcome;
}
1139
/// <summary>
/// Handles items leaving the status cache. A removed endpoint entry removes
/// the endpoint's recorded status. A message status removed for any reason
/// other than manual removal has its scheduled timeout cancelled and its
/// callback raised.
/// </summary>
/// <param name="Sender">Sender of the event.</param>
/// <param name="e">Event arguments holding the removed value and the reason.</param>
private async Task CurrentStatus_Removed(object Sender, CacheItemEventArgs<string, object> e)
{
    if (e.Value is IPEndPoint Gone)
    {
        await this.RemoveRemoteStatus(Gone);
        return;
    }

    if (!(e.Value is MessageStatus Status))
        return;

    // Manual removals are handled by the code that removed the item.
    if (e.Reason == RemovedReason.Manual)
        return;

    this.scheduler.Remove(Status.Timeout);

    await Status.Callback.Raise(this, new ClusterMessageAckEventArgs(
        Status.Message, Status.GetResponses(this.GetRemoteStatuses()),
        Status.State));
}
1156
/// <summary>
/// Sends an acknowledged Ping message.
/// </summary>
/// <param name="Callback">Callback raised with the collected responses.</param>
/// <param name="State">State object passed on to the callback.</param>
public Task Ping(EventHandlerAsync<ClusterMessageAckEventArgs> Callback, object State)
    => this.SendMessageAcknowledged(new Messages.Ping(), Callback, State);
1166
/// <summary>
/// Sends an acknowledged Ping message and waits until the callback has
/// reported the responses.
/// </summary>
/// <returns>Responses reported by the callback.</returns>
public async Task<EndpointAcknowledgement[]> PingAsync()
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointAcknowledgement[]> Result =
        new TaskCompletionSource<EndpointAcknowledgement[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.Ping((Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
1182
1194 {
1195 Serializer Output = new Serializer();
1196 Guid Id = Guid.NewGuid();
1197 string s = Id.ToString();
1198 CommandStatus<ResponseType> Rec = null;
1199
1200 try
1201 {
1202 Output.WriteByte(4); // Command.
1203 Output.WriteGuid(Id);
1204 Output.WriteVarUInt64(0); // Endpoints to skip
1205 rootObject.Serialize(Output, Command);
1206
1207 byte[] Bin = Output.ToArray();
1208 DateTime Now = DateTime.Now;
1209
1210 Rec = new CommandStatus<ResponseType>()
1211 {
1212 Id = Id,
1213 Command = Command,
1214 CommandBinary = Bin,
1215 TimeLimit = Now.AddSeconds(30),
1216 Callback = Callback,
1217 State = State
1218 };
1219
1220 this.currentStatus[s] = Rec;
1221
1222 Rec.Timeout = this.scheduler.Add(Now.AddSeconds(2), this.ResendCommand<ResponseType>, Rec);
1223
1224 await this.Transmit(Bin, this.destination);
1225
1226 EndpointStatus[] CurrentStatus = this.GetRemoteStatuses();
1227
1228 if (Rec.IsComplete(CurrentStatus))
1229 {
1230 this.currentStatus.Remove(Rec.Id.ToString());
1231 this.scheduler.Remove(Rec.Timeout);
1232
1233 await Rec.Callback.Raise(this, new ClusterResponseEventArgs<ResponseType>(
1234 Rec.Command, Rec.GetResponses(CurrentStatus), Rec.State));
1235 }
1236 }
1237 catch (Exception ex)
1238 {
1239 Log.Exception(ex);
1240
1241 if (this.shuttingDown)
1242 this.Dispose2();
1243 }
1244 finally
1245 {
1246 Output.Dispose();
1247 }
1248 }
1249
/// <summary>
/// Scheduler callback for a command. If the command is complete or its time
/// limit has passed, the record is removed and the callback raised. Otherwise
/// the command is retransmitted with the list of endpoints that have already
/// responded, and another resend is scheduled two seconds ahead.
/// </summary>
/// <typeparam name="ResponseType">Type of response expected from the command.</typeparam>
/// <param name="P">The command status record of the command.</param>
private async Task ResendCommand<ResponseType>(object P)
{
    CommandStatus<ResponseType> Status = (CommandStatus<ResponseType>)P;
    DateTime Current = DateTime.Now;

    try
    {
        EndpointStatus[] Remotes = this.GetRemoteStatuses();
        bool Finished = Status.IsComplete(Remotes) || Current >= Status.TimeLimit;

        if (!Finished)
        {
            using (Serializer Output = new Serializer())
            {
                Output.WriteByte(4); // Command.
                Output.WriteGuid(Status.Id);

                lock (Status.Responses)
                {
                    Output.WriteVarUInt64((uint)Status.Responses.Count); // Endpoints to skip

                    foreach (IPEndPoint Answered in Status.Responses.Keys)
                    {
                        Output.WriteBinary(Answered.Address.GetAddressBytes());
                        Output.WriteUInt16((ushort)Answered.Port);
                    }
                }

                // Reuse the original payload, leaving out its first 18 bytes.
                byte[] Original = Status.CommandBinary;
                Output.WriteRaw(Original, 18, Original.Length - 18);

                Status.Timeout = this.scheduler.Add(Current.AddSeconds(2), this.ResendCommand<ResponseType>, Status);
                await this.Transmit(Output.ToArray(), this.destination);
            }

            return;
        }

        this.currentStatus.Remove(Status.Id.ToString());
        this.scheduler.Remove(Status.Timeout);

        await Status.Callback.Raise(this, new ClusterResponseEventArgs<ResponseType>(
            Status.Command, Status.GetResponses(Remotes), Status.State));
    }
    catch (Exception Failure)
    {
        Log.Exception(Failure);
    }
}
1297
/// <summary>
/// Executes a command on the cluster and waits until the callback has
/// reported the responses.
/// </summary>
/// <typeparam name="ResponseType">Type of response expected from the command.</typeparam>
/// <param name="Command">Command to execute.</param>
/// <returns>Responses reported by the callback.</returns>
public async Task<EndpointResponse<ResponseType>[]> ExecuteCommandAsync<ResponseType>(IClusterCommand Command)
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointResponse<ResponseType>[]> Result =
        new TaskCompletionSource<EndpointResponse<ResponseType>[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.ExecuteCommand<ResponseType>(Command, (Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
1319
/// <summary>
/// Executes an Echo command carrying the given text.
/// </summary>
/// <param name="Text">Text placed in the Echo command.</param>
/// <param name="Callback">Callback raised with the collected responses.</param>
/// <param name="State">State object passed on to the callback.</param>
public Task Echo(string Text, EventHandlerAsync<ClusterResponseEventArgs<string>> Callback, object State)
{
    Echo Request = new Echo()
    {
        Text = Text
    };

    return this.ExecuteCommand(Request, Callback, State);
}
1333
/// <summary>
/// Executes an Echo command and waits until the callback has reported the
/// responses.
/// </summary>
/// <param name="Text">Text placed in the Echo command.</param>
/// <returns>Responses reported by the callback.</returns>
public async Task<EndpointResponse<string>[]> EchoAsync(string Text)
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointResponse<string>[]> Result =
        new TaskCompletionSource<EndpointResponse<string>[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.Echo(Text, (Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
1350
/// <summary>
/// Executes an Assemblies command on the cluster.
/// </summary>
/// <param name="Callback">Callback raised with the collected responses.</param>
/// <param name="State">State object passed on to the callback.</param>
public Task GetAssemblies(EventHandlerAsync<ClusterResponseEventArgs<string[]>> Callback, object State)
    => this.ExecuteCommand(new Assemblies(), Callback, State);
1360
/// <summary>
/// Executes an Assemblies command and waits until the callback has reported
/// the responses.
/// </summary>
/// <returns>Responses reported by the callback.</returns>
public async Task<EndpointResponse<string[]>[]> GetAssembliesAsync()
{
    // Run the awaiting caller's continuation asynchronously, so it does not execute
    // inline on the thread that invokes the callback and delay that thread.
    TaskCompletionSource<EndpointResponse<string[]>[]> Result =
        new TaskCompletionSource<EndpointResponse<string[]>[]>(TaskCreationOptions.RunContinuationsAsynchronously);

    await this.GetAssemblies((Sender, e) =>
    {
        Result.TrySetResult(e.Responses);
        return Task.CompletedTask;
    }, null);

    return await Result.Task;
}
1376
/// <summary>
/// Requests a lock on a named resource. The request is queued on the
/// resource's lock record (created on demand). If the resource is already
/// locked locally, the request is reported as unsuccessful for now; otherwise
/// a lock request is sent to the cluster.
/// </summary>
/// <param name="ResourceName">Name of the resource to lock.</param>
/// <param name="TimeoutMilliseconds">Time, in milliseconds, before the request times out.</param>
/// <param name="Callback">Callback raised with the outcome.</param>
/// <param name="State">State object passed on to the callback.</param>
public Task Lock(string ResourceName, int TimeoutMilliseconds,
    EventHandlerAsync<ClusterResourceLockEventArgs> Callback, object State)
{
    LockInfo Record;
    LockInfoRec Request;

    lock (this.lockedResources)
    {
        bool Exists = this.lockedResources.TryGetValue(ResourceName, out Record);

        if (!Exists)
        {
            Record = new LockInfo()
            {
                Resource = ResourceName,
                Locked = false
            };

            this.lockedResources[ResourceName] = Record;
        }

        Request = new LockInfoRec()
        {
            Info = Record,
            Timeout = DateTime.Now.AddMilliseconds(TimeoutMilliseconds),
            Callback = Callback,
            State = State
        };

        Record.Queue.AddLast(Request);
    }

    return Record.Locked
        ? this.LockResult(false, null, Request)
        : this.Lock(Record, Request);
}
1417
/// <summary>
/// Sends an acknowledged Lock message for the resource and evaluates the
/// responses. The lock is granted if no endpoint answered negatively and the
/// resource has not been locked locally in the meantime.
/// </summary>
/// <param name="Info">Lock record of the resource.</param>
/// <param name="InfoRec">Queued lock request.</param>
private Task Lock(LockInfo Info, LockInfoRec InfoRec)
{
    Lock Request = new Lock()
    {
        Resource = Info.Resource
    };

    return this.SendMessageAcknowledged(Request, (Sender, e) =>
    {
        // The first endpoint that explicitly refused is reported as the holder.
        IPEndPoint Holder = null;

        foreach (EndpointAcknowledgement Answer in e.Responses)
        {
            if (Answer.ACK.HasValue && !Answer.ACK.Value)
            {
                Holder = Answer.Endpoint;
                break;
            }
        }

        bool Granted;

        lock (this.lockedResources)
        {
            Granted = !Info.Locked && Holder is null;

            if (Granted)
            {
                Info.Locked = true;
                Info.Queue.Remove(InfoRec);
            }
            else if (InfoRec.Timeout <= DateTime.Now)
            {
                Info.Queue.Remove(InfoRec);

                // Drop the record once nothing holds or waits for the resource.
                if (!Info.Locked && Info.Queue.First is null)
                    this.lockedResources.Remove(Info.Resource);
            }
        }

        return this.LockResult(Granted, Holder, InfoRec);

    }, null);
}
1463
/// <summary>
/// Acts on the outcome of a lock attempt: reports success, reports failure if the
/// request has timed out, or schedules the timeout if it has not been scheduled yet.
/// </summary>
/// <param name="Ok">If the lock was obtained.</param>
/// <param name="LockedBy">Endpoint reported as holding the lock, if any.</param>
/// <param name="InfoRec">Lock request the result applies to.</param>
private async Task LockResult(bool Ok, IPEndPoint LockedBy, LockInfoRec InfoRec)
{
	if (Ok)
	{
		await this.Raise(InfoRec.Info.Resource, true, null, InfoRec);
		return;
	}

	if (DateTime.Now >= InfoRec.Timeout)
	{
		await this.Raise(InfoRec.Info.Resource, false, LockedBy, InfoRec);
		return;
	}

	if (InfoRec.TimeoutScheduled)
		return;

	// Not yet timed out: remember who holds the lock and let the scheduler
	// report the failure when the timeout elapses. The scheduler returns the
	// time actually used, which is stored so the event can be removed later.
	InfoRec.LockedBy = LockedBy;
	InfoRec.Timeout = this.scheduler.Add(InfoRec.Timeout, this.LockTimeout, InfoRec);
	InfoRec.TimeoutScheduled = true;
}
1477
/// <summary>
/// Scheduler callback invoked when a queued lock request times out. Removes the
/// request from its queue and reports the failure to the requester.
/// </summary>
/// <param name="P">The <c>LockInfoRec</c> of the request that timed out.</param>
/// <returns>Task representing the raising of the failure event.</returns>
private Task LockTimeout(object P)
{
	LockInfoRec Request = (LockInfoRec)P;
	LockInfo Owner = Request.Info;

	// The scheduled event has fired, so there is nothing left to cancel.
	Request.TimeoutScheduled = false;

	lock (this.lockedResources)
	{
		Owner.Queue.Remove(Request);

		bool Unused = !Owner.Locked && Owner.Queue.First is null;

		if (Unused)
			this.lockedResources.Remove(Owner.Resource);
	}

	return this.Raise(Owner.Resource, false, Request.LockedBy, Request);
}
1495
/// <summary>
/// Reports the outcome of a lock request to its callback, after cancelling any
/// scheduled timeout and removing the request from its queue. Exceptions are logged.
/// </summary>
/// <param name="ResourceName">Name of the resource.</param>
/// <param name="LockSuccessful">If the lock was obtained.</param>
/// <param name="LockedBy">Endpoint reported as holding the lock, if any.</param>
/// <param name="InfoRec">Lock request being reported.</param>
private async Task Raise(string ResourceName, bool LockSuccessful, IPEndPoint LockedBy,
	LockInfoRec InfoRec)
{
	try
	{
		if (InfoRec.TimeoutScheduled)
		{
			this.scheduler.Remove(InfoRec.Timeout);
			InfoRec.TimeoutScheduled = false;
		}

		LockInfo Owner = InfoRec.Info;

		lock (this.lockedResources)
		{
			Owner.Queue.Remove(InfoRec);

			// Forget the resource when it is neither held nor waited for.
			bool Unused = !Owner.Locked && Owner.Queue.First is null;

			if (Unused)
				this.lockedResources.Remove(Owner.Resource);
		}

		await InfoRec.Callback.Raise(
			this,
			new ClusterResourceLockEventArgs(ResourceName, LockSuccessful, LockedBy, InfoRec.State));
	}
	catch (Exception ex)
	{
		Log.Exception(ex);
	}
}
1525
/// <summary>
/// Locks a singleton resource in the cluster.
/// </summary>
/// <param name="ResourceName">Name of the resource to lock.</param>
/// <param name="TimeoutMilliseconds">Timeout, in milliseconds.</param>
/// <returns>The event arguments delivered to the callback of the lock request.</returns>
public async Task<ClusterResourceLockEventArgs> LockAsync(string ResourceName, int TimeoutMilliseconds)
{
	// Bridges the callback-based Lock call to an awaitable task.
	TaskCompletionSource<ClusterResourceLockEventArgs> Completion =
		new TaskCompletionSource<ClusterResourceLockEventArgs>();

	await this.Lock(
		ResourceName,
		TimeoutMilliseconds,
		(Sender, e) =>
		{
			Completion.TrySetResult(e);
			return Task.CompletedTask;
		},
		null);

	ClusterResourceLockEventArgs Outcome = await Completion.Task;
	return Outcome;
}
1543
/// <summary>
/// Releases a resource.
/// </summary>
/// <param name="ResourceName">Name of the resource to release.</param>
/// <exception cref="ArgumentException">If the resource is not recorded as locked
/// by this endpoint.</exception>
public async Task Release(string ResourceName)
{
	LockInfo Info;
	LockInfoRec InfoRec;

	lock (this.lockedResources)
	{
		if (!this.lockedResources.TryGetValue(ResourceName, out Info) || !Info.Locked)
			throw new ArgumentException("Resource not locked.", nameof(ResourceName));

		Info.Locked = false;

		if (Info.Queue.First is null)
		{
			// Nobody is waiting locally: forget the resource.
			this.lockedResources.Remove(ResourceName);
			InfoRec = null;
		}
		else
			InfoRec = Info.Queue.First.Value;
	}

	await this.SendMessageAcknowledged(new Release()
	{
		Resource = ResourceName
	}, null, null);

	if (!(InfoRec is null))
	{
		// Retry the lock for the first local waiter after a random delay of 1-50 ms.
		this.scheduler.Add(DateTime.Now.AddMilliseconds(this.rnd.Next(50) + 1), (P) =>
		{
			// The retry is fire-and-forget. Observe a faulted task so that a
			// failure is logged instead of being lost silently.
			this.Lock(Info, InfoRec).ContinueWith(
				(Faulted) => Log.Exception(Faulted.Exception),
				CancellationToken.None,
				TaskContinuationOptions.OnlyOnFaulted,
				TaskScheduler.Default);
		}, null);
	}
}
1583
/// <summary>
/// Reacts to a resource having been released, by retrying the lock for the first
/// local request waiting for it, if any.
/// NOTE(review): presumably invoked when a Release message from another endpoint
/// is processed - confirm against the caller.
/// </summary>
/// <param name="ResourceName">Name of the released resource.</param>
internal void Released(string ResourceName)
{
	LockInfo Info;
	LockInfoRec InfoRec;

	lock (this.lockedResources)
	{
		// Nothing to do if the resource is unknown here, or is held locally.
		if (!this.lockedResources.TryGetValue(ResourceName, out Info) || Info.Locked)
			return;

		if (Info.Queue.First is null)
		{
			this.lockedResources.Remove(ResourceName);
			return;
		}
		else
			InfoRec = Info.Queue.First.Value;
	}

	// Retry the lock for the first local waiter after a random delay of 1-50 ms.
	this.scheduler.Add(DateTime.Now.AddMilliseconds(this.rnd.Next(50) + 1), (P) =>
	{
		// The retry is fire-and-forget. Observe a faulted task so that a
		// failure is logged instead of being lost silently.
		this.Lock(Info, InfoRec).ContinueWith(
			(Faulted) => Log.Exception(Faulted.Exception),
			CancellationToken.None,
			TaskContinuationOptions.OnlyOnFaulted,
			TaskScheduler.Default);
	}, null);
}
1608
1609 }
1610}
Static class managing the application event log. Applications and services log events on this static ...
Definition: Log.cs:13
static void Exception(Exception Exception, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, params KeyValuePair< string, object >[] Tags)
Logs an exception. Event type will be determined by the severity of the exception.
Definition: Log.cs:1647
static void Error(string Message, string Object, string Actor, string EventId, EventLevel Level, string Facility, string Module, string StackTrace, params KeyValuePair< string, object >[] Tags)
Logs an error event.
Definition: Log.cs:682
static Exception UnnestException(Exception Exception)
Unnests an exception, to extract the relevant inner exception.
Definition: Log.cs:818
Event arguments for cluster endpoint events.
Represents one endpoint (or participant) in the network cluster.
async Task SendMessageAcknowledged(IClusterMessage Message, EventHandlerAsync< ClusterMessageAckEventArgs > Callback, object State)
Sends a message using acknowledged service. ("at least once")
async Task< EndpointAcknowledgement[]> SendMessageAssuredAsync(IClusterMessage Message)
Sends a message using assured service. ("exactly once")
async void Dispose()
IDisposable.Dispose
Task Lock(string ResourceName, int TimeoutMilliseconds, EventHandlerAsync< ClusterResourceLockEventArgs > Callback, object State)
Locks a singleton resource in the cluster.
Task Echo(string Text, EventHandlerAsync< ClusterResponseEventArgs< string > > Callback, object State)
Asks endpoints in the cluster to echo a text string back to the sender.
async Task ExecuteCommand< ResponseType >(IClusterCommand Command, EventHandlerAsync< ClusterResponseEventArgs< ResponseType > > Callback, object State)
Execute a command on the other members of the cluster, and waits for responses to be returned....
IEnumerable< IPEndPoint > Endpoints
IP endpoints listening on.
async Task< ClusterResourceLockEventArgs > LockAsync(string ResourceName, int TimeoutMilliseconds)
Locks a singleton resource in the cluster.
ClusterEndpoint(IPAddress MulticastAddress, int Port, string SharedSecret, params ISniffer[] Sniffers)
Represents one endpoint (or participant) in the network cluster.
EventHandlerAsync< ClusterGetStatusEventArgs > GetStatus
Event raised when current status is needed.
async Task< bool > RemoveRemoteStatus(IPEndPoint Endpoint)
Explicitly removes a remote cluster endpoint status object.
async Task DisposeAsync()
IDisposable.Dispose
object LocalStatus
Local status. For remote endpoint statuses, see GetRemoteStatuses
async Task Release(string ResourceName)
Releases a resource.
EndpointStatus[] GetRemoteStatuses()
Gets current remote statuses. For the current local status, see LocalStatus
EventHandlerAsync< ClusterEndpointEventArgs > EndpointOffline
Event raised when an endpoint goes offline.
object Deserialize(byte[] Data)
Deserializes an object.
EventHandlerAsync< ClusterEndpointStatusEventArgs > EndpointStatus
Event raised when status has been reported by an endpoint.
EventHandlerAsync< ClusterEndpointEventArgs > EndpointOnline
Event raised when a new endpoint is available in the cluster.
byte[] Serialize(object Object)
Serializes an object.
Task GetAssemblies(EventHandlerAsync< ClusterResponseEventArgs< string[]> > Callback, object State)
Asks endpoints in the cluster to return assemblies available in their runtime environment.
async void AddRemoteStatus(IPEndPoint Endpoint, object Status)
Explicitly adds a remote endpoint status object. In normal operation, calling this method is not nece...
async Task< EndpointResponse< string >[]> EchoAsync(string Text)
Asks endpoints in the cluster to echo a text string back to the sender.
EventHandlerAsync< ClusterMessageEventArgs > OnMessageReceived
Event raised when a cluster message has been received.
async Task< EndpointResponse< string[]>[]> GetAssembliesAsync()
Asks endpoints in the cluster to return assemblies available in their runtime environment.
ClusterEndpoint(IPAddress MulticastAddress, int Port, byte[] SharedSecret, params ISniffer[] Sniffers)
Represents one endpoint (or participant) in the network cluster.
Task Ping(EventHandlerAsync< ClusterMessageAckEventArgs > Callback, object State)
Sends an acknowledged ping message to the other servers in the cluster.
async Task< EndpointResponse< ResponseType >[]> ExecuteCommandAsync< ResponseType >(IClusterCommand Command)
Execute a command on the other members of the cluster, and waits for responses to be returned....
async Task SendMessageUnacknowledged(IClusterMessage Message)
Sends an unacknowledged message ("at most once")
async Task< EndpointAcknowledgement[]> SendMessageAcknowledgedAsync(IClusterMessage Message)
Sends a message using acknowledged service.
async Task SendMessageAssured(IClusterMessage Message, EventHandlerAsync< ClusterMessageAckEventArgs > Callback, object State)
Sends a message using assured service. ("exactly once")
async Task< EndpointAcknowledgement[]> PingAsync()
Sends an acknowledged ping message to the other servers in the cluster.
Event arguments for cluster endpoint status events.
Event arguments for cluster get status events.
Event arguments for cluster message acknowledgement events.
Event arguments for cluster message events.
Event arguments for cluster resource lock events.
Event arguments for cluster message acknowledgement events.
Command to request available assembly names from cluster endpoints.
Definition: Assemblies.cs:14
Command echoing incoming text string.
Definition: Echo.cs:13
Contains information about one of the endpoints in the cluster.
Contains information about one of the endpoints in the cluster.
Alive message regularly sent on the network to inform members of the cluster of the status of the cur...
Definition: Alive.cs:12
Delivers a message, as part of an assured message transfer
Definition: Deliver.cs:11
Message used to inform other members that the sending endpoint is trying to lock a resource....
Definition: Lock.cs:14
Ping message that can be sent to test endpoints in cluster receive messages.
Definition: Ping.cs:11
Message used to inform other members that the sending endpoint is releasing a locked resource.
Definition: Release.cs:13
Message sent when an endpoint is shut down, to let the other endpoints know the current endpoint is n...
Definition: ShuttingDown.cs:12
Transports a message, as part of an assured message transfer
Definition: Transport.cs:11
byte[] ReadBinary()
Reads binary data from the input.
Definition: Deserializer.cs:77
ushort ReadUInt16()
Reads a 16-bit unsigned integer from the input.
ulong ReadVarUInt64()
Reads a variable-length unsigned integer from the input.
Definition: Deserializer.cs:56
byte ReadByte()
Reads a byte from the input.
Definition: Deserializer.cs:47
Guid ReadGuid()
Reads a Guid from the input.
string ReadString()
Reads a string from the input.
Definition: Deserializer.cs:99
void WriteRaw(byte[] Binary)
Writes raw binary data to the output.
Definition: Serializer.cs:98
byte[] ToArray()
Returns the binary output.
Definition: Serializer.cs:36
void WriteVarUInt64(ulong Value)
Writes a variable-length unsigned integer to the output.
Definition: Serializer.cs:63
void WriteBinary(byte[] Value)
Writes binary data to the output.
Definition: Serializer.cs:83
void WriteGuid(Guid Value)
Writes a Guid to the output.
Definition: Serializer.cs:320
void WriteString(string Value)
Writes a string to the output.
Definition: Serializer.cs:118
void WriteByte(byte Value)
Writes a byte to the output.
Definition: Serializer.cs:54
void WriteUInt16(ushort Value)
Writes a 16-bit unsigned integer to the output.
Definition: Serializer.cs:202
Simple base class for classes implementing communication protocols.
Task TransmitBinary(byte[] Data)
Called when binary data has been transmitted.
Task Error(string Error)
Called to inform the viewer of an error state.
Task Exception(Exception Exception)
Called to inform the viewer of an exception state.
ISniffer[] Sniffers
Registered sniffers.
Task ReceiveBinary(byte[] Data)
Called when binary data has been received.
Implements an in-memory cache.
Definition: Cache.cs:15
bool ContainsKey(KeyType Key)
Checks if a key is available in the cache.
Definition: Cache.cs:296
void Dispose()
IDisposable.Dispose
Definition: Cache.cs:74
bool Remove(KeyType Key)
Removes an item from the cache.
Definition: Cache.cs:451
bool TryGetValue(KeyType Key, out ValueType Value)
Tries to get a value from the cache.
Definition: Cache.cs:203
Event arguments for cache item removal events.
ValueType Value
Value of item that was removed.
RemovedReason Reason
Reason for removing the item.
Static class that dynamically manages types and interfaces available in the runtime environment.
Definition: Types.cs:14
static Type GetType(string FullName)
Gets a type, given its full name.
Definition: Types.cs:41
static bool TryGetModuleParameter(string Name, out object Value)
Tries to get a module parameter value.
Definition: Types.cs:583
static object[] NoParameters
Contains an empty array of parameter values.
Definition: Types.cs:548
static Type[] GetTypesImplementingInterface(string InterfaceFullName)
Gets all types implementing a given interface.
Definition: Types.cs:84
static ConstructorInfo GetDefaultConstructor(Type Type)
Gets the default constructor of a type, if one exists.
Definition: Types.cs:1630
Class that can be used to schedule events in time. It uses a timer to execute tasks at the appointed ...
Definition: Scheduler.cs:26
bool Remove(DateTime When)
Removes an event scheduled for a given point in time.
Definition: Scheduler.cs:182
void Dispose()
IDisposable.Dispose
Definition: Scheduler.cs:46
DateTime Add(DateTime When, ScheduledEventCallback Callback, object State)
Adds an event.
Definition: Scheduler.cs:66
Contains methods for simple hash calculations.
Definition: Hashes.cs:59
static byte[] ComputeSHA256Hash(byte[] Data)
Computes the SHA-256 hash of a block of binary data.
Definition: Hashes.cs:348
Interface for cluster commands.
Interface for cluster messages.
Interface for sniffers. Sniffers can be added to ICommunicationLayer classes to eavesdrop on communic...
Definition: ISniffer.cs:11
delegate Task EventHandlerAsync(object Sender, EventArgs e)
Asynchronous version of EventHandler.
RemovedReason
Reason for removing the item.