Skip to content

Commit 02be21f

Browse files
committed
handle socket by lifecycle
1 parent 55789ad commit 02be21f

23 files changed

+570
-233
lines changed

app/src/main/java/com/jeremy/thunder/MainActivity.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ class MainActivity : ComponentActivity() {
3939
}
4040

4141
LaunchedEffect(key1 = Unit) {
42-
viewModel.observeResponse()
42+
viewModel.observeAllMarket()
43+
}
44+
45+
LaunchedEffect(key1 = Unit) {
46+
viewModel.observeTicker()
4347
}
4448

4549
val state = viewModel.response.collectAsState()

app/src/main/java/com/jeremy/thunder/MainViewModel.kt

+24-8
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,50 @@ import com.jeremy.thunder.socket.BinanceRequest
66
import com.jeremy.thunder.socket.SocketService
77
import com.jeremy.thunder.socket.Ticker
88
import dagger.hilt.android.lifecycle.HiltViewModel
9-
import kotlinx.coroutines.delay
109
import kotlinx.coroutines.flow.MutableStateFlow
1110
import kotlinx.coroutines.flow.StateFlow
1211
import kotlinx.coroutines.flow.asStateFlow
1312
import kotlinx.coroutines.flow.launchIn
1413
import kotlinx.coroutines.flow.onEach
15-
import kotlinx.coroutines.flow.update
1614
import kotlinx.coroutines.launch
1715
import javax.inject.Inject
1816

