-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLocalOnlyIPCTransport.cs
95 lines (78 loc) · 2.97 KB
/
LocalOnlyIPCTransport.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
using Akka.Actor;
using Akka.Remote.Transport;
using System;
using System.Threading.Tasks;
namespace Akka.Remote.LocalOnlyIPC;
/// <summary>
/// Akka.Remote transport that delegates connection handling to an
/// <see cref="ILocalOnlyIPC"/> implementation chosen by the configured scheme
/// identifier ("pipe" selects <see cref="NamedPipeIPC"/>, "uds" selects
/// <see cref="UnixDomainSocketIPC"/>).
/// </summary>
public class LocalOnlyIPCTransport: Transport.Transport
{
    // Handed to the IPC layer in Listen() and returned to the caller of Listen().
    private readonly TaskCompletionSource<IAssociationEventListener>
        _associationEventListener;

    // Address built from the scheme, actor system name and the IPC connection name/number.
    private readonly Address localAddress;

    // The concrete IPC mechanism selected in the private constructor.
    private readonly ILocalOnlyIPC localOnlyIPC;

    /// <summary>
    /// Creates the transport from the transport configuration section.
    /// Reads "connection-name", "connection-number", "scheme-identifier" and
    /// "maximum-payload-bytes" (falling back to 32000 when the latter is absent).
    /// </summary>
    /// <param name="actorSystem">The actor system this transport belongs to.</param>
    /// <param name="config">The configuration section of this transport.</param>
    /// <exception cref="NotImplementedException">
    /// The configured scheme identifier is missing or not supported.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The configured maximum payload size does not fit into a 32-bit integer.
    /// </exception>
    public LocalOnlyIPCTransport(
        ActorSystem actorSystem,
        Akka.Configuration.Config config) :
        this(
            actorSystem.Name,
            config.GetString("connection-name"),
            config.GetInt("connection-number"),
            config.GetString("scheme-identifier"),
            config.GetByteSize("maximum-payload-bytes", null) ?? 32000L)
    {
        // Listen() logs through System.Log. Nothing else in this class assigns the
        // inherited System property, so without this the error path would fail with
        // a NullReferenceException instead of logging.
        System = actorSystem;
    }

    /// <summary>
    /// Selects the IPC implementation and derives the local address from it.
    /// </summary>
    private LocalOnlyIPCTransport(
        string actorSystemName,
        string connectionName,
        int connectionNumber,
        string schemeIdentifier,
        long maximumPayloadBytes)
    {
        // The IPC implementations take the limit as an int; reject values that would
        // silently wrap around in the narrowing conversion below.
        if (maximumPayloadBytes < int.MinValue || maximumPayloadBytes > int.MaxValue)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maximumPayloadBytes),
                maximumPayloadBytes,
                "maximum-payload-bytes must fit into a 32-bit integer");
        }

        int payloadLimit = (int)maximumPayloadBytes;

        // A missing scheme identifier (null) falls through to the default arm and is
        // reported the same way as an unknown one.
        localOnlyIPC = schemeIdentifier?.ToLowerInvariant() switch
        {
            "pipe" => new NamedPipeIPC(connectionName, connectionNumber, payloadLimit),
            "uds" => new UnixDomainSocketIPC(connectionName, connectionNumber, payloadLimit),
            _ => throw new NotImplementedException($"no Implementation for Local-Only IPC \"{schemeIdentifier}\"")
        };

        // The scheme, connection name and number are taken from the IPC object rather
        // than from the raw constructor arguments.
        SchemeIdentifier = localOnlyIPC.IPCSchemeIdentifier;
        MaximumPayloadBytes = payloadLimit;
        localAddress = new Address(
            SchemeIdentifier,
            actorSystemName,
            localOnlyIPC.ConnectionName,
            localOnlyIPC.ConnectionNumber);
        _associationEventListener = new TaskCompletionSource<IAssociationEventListener>();
    }

    /// <summary>
    /// Creates an outbound connection to <paramref name="remoteAddress"/> and wraps it
    /// in an association handle.
    /// </summary>
    /// <param name="remoteAddress">The address to associate with.</param>
    /// <returns>An already completed task holding the new association handle.</returns>
    /// <remarks>
    /// There is no exception handling here: anything thrown while creating the
    /// connection or the handle propagates synchronously to the caller.
    /// </remarks>
    public override Task<AssociationHandle> Associate(Address remoteAddress)
    {
        LocalOnlyIPCConnectionBase connection =
            localOnlyIPC.CreateOutboundConnection(remoteAddress);
        AssociationHandle handle =
            new LocalOnlyIPCAssociationHandler(
                localAddress,
                remoteAddress,
                connection);
        return Task.FromResult(handle);
    }

    /// <summary>
    /// Always reports responsibility, regardless of the given address.
    /// </summary>
    public override bool IsResponsibleFor(Address remote) => true;

    /// <summary>
    /// Asks the IPC layer to prepare an inbound connection and returns the local
    /// address together with the association listener promise.
    /// </summary>
    /// <returns>
    /// An already completed task with the local address and the listener promise.
    /// </returns>
    public override Task<(Address, TaskCompletionSource<IAssociationEventListener>)> Listen()
    {
        try
        {
            localOnlyIPC.PrepareInboundConnection(
                _associationEventListener.Task);
        }
        catch (IPCConnectionFailedException ex)
        {
            System.Log.Log(
                Akka.Event.LogLevel.ErrorLevel,
                ex,
                "can not offer new inbound connection");
            // NOTE(review): the failure is only logged; the method still reports a
            // successful listen below. Whether it should instead return a faulted
            // task (or rethrow) was left open by the original author - decide and
            // confirm against how the caller treats a failed Listen().
        }
        return Task.FromResult((localAddress, _associationEventListener));
    }

    /// <summary>
    /// Reports a successful shutdown without performing any work.
    /// </summary>
    /// <remarks>
    /// NOTE(review): nothing is released here; confirm whether the IPC object holds
    /// resources that need explicit cleanup.
    /// </remarks>
    public override Task<bool> Shutdown()
    {
        return Task.FromResult(true);
    }
}