-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathrsocket_responder.dart
95 lines (84 loc) · 2.75 KB
/
rsocket_responder.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import 'package:universal_io/io.dart';
import 'package:web_socket_channel/src/channel.dart';
import '../core/rsocket_requester.dart';
import '../duplex_connection.dart';
import '../frame/frame.dart';
import '../frame/frame_types.dart' as frame_types;
import '../payload.dart';
import '../rsocket.dart';
class BaseResponder {
late SocketAcceptor socketAcceptor;
late Uri uri;
Future<void> receiveConnection(DuplexConnection duplexConn) async {
RSocketRequester? rsocketRequester;
duplexConn.receiveHandler = (chunk) {
for (var frame in parseFrames(chunk)) {
var header = frame.header;
if (header.type == frame_types.SETUP) {
var setupFrame = frame as SetupFrame;
var connectionSetupPayload = ConnectionSetupPayload()
..keepAliveIntervalMs = setupFrame.keepAliveIntervalMs
..keepAliveMaxLifetimeMs = setupFrame.keepAliveMaxLifetimeMs
..metadataMimeType = setupFrame.metadataMimeType
..dataMimeType = setupFrame.dataMimeType
..data = setupFrame.payload?.data
..metadata = setupFrame.payload?.data;
var temp =
RSocketRequester('responder', connectionSetupPayload, duplexConn);
var responder = socketAcceptor(connectionSetupPayload, temp);
if (responder == null) {
duplexConn.close();
break;
} else {
temp.responder = responder;
rsocketRequester = temp;
}
} else {
rsocketRequester?.receiveFrame(frame);
}
}
};
duplexConn.init();
}
}
class TcpRSocketResponder extends BaseResponder implements Closeable {
late ServerSocket serverSocket;
TcpRSocketResponder(
Uri uri, ServerSocket serverSocket, SocketAcceptor socketAcceptor) {
this.uri = uri;
this.socketAcceptor = socketAcceptor;
this.serverSocket = serverSocket;
}
void accept() {
serverSocket.listen((socket) {
receiveConnection(TcpDuplexConnection(socket)).then((value) => {});
});
}
@override
void close() {
serverSocket.close();
}
}
class WebSocketRSocketResponder extends BaseResponder implements Closeable {
late HttpServer httpServer;
WebSocketRSocketResponder(
Uri uri, HttpServer httpServer, SocketAcceptor socketAcceptor) {
this.uri = uri;
this.socketAcceptor = socketAcceptor;
this.httpServer = httpServer;
}
void accept() {
httpServer.listen((HttpRequest req) {
if (req.uri.path == uri.path) {
WebSocketTransformer.upgrade(req)
.then((webSocket) => receiveConnection(
WebSocketDuplexConnection(webSocket as WebSocketChannel)))
.then((value) => {});
}
});
}
@override
void close() {
httpServer.close();
}
}