@@ -7,13 +7,16 @@ import com.jeremy.thunder.connection.AppConnectionListener
7
7
import com.jeremy.thunder.coroutine.CoroutineScope.scope
8
8
import com.jeremy.thunder.event.WebSocketEvent
9
9
import com.jeremy.thunder.network.NetworkConnectivityService
10
- import com.jeremy.thunder.state.GetReady
10
+ import com.jeremy.thunder.state.Background
11
+ import com.jeremy.thunder.state.Foreground
11
12
import com.jeremy.thunder.state.Initialize
12
13
import com.jeremy.thunder.state.NetworkState
13
14
import com.jeremy.thunder.state.ShutDown
14
15
import com.jeremy.thunder.state.ThunderError
15
16
import com.jeremy.thunder.state.ThunderState
17
+ import com.jeremy.thunder.thunderLog
16
18
import com.jeremy.thunder.ws.WebSocket
19
+ import kotlinx.coroutines.CoroutineExceptionHandler
17
20
import kotlinx.coroutines.CoroutineScope
18
21
import kotlinx.coroutines.Job
19
22
import kotlinx.coroutines.delay
@@ -22,9 +25,10 @@ import kotlinx.coroutines.flow.MutableStateFlow
22
25
import kotlinx.coroutines.flow.asSharedFlow
23
26
import kotlinx.coroutines.flow.asStateFlow
24
27
import kotlinx.coroutines.flow.combine
25
- import kotlinx.coroutines.flow.getAndUpdate
26
28
import kotlinx.coroutines.flow.launchIn
27
29
import kotlinx.coroutines.flow.onEach
30
+ import kotlinx.coroutines.flow.update
31
+ import kotlinx.coroutines.plus
28
32
29
33
/* *
30
34
* Manage ThunderState as SocketState using NetworkState
@@ -39,19 +43,23 @@ class ThunderStateManager private constructor(
39
43
private val webSocketCore : WebSocket .Factory ,
40
44
private val scope : CoroutineScope
41
45
) {
46
+ private val innerScope = scope + CoroutineExceptionHandler { _, throwable ->
47
+ thunderLog(" [ThunderStateManager] = ${throwable.message} " )
48
+ }
49
+
42
50
private var socket: WebSocket ? = null
43
51
44
52
private val _socketState = MutableStateFlow <ThunderState >(ThunderState .IDLE )
45
53
46
54
private val _events = MutableSharedFlow <WebSocketEvent >(replay = 1 )
47
55
48
- private var _lastSocketState : ThunderState = ThunderState .IDLE
49
-
50
56
/* *
51
57
* If the device loses the network or the socket connection fails, it enters the error state below.
52
58
* This is used to use the cache for recovery when a [ThunderState.CONNECTED] is reached.
53
59
* */
54
- private var isFromError = false
60
+ private var isReSubscription = false
61
+
62
+ private val _retryNeedFlag = MutableStateFlow <Boolean >(false )
55
63
56
64
fun thunderStateAsFlow () = _socketState .asStateFlow()
57
65
@@ -61,85 +69,98 @@ class ThunderStateManager private constructor(
61
69
62
70
init {
63
71
/* *
64
- * The following code is used to open the valve based on the socket state.
65
- * */
66
- _socketState .onEach {
67
- if (it is ThunderState .ERROR && networkState.hasAvailableNetworks()) {
68
- closeConnection()
69
- delay(500 )
70
- openConnection()
71
- }
72
- valveCache.onUpdateValveState(it)
73
- }.launchIn(scope)
74
-
75
- /* *
76
- * When an app is present in a process but offscreen, it automatically controls the connection based on two states to maintain the connection in the meantime.
72
+ * Update SocketState as ThunderState.
77
73
* */
78
- connectionListener.collectState().onEach {
79
- when (it) {
80
- Initialize -> Unit
81
- GetReady -> openConnection()
82
- ShutDown -> closeConnection()
74
+ _events .onEach { event ->
75
+ when (event) {
76
+ is WebSocketEvent .OnConnectionOpen -> {
77
+ _socketState .update { ThunderState .CONNECTED }
78
+ }
79
+
80
+ is WebSocketEvent .OnMessageReceived -> Unit
81
+
82
+ WebSocketEvent .OnConnectionClosed -> {
83
+ _socketState .update { ThunderState .DISCONNECTED }
84
+ }
85
+
86
+ is WebSocketEvent .OnConnectionError -> {
87
+ isReSubscription = true
88
+ _socketState .update { ThunderState .ERROR (ThunderError .SocketLoss (event.error)) }
89
+ }
83
90
}
84
- }.launchIn(scope )
91
+ }.launchIn(innerScope )
85
92
86
93
/* *
87
- * Used to change the ThunderState based on the device's network connection status.
88
- * */
89
- networkState.networkStatus.onEach {
90
- when (it) {
94
+ * Open, Retry Connection work as network state.
95
+ * */
96
+ combine(
97
+ _retryNeedFlag ,
98
+ networkState.networkStatus
99
+ ) { retry, network ->
100
+ when (network) {
91
101
NetworkState .Available -> {
92
- openConnection()
102
+ if (retry) {
103
+ retryConnection()
104
+ } else {
105
+ openConnection()
106
+ }
93
107
}
94
-
95
108
NetworkState .Unavailable -> {
96
- isFromError = true
97
- _socketState .updateThunderState(ThunderState .ERROR (ThunderError .NetworkLoss (null )))
98
- closeConnection()
109
+ _socketState .update { ThunderState .ERROR (ThunderError .NetworkLoss ) }
99
110
}
100
111
}
101
- }.launchIn(scope )
112
+ }.launchIn(innerScope )
102
113
103
- _events .onEach {
104
- when (it) {
105
- is WebSocketEvent .OnConnectionOpen -> {
106
- _socketState .updateThunderState(ThunderState .CONNECTED )
114
+ /* *
115
+ * Update RetryFlag And Valve And request socket message as upstream state.
116
+ * */
117
+ combine(
118
+ _socketState ,
119
+ connectionListener.collectState()
120
+ ) { socketState, appState ->
121
+ when (appState) {
122
+ Initialize -> {
123
+ openConnection()
107
124
}
108
-
109
- is WebSocketEvent .OnMessageReceived -> Unit
110
-
111
- WebSocketEvent .OnConnectionClosed -> {
112
- _socketState .updateThunderState(ThunderState .DISCONNECTED )
125
+ Foreground -> {
126
+ if (socketState is ThunderState .ERROR ) {
127
+ _retryNeedFlag .update { true }
128
+ }
113
129
}
114
-
115
- is WebSocketEvent .OnConnectionError -> {
116
- isFromError = true
117
- _socketState .updateThunderState(ThunderState .ERROR (ThunderError .SocketLoss (it.error)))
130
+ Background -> {}
131
+ ShutDown -> {
132
+ closeConnection()
118
133
}
119
134
}
120
- }.launchIn(scope)
135
+ valveCache.onUpdateValveState(socketState)
136
+ }.launchIn(innerScope)
121
137
122
138
combine(
123
139
_socketState ,
124
140
valveCache.emissionOfValveFlow()
125
- ) { currentState, request ->
126
- when (currentState) {
127
- ThunderState .IDLE -> Unit
128
- ThunderState .CONNECTING -> {}
141
+ ) { currentSocketState, request ->
142
+ when (currentSocketState) {
129
143
ThunderState .CONNECTED -> {
130
- if (isFromError && recoveryCache.hasCache()) {
144
+ if (isReSubscription && recoveryCache.hasCache()) {
131
145
recoveryCache.get()?.let { requestSendMessage(it) }
132
146
recoveryCache.clear()
133
- isFromError = false
147
+ isReSubscription = false
134
148
} else {
135
149
request.forEach(::requestSendMessage)
136
150
}
137
151
}
138
- ThunderState .DISCONNECTING -> {}
139
- ThunderState .DISCONNECTED -> {}
140
- is ThunderState .ERROR -> {}
152
+
153
+ else -> Unit
141
154
}
142
- }.launchIn(scope)
155
+ }.launchIn(innerScope)
156
+ }
157
+
158
+ private suspend fun retryConnection () {
159
+ thunderLog(" Thunder retry connection work." )
160
+ closeConnection()
161
+ delay(RETRY_CONNECTION_GAP )
162
+ openConnection()
163
+ _retryNeedFlag .update { false }
143
164
}
144
165
145
166
/* *
@@ -152,20 +173,22 @@ class ThunderStateManager private constructor(
152
173
}
153
174
154
175
private lateinit var connectionJob: Job
155
- private fun openConnection () {
176
+ private fun openConnection () = synchronized( this ) {
156
177
if (socket == null ) {
157
- _socketState .updateThunderState(ThunderState .CONNECTING )
158
178
socket = webSocketCore.create()
159
179
socket?.let { webSocket ->
160
180
webSocket.open()
181
+ thunderLog(" Thunder open connection work." )
161
182
if (::connectionJob.isInitialized) connectionJob.cancel()
162
- connectionJob = webSocket.events().onEach { _events .tryEmit(it) }.launchIn(scope )
183
+ connectionJob = webSocket.events().onEach { _events .tryEmit(it) }.launchIn(innerScope )
163
184
}
164
185
}
165
186
}
166
187
167
- private fun closeConnection () {
188
+ private fun closeConnection () = synchronized( this ) {
168
189
socket?.let {
190
+ thunderLog(" Thunder close connection work." )
191
+ _socketState .update { ThunderState .ERROR () }
169
192
it.close(1000 , " shutdown" )
170
193
if (::connectionJob.isInitialized) connectionJob.cancel()
171
194
socket = null
@@ -176,10 +199,6 @@ class ThunderStateManager private constructor(
176
199
valveCache.requestToValve(key to message)
177
200
}
178
201
179
- private fun MutableStateFlow<ThunderState>.updateThunderState (state : ThunderState ) {
180
- _lastSocketState = getAndUpdate { state }
181
- }
182
-
183
202
class Factory (
184
203
private val connectionListener : AppConnectionListener ,
185
204
private val networkStatus : NetworkConnectivityService ,
@@ -197,4 +216,8 @@ class ThunderStateManager private constructor(
197
216
)
198
217
}
199
218
}
219
+
220
+ companion object {
221
+ private const val RETRY_CONNECTION_GAP = 500L
222
+ }
200
223
}
0 commit comments