-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathrsocket_connector.dart
75 lines (66 loc) · 2.31 KB
/
rsocket_connector.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import 'dart:typed_data';
import 'core/rsocket_error.dart';
import 'core/rsocket_requester.dart';
import 'duplex_connection.dart';
import 'payload.dart';
import 'rsocket.dart';
/// Fluent builder that configures and opens a client-side RSocket connection.
///
/// Chain the configuration methods, then call [connect]:
///
/// ```dart
/// final rsocket = await RSocketConnector.create()
///     .keepAlive(20, 90)
///     .connect(url);
/// ```
class RSocketConnector {
  /// Optional payload whose data and metadata are copied into the
  /// [ConnectionSetupPayload] built by [connect].
  Payload? payload;

  /// Keep-alive interval in milliseconds. Defaults to 20 seconds.
  int keepAliveIntervalMs = 20 * 1000;

  /// Keep-alive max lifetime in milliseconds. Defaults to 90 seconds.
  int keepAliveMaxLifeTimeMs = 90 * 1000;

  String _dataMimeType = 'application/json';
  String _metadataMimeType = 'message/x.rsocket.composite-metadata.v0';
  ErrorConsumer? _errorConsumer;
  SocketAcceptor? _acceptor;

  /// Creates a connector with the default keep-alive and MIME type settings.
  RSocketConnector.create();

  /// Sets the acceptor that [connect] invokes to obtain the responder for the
  /// new connection.
  ///
  /// If the acceptor returns `null`, [connect] closes the requester and fails.
  RSocketConnector acceptor(SocketAcceptor socketAcceptor) {
    _acceptor = socketAcceptor;
    return this;
  }

  /// Sets the payload whose data and metadata are used for connection setup.
  RSocketConnector setupPayload(Payload payload) {
    this.payload = payload;
    return this;
  }

  /// Sets the data MIME type placed in the setup payload.
  RSocketConnector dataMimeType(String dataMimeType) {
    _dataMimeType = dataMimeType;
    return this;
  }

  /// Sets the metadata MIME type placed in the setup payload.
  RSocketConnector metadataMimeType(String metadataMimeType) {
    _metadataMimeType = metadataMimeType;
    return this;
  }

  /// Sets the error consumer that [connect] assigns to the requester it
  /// creates.
  ///
  /// Without this method the consumer could never be configured, so the
  /// requester always received `null`.
  RSocketConnector errorConsumer(ErrorConsumer errorConsumer) {
    _errorConsumer = errorConsumer;
    return this;
  }

  /// Sets the keep-alive [interval] and [maxLifeTime], both in **seconds**.
  ///
  /// The values are stored in milliseconds in [keepAliveIntervalMs] and
  /// [keepAliveMaxLifeTimeMs].
  RSocketConnector keepAlive(int interval, int maxLifeTime) {
    keepAliveIntervalMs = interval * 1000;
    keepAliveMaxLifeTimeMs = maxLifeTime * 1000;
    return this;
  }

  /// Connects to [url], sends the setup payload and returns the requester.
  ///
  /// When an acceptor is configured and it returns `null`, the requester is
  /// closed and the returned future completes with the error string
  /// `'RSOCKET-0x00000003: Connection refused, ...'`. Any other failure raised
  /// after the connection was established also closes the requester before
  /// the error is propagated, so the connection is not leaked.
  Future<RSocket> connect(String url) async {
    // No-op chunk handler passed to connectRSocket.
    // NOTE(review): presumably the requester installs the real receive
    // handler on the connection - confirm against RSocketRequester.
    final TcpChunkHandler noopHandler = (Uint8List chunk) {};

    final setup = ConnectionSetupPayload()
      ..keepAliveIntervalMs = keepAliveIntervalMs
      ..keepAliveMaxLifetimeMs = keepAliveMaxLifeTimeMs
      ..metadataMimeType = _metadataMimeType
      ..dataMimeType = _dataMimeType
      ..data = payload?.data
      ..metadata = payload?.metadata;

    final conn = await connectRSocket(url, noopHandler);
    final requester = RSocketRequester('requester', setup, conn);
    try {
      // Copy to a local so the nullable field promotes without `!`.
      final accept = _acceptor;
      if (accept == null) {
        requester.responder = RSocket();
      } else {
        final responder = accept(setup, requester);
        requester.responder = responder;
        if (responder == null) {
          // The error value stays a plain string so existing callers that
          // inspect it keep working.
          throw 'RSOCKET-0x00000003: Connection refused, please check setup and security!';
        }
      }
      requester.errorConsumer = _errorConsumer;
      requester.sendSetupPayload();
      return requester;
    } catch (_) {
      // Release the connection on every failure path, not only on refusal.
      requester.close();
      rethrow;
    }
  }
}