1917
@HiltViewModel
2018
class MainViewModel @Inject constructor(
2119
private val service: SocketService
22-
): ViewModel() {
20+
) : ViewModel() {
2321

2422
private val _response = MutableStateFlow<Ticker?>(null)
2523
val response: StateFlow<Ticker?> = _response.asStateFlow()
2624

2725
fun request() {
2826
viewModelScope.launch {
29-
delay(4000) // thunder state observer가 생성 되기 이전 임으로 임의 딜레이
30-
service.request(request = BinanceRequest())
27+
// 특정 2개의 티커만 조회
28+
service.request(
29+
request = BinanceRequest(
30+
params = listOf(
31+
"btcusdt@markPrice",
32+
"ethusdt@markPrice"
33+
)
34+
)
35+
)
3136
}
3237
}
3338

34-
fun observeResponse() {
35-
service.observeTicker().onEach { result ->
36-
_response.update { result.data }
39+
fun requestAllMarket() {
40+
//전체 마켓 데이터 조회
41+
service.request(request = BinanceRequest(params = listOf("!markPrice@arr")))
42+
}
43+
44+
fun observeAllMarket() {
45+
service.observeAllMarkets().onEach {
46+
47+
}.launchIn(viewModelScope)
48+
}
49+
50+
fun observeTicker() {
51+
service.observeTicker().onEach {
52+
3753
}.launchIn(viewModelScope)
3854
}
3955
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.jeremy.thunder.socket
2+
3+
import com.google.gson.annotations.SerializedName
4+
5+
data class AllMarketResponse(
6+
@SerializedName("id")
7+
val id: Int,
8+
9+
@SerializedName("result")
10+
val result: Any? = null,
11+
12+
@SerializedName("stream")
13+
val stream: String = "",
14+
15+
@SerializedName("data")
16+
val data: List<Ticker> = emptyList()
17+
)

app/src/main/java/com/jeremy/thunder/socket/BinanceRequest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@ package com.jeremy.thunder.socket
33
data class BinanceRequest(
44
val id: Int = 1,
55
val method: String = "SUBSCRIBE",
6-
val params: List<String> = listOf("btcusdt@markPrice", "ethusdt@markPrice")
6+
val params: List<String> = listOf("btcusdt@markPrice")
77
)
8+
9+
//val params: List<String> = listOf("btcusdt@markPrice", "ethusdt@markPrice")

app/src/main/java/com/jeremy/thunder/socket/SocketService.kt

+3
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@ interface SocketService {
1010

1111
@Receive
1212
fun observeTicker(): Flow<TickerResponse>
13+
14+
@Receive
15+
fun observeAllMarkets(): Flow<AllMarketResponse>
1316
}

thunder-core/src/main/java/com/jeremy/thunder/ThunderState.kt

+28
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,44 @@
11
package com.jeremy.thunder
22

3+
/**
4+
* Socket Thunder State
5+
* 기존 Scarlet 라이브러리에선 Event 상태를 체크해서 활성화 상태일 때만 Send를 할 수 있는 구조였기에 CONNECTED 상태 외에 모두 캐시 처리하고 저장하는 것이 필요
6+
* */
37
sealed interface ThunderState {
8+
9+
/*
10+
* 초기 소켓 상태
11+
* */
412
object IDLE : ThunderState
513

14+
/*
15+
* 연결 중인 상태
16+
* Send 요청 시, Cache 처리
17+
* */
618
object CONNECTING : ThunderState
719

20+
/*
21+
* 연결 된 상태
22+
* Send 요청 시, Cache 체크 후 데이터 전송
23+
* */
824
object CONNECTED : ThunderState
925

26+
/*
27+
* 연결을 끊고 있는 상태
28+
* Send 요청 시, 무시 (Cache Clear)
29+
* */
1030
object DISCONNECTING : ThunderState
1131

32+
/*
33+
* 연결이 끊긴 상태
34+
* Send 요청 시, 무시 (Cache Clear)
35+
* */
1236
object DISCONNECTED : ThunderState
1337

38+
/*
39+
* 에러가 난 상태
40+
* Send 요청 시, Cache 처리
41+
* */
1442
data class ERROR(
1543
val error: ThunderError
1644
) : ThunderState

thunder-okhttp/src/main/java/com/jeremy/thunder/OkHttpUtils.kt

+15-6
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,22 @@ import okhttp3.OkHttpClient
66
import okhttp3.Request
77

88
fun OkHttpClient.makeWebSocketCore(url: String): WebSocket.Factory {
9-
val socketListener = SocketListener()
10-
newWebSocket(
11-
request = Request.Builder().url(url).build(),
12-
listener = socketListener
13-
)
149
return OkHttpWebSocket.Factory(
15-
socketListener = socketListener,
10+
provider = ConnectionProvider(this, url),
1611
scope = CoroutineScope(SupervisorJob())
1712
)
13+
}
14+
15+
interface SocketListenerProvider {
16+
fun provide(socketListener: SocketListener)
17+
}
18+
19+
class ConnectionProvider(
20+
private val okHttpClient: OkHttpClient,
21+
private val url: String
22+
) : SocketListenerProvider {
23+
24+
override fun provide(socketListener: SocketListener) {
25+
okHttpClient.newWebSocket(Request.Builder().url(url).build(), socketListener)
26+
}
1827
}

thunder-okhttp/src/main/java/com/jeremy/thunder/OkHttpWebSocket.kt

+13-12
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,31 @@ import kotlinx.coroutines.flow.asSharedFlow
88
import kotlinx.coroutines.flow.filterNotNull
99
import kotlinx.coroutines.flow.launchIn
1010
import kotlinx.coroutines.flow.onEach
11+
import kotlinx.coroutines.flow.onStart
1112

1213

1314
class OkHttpWebSocket internal constructor(
15+
private val provider: ConnectionProvider,
1416
private val socketListener: SocketListener,
1517
private val socketHandler: SocketHandler,
1618
private val scope: CoroutineScope
1719
) : WebSocket {
1820

1921
private val _event = MutableStateFlow<com.jeremy.thunder.event.WebSocketEvent?>(null)
20-
21-
init {
22-
socketListener.collectEvent().onEach {
22+
override fun open() {
23+
socketListener.collectEvent().onStart {
24+
provider.provide(socketListener)
25+
}.onEach {
26+
_event.tryEmit(it)
2327
when (it) {
24-
is com.jeremy.thunder.event.WebSocketEvent.OnConnectionOpen -> {
25-
socketHandler.open(it.webSocket as okhttp3.WebSocket)
28+
is WebSocketEvent.OnConnectionOpen -> {
29+
socketHandler.initWebSocket(it.webSocket as okhttp3.WebSocket)
2630
}
27-
else -> _event.tryEmit(it)
31+
else -> Unit
2832
}
2933
}.launchIn(scope)
3034
}
3135

32-
override fun open(webSocket: okhttp3.WebSocket) {
33-
// ?
34-
}
35-
3636
override fun events(): Flow<com.jeremy.thunder.event.WebSocketEvent> {
3737
return _event.asSharedFlow().filterNotNull()
3838
}
@@ -54,12 +54,13 @@ class OkHttpWebSocket internal constructor(
5454
}
5555

5656
class Factory(
57-
private val socketListener: SocketListener,
57+
private val provider: ConnectionProvider,
5858
private val scope: CoroutineScope
5959
) : WebSocket.Factory {
6060
override fun create(): WebSocket =
6161
OkHttpWebSocket(
62-
socketListener = socketListener,
62+
provider = provider ,
63+
socketListener = SocketListener(),
6364
socketHandler = SocketHandler(),
6465
scope = scope
6566
)

thunder-okhttp/src/main/java/com/jeremy/thunder/SocketHandler.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@ class SocketHandler : WebSocket {
77

88
private var socket: okhttp3.WebSocket? = null
99

10-
override fun open(webSocket: okhttp3.WebSocket) {
11-
socket = webSocket
10+
fun initWebSocket(socket: okhttp3.WebSocket) {
11+
this.socket = socket
12+
}
13+
14+
override fun open() {
15+
1216
}
1317

1418
override fun events(): Flow<com.jeremy.thunder.event.WebSocketEvent> {
@@ -21,10 +25,12 @@ class SocketHandler : WebSocket {
2125

2226
override fun close(code: Int, reason: String) {
2327
socket?.close(code, reason)
28+
socket = null
2429
}
2530

2631
override fun cancel() {
2732
socket?.cancel()
33+
socket = null
2834
}
2935

3036
override fun error(t: String) {

thunder/src/main/java/com/jeremy/thunder/Converter.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import com.google.gson.reflect.TypeToken
66
import java.io.StringReader
77
import java.lang.reflect.Type
88

9-
interface Converter <T> {
10-
fun convert(data: String) : T
9+
interface Converter<T> {
10+
fun convert(data: String): T
1111
}
1212

1313
class ConvertAdapter<T> private constructor(
1414
private val gson: Gson,
15-
private val typeAdapter: TypeAdapter<T>
16-
): Converter<T> {
15+
private val typeAdapter: TypeAdapter<T>,
16+
private val type: Type
17+
) : Converter<T> {
1718

1819
override fun convert(data: String): T {
1920
val jsonReader = gson.newJsonReader(StringReader(data))
@@ -23,7 +24,7 @@ class ConvertAdapter<T> private constructor(
2324
class Factory {
2425
fun create(type: Type): ConvertAdapter<*> {
2526
val typeAdapter = Gson().getAdapter(TypeToken.get(type))
26-
return ConvertAdapter(Gson(), typeAdapter)
27+
return ConvertAdapter(Gson(), typeAdapter, type)
2728
}
2829
}
2930
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.jeremy.thunder
2+
3+
import com.jeremy.thunder.event.WebSocketEvent
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.filter
6+
import kotlinx.coroutines.flow.map
7+
8+
/**
9+
* mapEvent 내에서 convert가 실패하면 다른 응답이기에 실패했다고 봐야 하는지?
10+
* */
11+
12+
class EventMapper<T> constructor(
13+
private val converter: Converter<T>
14+
) {
15+
16+
fun mapEvent(flow: Flow<WebSocketEvent>): Flow<T?> = flow.filter {
17+
it is WebSocketEvent.OnMessageReceived
18+
}.map {
19+
(it as WebSocketEvent.OnMessageReceived).data
20+
}.map {
21+
try {
22+
val result = converter.convert(it)
23+
result
24+
} catch (e: Exception) {
25+
null
26+
}
27+
}
28+
29+
class Factory {
30+
fun create(converter: Converter<*>): EventMapper<*> {
31+
return EventMapper(converter)
32+
}
33+
}
34+
}

thunder/src/main/java/com/jeremy/thunder/ReceivePipeline.kt

-39
This file was deleted.

0 commit comments

Comments
 (0)