Skip to content

Commit

Permalink
enhance(rtc): update ws apis
Browse files Browse the repository at this point in the history
  • Loading branch information
RCmerci committed Apr 29, 2024
1 parent 0a0b7e0 commit 5cd1bd2
Showing 1 changed file with 46 additions and 42 deletions.
88 changes: 46 additions & 42 deletions src/main/frontend/worker/rtc/ws2.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
(throw x)
x))

(defn- create-ws-task
(defn- create-mws*
[url]
(m/sp
(if-let [[mbx ws close-dfv] (m/? (m/timeout (open-ws-task url) 10000))]
(if-let [[mbx ws close-dfv] (m/? (open-ws-task url))]
{:raw-ws ws
:send (fn [data]
(m/sp
Expand All @@ -80,82 +80,86 @@
[m-ws]
(contains? #{:closing :closed} (get-state (:raw-ws m-ws))))

(defn create-ws-flow
"Return a missionary-webocket flow.
Always produce NOT-closed websockets if possible.
If open&connect websocket failed, will retry with backoff(`c.m/delays`)

(defn get-mws-create
"Returns a task :get a mws(missionary-websocket), creating one if needed.
Always try to produce NOT-closed websocket.
When failed to open websocket, retry with backoff.
TODO: retry ASAP once network condition changed"
[url]
[url & {:keys [retry-count open-ws-timeout] :or {retry-count 10 open-ws-timeout 10000}}]
(assert (and (pos-int? retry-count)
(pos-int? open-ws-timeout))
[retry-count open-ws-timeout])
(let [*last-m-ws (atom nil)
new-m-ws-task
backoff-create-ws-task
(c.m/backoff
(take 10 c.m/delays)
(m/sp (let [r (try
(m/? (create-ws-task url))
(catch js/CloseEvent e
(throw (ex-info "failed to open ws conn" {:missionary/retry true} e))))]
(reset! *last-m-ws r)
r)))]
(m/stream
(m/ap
(loop []
(m/amb
(if-let [m-ws @*last-m-ws]
(if (closed? m-ws)
(m/? new-m-ws-task)
m-ws)
(m/? new-m-ws-task))
(recur)))))))
(take retry-count c.m/delays)
(m/sp
(let [m-ws
(try
(m/? (m/timeout (create-mws* url) open-ws-timeout))
(catch js/CloseEvent e
(throw (ex-info "failed to open websocket conn"
{:missionary/retry true}
e))))]
(reset! *last-m-ws m-ws)
m-ws)))]
(m/sp
(let [m-ws @*last-m-ws]
(if (and m-ws (not (closed? m-ws)))
m-ws
(m/? backoff-create-ws-task))))))

(defn close
[m-ws]
(.close (:raw-ws m-ws)))

(defn send-task
"return m-ws"
[ws-flow message]
(defn send
"Returns a task: send message and return mws"
[get-mws-task message]
(m/sp
(let [m-ws (m/? (m/reduce (fn [_ m-ws] (when m-ws (reduced m-ws))) ws-flow))
(let [mws (m/? get-mws-task)
decoded-message (rtc-const/data-to-ws-coercer message)
message-str (js/JSON.stringify (clj->js (rtc-const/data-to-ws-encoder decoded-message)))]
(m/? ((:send m-ws) message-str))
m-ws)))
(m/? ((:send mws) message-str))
mws)))

(defn recv-flow
[m-ws]
(m/eduction
(map #(js->clj (js/JSON.parse %) :keywordize-keys true))
(:recv-flow m-ws)))

(defn send&recv-task
[ws-flow message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
(defn send&recv
"Return a task: send message wait to recv its response and return it"
[get-mws-task message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
(assert (pos-int? timeout-ms))
(let [req-id (str (random-uuid))
message (assoc message :req-id req-id)]
(m/sp
(let [m-ws (m/? (send-task ws-flow message))]
(let [mws (m/? (send get-mws-task message))]
(m/? (m/timeout
(m/reduce
(fn [_ v]
(when (= req-id (:req-id v))
(reduced v)))
(recv-flow m-ws))
(recv-flow mws))
timeout-ms))))))

(comment
(do
(def url "wss://ws-dev.logseq.com/rtc-sync?token=????")
(def ws-flow (create-ws-flow url)))
(def get-mws-task (get-mws-create url)))
(def cancel1
((m/reduce conj (m/eduction (take 1) ws-flow)) #(prn :s %) #(js/console.log %)))
(get-mws-task #(prn :s %) #(js/console.log %)))
(cancel1)

(do
(def cancel ((m/sp
(m/? (send&recv-task ws-flow {:action "list-graphs"} :timeout-ms 1000))
(m/? (send&recv-task ws-flow {:action "list-graphs"})))
(m/? (send&recv get-mws-task {:action "list-graphs"} :timeout-ms 1000))
(m/? (send&recv get-mws-task {:action "list-graphs"})))
#(prn :s %) #(js/console.log :f %)))
(cancel))

(cancel)
)

)
)

0 comments on commit 5cd1bd2

Please sign in to comment.