-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConnectionManager.cs
139 lines (124 loc) · 5.43 KB
/
ConnectionManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
using Capnp.Rpc;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Mas.Infrastructure.Common
{
public class ConnectionManager : IDisposable
{
private ConcurrentDictionary<string, TcpRpcClient> _connections = new();
private TcpRpcServer _server;
public Restorer Restorer { get; set; }
public ushort Port => (ushort)_server.Port;
public ConnectionManager(Restorer restorer = null)
{
Restorer = restorer;
//Console.WriteLine("ConnectionManager created");
}
public void Dispose() => Dispose(true);
public static string GetLocalIPAddress(string connectToHost = "8.8.8.8", int connectToPort = 53)
{
var localIP = "127.0.0.1";
try
{
using Socket socket = new (AddressFamily.InterNetwork, SocketType.Stream, 0);
socket.Connect(connectToHost, connectToPort);
var endPoint = socket.LocalEndPoint as IPEndPoint;
localIP = endPoint.Address.ToString();
} catch(System.Exception e) {
Console.WriteLine(e.Message);
}
return localIP;
}
protected virtual void Dispose(bool disposing)
{
//Console.WriteLine("Disposing ConnectionManager");
//if(_Connections.Any()) Console.WriteLine("ConnectionManager: Disposing connections");
foreach (var (key, con) in _connections)
{
try
{
con?.Dispose();
}
catch(System.Exception e)
{
Console.WriteLine("Exception thrown while disposing connection (TcpRpcClient): " + key + " Exception: " + e.Message);
}
}
try
{
//Console.WriteLine("ConnectionManager: Disposing Capnp.Rpc.TcpRpcServer");
_server?.Dispose();
}
catch(System.Exception e)
{
Console.WriteLine("Exception thrown while disposing TcpRpcServer. Exception: " + e.Message);
}
}
public async Task<TRemoteInterface> Connect<TRemoteInterface>(string sturdyRef) where TRemoteInterface : class, IDisposable
{
// we assume that a sturdy ref url looks always like
// capnp://vat-id_base64-curve25519-public-key@host:port/sturdy-ref-token
if (!sturdyRef.StartsWith("capnp://")) return null;
var vatIdBase64Url = "";
var addressPort = "";
var address = "";
var port = 0;
var srToken = "";
var rest = sturdyRef[8..];
// is unix domain socket
if (rest.StartsWith("/")) rest = rest[1..];
else {
var vatIdAndRest = rest.Split("@");
if (vatIdAndRest.Length > 0) vatIdBase64Url = vatIdAndRest[0];
if (vatIdAndRest[^1].Contains('/')) {
var addressPortAndRest = vatIdAndRest[^1].Split("/");
if (addressPortAndRest.Length > 0) {
addressPort = addressPortAndRest[0];
var addressAndPort = addressPort.Split(":");
if (addressAndPort.Length > 0) address = addressAndPort[0];
if (addressAndPort.Length > 1) port = Int32.Parse(addressAndPort[1]);
}
if (addressPortAndRest.Length > 1) srToken = addressPortAndRest[1];
}
}
if (addressPort.Length <= 0) return null;
try {
//Console.WriteLine("ConnectionManager: ThreadId: " + Thread.CurrentThread.ManagedThreadId);
//var con = new TcpRpcClient(address, port);
var con = _connections.GetOrAdd(addressPort, new TcpRpcClient(address, port));
await con.WhenConnected;
if (!string.IsNullOrEmpty(srToken)) {
var restorer = con.GetMain<Schema.Persistence.IRestorer>();
//var srTokenArr = Convert.FromBase64String(Restorer.FromBase64Url(srToken));
//var srToken = System.Text.Encoding.UTF8.GetString(srTokenArr);
var cap = await restorer.Restore(new Schema.Persistence.Restorer.RestoreParams {
LocalRef=new Schema.Persistence.SturdyRef.Token { Text = srToken }});
return cap.Cast<TRemoteInterface>(true);
} else {
var bootstrap = con.GetMain<TRemoteInterface>();
//Console.WriteLine("ConnectionManager: current TaskSchedulerId: " + TaskScheduler.Current.Id);
return bootstrap;
}
}
catch(ArgumentOutOfRangeException) {
_connections.TryRemove(addressPort, out _);
}
catch(System.Exception e) {
_connections.TryRemove(addressPort, out _);
throw;
}
return null;
}
public void Bind(IPAddress address, int tcpPort, object bootstrap)
{
_server?.Dispose();
_server = new TcpRpcServer();
_server.AddBuffering();
_server.Main = bootstrap;
_server.StartAccepting(address, tcpPort);
}
}
}