Skip to content

Commit

Permalink
modify recent valve type
Browse files Browse the repository at this point in the history
  • Loading branch information
jaeyunn15 committed Dec 22, 2023
1 parent 38c72d1 commit 4c856a6
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import com.jeremy.thunder.thunder_internal.event.StompSendRequest
import com.jeremy.thunder.thunder_internal.event.StompSubscribeRequest
import com.jeremy.thunder.thunder_internal.event.ThunderRequest
import com.jeremy.thunder.thunder_internal.stateDelegate
import com.jeremy.thunder.thunder_state.Active
import com.jeremy.thunder.thunder_state.ConnectState
import com.jeremy.thunder.thunder_state.NetworkState
import com.jeremy.thunder.thunder_state.WebSocketEvent
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -98,7 +100,7 @@ class StompStateManager private constructor(
private suspend fun checkOnValidState(): Boolean = withContext(Dispatchers.Default) {
val appState = connectionListener.collectAppState().firstOrNull()
val networkState = networkState.networkStatus.firstOrNull()
appState == com.jeremy.thunder.thunder_state.Active && networkState == com.jeremy.thunder.thunder_state.NetworkState.Available
appState == Active && networkState == NetworkState.Available
}

private suspend fun connectionRecoveryProcess(onConnect: () -> Unit) {
Expand All @@ -125,7 +127,6 @@ class StompStateManager private constructor(
}

private fun requestExecute(message: ThunderRequest) = innerScope.launch(Dispatchers.IO) {
println("execute = $message")
when (message.typeOfRequest) {
RequestType.STOMP_SUBSCRIBE -> {
val request = message as StompSubscribeRequest
Expand Down Expand Up @@ -212,7 +213,6 @@ class StompStateManager private constructor(
}

override fun send(message: ThunderRequest) {
println("send = $message")
val result = when (message.typeOfRequest) {
RequestType.STOMP_SEND -> message as StompSendRequest
RequestType.STOMP_SUBSCRIBE -> message as StompSubscribeRequest
Expand All @@ -228,6 +228,7 @@ class StompStateManager private constructor(
socket?.let { websocket ->
runCatching {
val uuid = UUID.randomUUID().toString()
headerIdStore.put(topic, uuid)
websocket.send(
compileMessage(
thunderStompRequest {
Expand All @@ -240,11 +241,7 @@ class StompStateManager private constructor(
this.payload = payload
}
)
).apply {
if (this) {
headerIdStore.put(topic, uuid)
}
}
)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions thunder/src/main/java/com/jeremy/thunder/cache/ValveCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.jeremy.thunder.cache

import com.jeremy.thunder.thunder_internal.cache.BaseValve
import com.jeremy.thunder.thunder_internal.event.ThunderRequest
import com.jeremy.thunder.thunder_state.ConnectState
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -32,20 +33,27 @@ class ValveCache: BaseValve<ThunderRequest> {
}
emit(emitCacheList)
}
delay(500)
delay(CACHE_EMIT_GAP)
}
}

override fun onUpdateValve(state: com.jeremy.thunder.thunder_state.ConnectState) {
isEmissiable.update { state is com.jeremy.thunder.thunder_state.ConnectState.Establish }
override fun onUpdateValve(state: ConnectState) {
isEmissiable.update { state is ConnectState.Establish }
}

override fun requestToValve(request: ThunderRequest) {
innerQueue.add(request)
val lastRequest = innerQueue.poll()
if (lastRequest != request) {
innerQueue.add(request)
}
}

override fun emissionOfValveFlow(): Flow<List<ThunderRequest>> = cacheFlow()

companion object {
private const val CACHE_EMIT_GAP = 500L
}

class Factory: BaseValve.BaseValveFactory<ThunderRequest> {
override fun create(): ValveCache {
return ValveCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ class ReceivePipeline<T>(
scope: CoroutineScope
) {
init {
socketEventFlow.onEach {
store(it)
}.launchIn(scope)
socketEventFlow.onEach(::store).launchIn(scope)
}

private val _cache = Channel<T>(capacity = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import java.lang.reflect.Type
* create logic by annotation
* @Receive - create pipeline flow for observer
* @Send - create send data
* @Subscribe - create subscribe for stomp
* @StompSubscribe - create subscribe for stomp
* @StompSend - create send data for stomp
* */

class ServiceExecutor internal constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ class ThunderProvider internal constructor(
) {

init {
stateManager.collectWebSocketEvent().onEach {
eventProcessor.onEventDelivery(it)
}.launchIn(scope)
stateManager.collectWebSocketEvent().onEach(eventProcessor::onEventDelivery).launchIn(scope)
}

fun observeEvent(): Flow<WebSocketEvent> {
return eventProcessor.collectEvent()
}
fun observeEvent(): Flow<WebSocketEvent> = eventProcessor.collectEvent()

fun send(request: ThunderRequest) {
stateManager.send(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.jeremy.thunder.thunder_state.WebSocketEvent
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
Expand Down

0 comments on commit 4c856a6

Please sign in to comment.