From 75f69ffb48613de4102e7e619f355966b35f5edd Mon Sep 17 00:00:00 2001 From: rcmerci Date: Tue, 30 Apr 2024 02:05:09 +0800 Subject: [PATCH] refactor(rtc): split core by adding ns remote, client, exception and re-impl using missionary --- .../src/logseq/common/missionary_util.cljs | 59 +- src/main/frontend/worker/rtc/client.cljs | 336 +++++++++++ src/main/frontend/worker/rtc/core.cljs | 3 +- src/main/frontend/worker/rtc/core2.cljs | 119 ++++ src/main/frontend/worker/rtc/exception.cljs | 11 + .../frontend/worker/rtc/remote_update.cljs | 561 ++++++++++++++++++ src/main/frontend/worker/rtc/ws2.cljs | 57 +- 7 files changed, 1096 insertions(+), 50 deletions(-) create mode 100644 src/main/frontend/worker/rtc/client.cljs create mode 100644 src/main/frontend/worker/rtc/core2.cljs create mode 100644 src/main/frontend/worker/rtc/exception.cljs create mode 100644 src/main/frontend/worker/rtc/remote_update.cljs diff --git a/deps/common/src/logseq/common/missionary_util.cljs b/deps/common/src/logseq/common/missionary_util.cljs index 6f6cbd35c6f..f21fd536670 100644 --- a/deps/common/src/logseq/common/missionary_util.cljs +++ b/deps/common/src/logseq/common/missionary_util.cljs @@ -1,27 +1,52 @@ (ns logseq.common.missionary-util "Utils based on missionary." + (:import [missionary Cancelled]) (:require [missionary.core :as m])) - - -(def ^:private retry-sentinel (js-obj)) - (def delays (reductions * 1000 (repeat 2))) +(def ^:private retry-sentinel (js-obj)) (defn backoff "Retry task when it throw exception `(get ex-data :missionary/retry)`" [delays task] (m/sp - (loop [[delay & delays] (seq delays)] - (let [r (try - (m/? task) - (catch :default e - (if (and (some-> e ex-data :missionary/retry) - (pos-int? delay)) - (do (m/? (m/sleep delay)) - (println :missionary/retry "after" delay "ms (" (ex-message e) ")") - retry-sentinel) - (throw e))))] - (if (identical? r retry-sentinel) - (recur delays) - r))))) + (loop [[delay & delays] (seq delays)] + (let [r (try + (m/? task) + (catch :default e + (if (and (some-> e ex-data :missionary/retry) + (pos-int? delay)) + (do (m/? (m/sleep delay)) + (println :missionary/retry "after" delay "ms (" (ex-message e) ")") + retry-sentinel) + (throw e))))] + (if (identical? r retry-sentinel) + (recur delays) + r))))) + +(defn mix + "Return a flow which is mixed by `flows`" + [& flows] + (m/ap (m/?> (m/?> (count flows) (m/seed flows))))) + +(defn clock + "Return a flow that emits `value` every `interval-ms`." + ([interval-ms] + (clock interval-ms nil)) + ([interval-ms value] + (->> + (m/ap + (loop [] + (m/amb + (m/? (m/sleep interval-ms value)) + (recur)))) + (m/reductions {} value) + (m/latest identity)))) + +(defn debounce + [duration-ms flow] + (m/ap + (let [x (m/?< flow)] + (try (m/? (m/sleep duration-ms x)) + (catch Cancelled _ + (m/amb)))))) diff --git a/src/main/frontend/worker/rtc/client.cljs b/src/main/frontend/worker/rtc/client.cljs new file mode 100644 index 00000000000..5cc31fdd0e0 --- /dev/null +++ b/src/main/frontend/worker/rtc/client.cljs @@ -0,0 +1,336 @@ +(ns frontend.worker.rtc.client + "Fns about push local updates" + (:require [missionary.core :as m] + [frontend.worker.rtc.op-mem-layer :as op-mem-layer] + [frontend.worker.rtc.const :as rtc-const] + [datascript.core :as d] + [cognitect.transit :as transit] + [frontend.worker.rtc.ws2 :as ws] + [clojure.set :as set] + [frontend.worker.rtc.exception :as r.ex] + [frontend.worker.rtc.remote-update :as r.remote-update])) + + +(def ^:private transit-w (transit/writer :json)) + + +(defn- handle-remote-ex + [resp] + (if-let [e ({:graph-not-exist r.ex/ex-remote-graph-not-exist + :graph-not-ready r.ex/ex-remote-graph-not-ready} + (:type (:ex-data resp)))] + (throw e) + resp)) + +(defn send&recv + "Return a task: throw exception if recv ex-data response" + [get-mws-task message] + (m/sp + (handle-remote-ex + (m/? (ws/send&recv get-mws-task message))))) + +(defn- register-graph-updates + [get-mws-task graph-uuid] + (send&recv get-mws-task {:action "register-graph-updates" + :graph-uuid graph-uuid})) + +(defn ensure-register-graph-updates + "Return a task: get or create a mws(missionary wrapped websocket). + see also `ws/get-mws-create`. + But ensure `register-graph-updates` has been sent" + [get-mws-task graph-uuid] + (assert (some? graph-uuid)) + (let [*sent (atom {})] + (m/sp + (let [mws (m/? get-mws-task)] + (when (contains? @*sent mws) + (swap! *sent mws false)) + (when (not (@*sent mws)) + (m/? (register-graph-updates (m/sp mws) graph-uuid)) + (swap! *sent mws true)) + mws)))) + + +(defn- remove-non-exist-block-uuids-in-add-retract-map + [conn add-retract-map] + (let [{:keys [add retract]} add-retract-map + add* (->> add + (map (fn [x] [:block/uuid x])) + (d/pull-many @conn [:block/uuid]) + (keep :block/uuid))] + (cond-> {} + (seq add*) (assoc :add add*) + (seq retract) (assoc :retract retract)))) + +(defn- ->pos + [left-uuid parent-uuid] + (cond + (or (nil? left-uuid) (nil? parent-uuid)) :no-order + (not= left-uuid parent-uuid) :sibling + :else :child)) + +(defmulti ^:private local-block-ops->remote-ops-aux (fn [tp & _] tp)) + +(defmethod local-block-ops->remote-ops-aux :move-op + [_ & {:keys [parent-uuid left-uuid block-uuid *remote-ops *depend-on-block-uuid-set]}] + (when parent-uuid + (let [target-uuid (or left-uuid parent-uuid) + pos (->pos left-uuid parent-uuid)] + (swap! *remote-ops conj [:move {:block-uuid block-uuid :target-uuid target-uuid :pos pos}]) + (swap! *depend-on-block-uuid-set conj target-uuid)))) + + +(defmethod local-block-ops->remote-ops-aux :update-op + [_ & {:keys [conn user-uuid block update-op left-uuid parent-uuid *remote-ops]}] + (let [block-uuid (:block/uuid block) + attr-map (:updated-attrs (second update-op)) + attr-alias-map (when (contains? attr-map :alias) + (remove-non-exist-block-uuids-in-add-retract-map conn (:alias attr-map))) + attr-tags-map (when (contains? attr-map :tags) + (remove-non-exist-block-uuids-in-add-retract-map conn (:tags attr-map))) + attr-type-map (when (contains? attr-map :type) + (let [{:keys [add retract]} (:type attr-map) + current-type-value (set (:block/type block)) + add (set/intersection add current-type-value) + retract (set/difference retract current-type-value)] + (cond-> {} + (seq add) (assoc :add add) + (seq retract) (assoc :retract retract)))) + attr-properties-map (when (contains? attr-map :properties) + (let [{:keys [add retract]} (:properties attr-map) + properties (:block/properties block) + add* (into [] + (update-vals (select-keys properties add) + (partial transit/write transit-w)))] + (cond-> {} + (seq add*) (assoc :add add*) + (seq retract) (assoc :retract retract)))) + target-uuid (or left-uuid parent-uuid) + pos (->pos left-uuid parent-uuid)] + (swap! *remote-ops conj + [:update + (cond-> {:block-uuid block-uuid} + (:block/journal-day block) (assoc :journal-day (:block/journal-day block)) + (:block/updated-at block) (assoc :updated-at (:block/updated-at block)) + (:block/created-at block) (assoc :created-at (:block/created-at block)) + (= (:block/updated-at block) + (:block/created-at block)) (assoc :created-by user-uuid) + (contains? attr-map :schema) (assoc :schema + (transit/write transit-w (:block/schema block))) + attr-alias-map (assoc :alias attr-alias-map) + attr-type-map (assoc :type attr-type-map) + attr-tags-map (assoc :tags attr-tags-map) + attr-properties-map (assoc :properties attr-properties-map) + (and (contains? attr-map :content) + (:block/raw-content block)) + (assoc :content (:block/raw-content block)) + (and (contains? attr-map :link) + (:block/uuid (:block/link block))) + (assoc :link (:block/uuid (:block/link block))) + target-uuid (assoc :target-uuid target-uuid :pos pos))]))) + +(defmethod local-block-ops->remote-ops-aux :update-page-op + [_ & {:keys [conn block-uuid *remote-ops]}] + (when-let [{page-name :block/name original-name :block/original-name} + (d/entity @conn [:block/uuid block-uuid])] + (swap! *remote-ops conj + [:update-page {:block-uuid block-uuid + :page-name page-name + :original-name (or original-name page-name)}]))) + +(defmethod local-block-ops->remote-ops-aux :remove-op + [_ & {:keys [conn remove-op *remote-ops]}] + (when-let [block-uuid (:block-uuid (second remove-op))] + (when (nil? (d/entity @conn [:block/uuid block-uuid])) + (swap! *remote-ops conj [:remove {:block-uuids [block-uuid]}])))) + +(defmethod local-block-ops->remote-ops-aux :remove-page-op + [_ & {:keys [conn remove-page-op *remote-ops]}] + (when-let [block-uuid (:block-uuid (second remove-page-op))] + (when (nil? (d/entity @conn [:block/uuid block-uuid])) + (swap! *remote-ops conj [:remove-page {:block-uuid block-uuid}])))) + +(defn- local-block-ops->remote-ops + [repo conn user-uuid block-ops] + (let [*depend-on-block-uuid-set (atom #{}) + *remote-ops (atom []) + {move-op :move remove-op :remove update-op :update update-page-op :update-page remove-page-op :remove-page} + block-ops] + (when-let [block-uuid + (some (comp :block-uuid second) [move-op update-op update-page-op])] + (when-let [block (d/entity @conn [:block/uuid block-uuid])] + (let [left-uuid (some-> block :block/left :block/uuid) + parent-uuid (some-> block :block/parent :block/uuid)] + (when parent-uuid ; whiteboard blocks don't have :block/left + ;; remote-move-op + (when move-op + (local-block-ops->remote-ops-aux :move-op + :parent-uuid parent-uuid + :left-uuid left-uuid + :block-uuid block-uuid + :*remote-ops *remote-ops + :*depend-on-block-uuid-set *depend-on-block-uuid-set))) + ;; remote-update-op + (when update-op + (local-block-ops->remote-ops-aux :update-op + :repo repo + :user-uuid user-uuid + :conn conn + :block block + :update-op update-op + :parent-uuid parent-uuid + :left-uuid left-uuid + :*remote-ops *remote-ops + :created-by user-uuid))) + ;; remote-update-page-op + (when update-page-op + (local-block-ops->remote-ops-aux :update-page-op + :repo repo + :conn conn + :block-uuid block-uuid + :*remote-ops *remote-ops)))) + ;; remote-remove-op + (when remove-op + (local-block-ops->remote-ops-aux :remove-op + :repo repo + :conn conn + :remove-op remove-op + :*remote-ops *remote-ops)) + + ;; remote-remove-page-op + (when remove-page-op + (local-block-ops->remote-ops-aux :remove-page-op + :repo repo + :conn conn + :remove-page-op remove-page-op + :*remote-ops *remote-ops)) + + {:remote-ops @*remote-ops + :depend-on-block-uuids @*depend-on-block-uuid-set})) + +(defn- gen-block-uuid->remote-ops + [repo conn user-uuid & {:keys [n] :or {n 50}}] + (loop [current-handling-block-ops nil + current-handling-block-uuid nil + depend-on-block-uuid-coll nil + r {}] + (cond + (and (empty? current-handling-block-ops) + (empty? depend-on-block-uuid-coll) + (>= (count r) n)) + r + + (and (empty? current-handling-block-ops) + (empty? depend-on-block-uuid-coll)) + (if-let [{min-epoch-block-ops :ops block-uuid :block-uuid} (op-mem-layer/get-min-epoch-block-ops repo)] + (do (assert (not (contains? r block-uuid)) {:r r :block-uuid block-uuid}) + (op-mem-layer/remove-block-ops! repo block-uuid) + (recur min-epoch-block-ops block-uuid depend-on-block-uuid-coll r)) + ;; finish + r) + + (and (empty? current-handling-block-ops) + (seq depend-on-block-uuid-coll)) + (let [[block-uuid & other-block-uuids] depend-on-block-uuid-coll + block-ops (op-mem-layer/get-block-ops repo block-uuid)] + (op-mem-layer/remove-block-ops! repo block-uuid) + (recur block-ops block-uuid other-block-uuids r)) + + (seq current-handling-block-ops) + (let [{:keys [remote-ops depend-on-block-uuids]} + (local-block-ops->remote-ops repo conn user-uuid current-handling-block-ops)] + (recur nil nil + (set/union (set depend-on-block-uuid-coll) + (op-mem-layer/intersection-block-uuids repo depend-on-block-uuids)) + (assoc r current-handling-block-uuid (into {} remote-ops))))))) + +(defn- merge-remove-remove-ops + [remote-remove-ops] + (when-let [block-uuids (->> remote-remove-ops + (mapcat (fn [[_ {:keys [block-uuids]}]] block-uuids)) + distinct + seq)] + [[:remove {:block-uuids block-uuids}]])) + +(defn- sort-remote-ops + [block-uuid->remote-ops] + (let [block-uuid->dep-uuid + (into {} + (keep (fn [[block-uuid remote-ops]] + (when-let [move-op (get remote-ops :move)] + [block-uuid (:target-uuid move-op)]))) + block-uuid->remote-ops) + all-move-uuids (set (keys block-uuid->dep-uuid)) + sorted-uuids + (loop [r [] + rest-uuids all-move-uuids + uuid (first rest-uuids)] + (if-not uuid + r + (let [dep-uuid (block-uuid->dep-uuid uuid)] + (if-let [next-uuid (get rest-uuids dep-uuid)] + (recur r rest-uuids next-uuid) + (let [rest-uuids* (disj rest-uuids uuid)] + (recur (conj r uuid) rest-uuids* (first rest-uuids*))))))) + sorted-move-ops (keep + (fn [block-uuid] + (some->> (get-in block-uuid->remote-ops [block-uuid :move]) + (vector :move))) + sorted-uuids) + remove-ops (merge-remove-remove-ops + (keep + (fn [[_ remote-ops]] + (some->> (:remove remote-ops) (vector :remove))) + block-uuid->remote-ops)) + update-ops (keep + (fn [[_ remote-ops]] + (some->> (:update remote-ops) (vector :update))) + block-uuid->remote-ops) + update-page-ops (keep + (fn [[_ remote-ops]] + (some->> (:update-page remote-ops) (vector :update-page))) + block-uuid->remote-ops) + remove-page-ops (keep + (fn [[_ remote-ops]] + (some->> (:remove-page remote-ops) (vector :remove-page))) + block-uuid->remote-ops)] + (concat update-page-ops remove-ops sorted-move-ops update-ops remove-page-ops))) + +(defn create-push-local-ops-task + "Return a task: push local updates" + [repo conn user-uuid graph-uuid date-formatter get-mws-task add-log-fn] + (m/sp + (when-let [ops-for-remote (rtc-const/to-ws-ops-decoder + (sort-remote-ops + (gen-block-uuid->remote-ops repo conn user-uuid)))] + (op-mem-layer/new-branch! repo) + (let [local-tx (op-mem-layer/get-local-tx repo) + r (m/? (send&recv get-mws-task {:action "apply-ops" :graph-uuid graph-uuid + :ops ops-for-remote :t-before (or local-tx 1)}))] + (if-let [remote-ex (:ex-data r)] + (do (add-log-fn remote-ex) + (case (:type remote-ex) + ;; - :graph-lock-failed + ;; conflict-update remote-graph, keep these local-pending-ops + ;; and try to send ops later + :graph-lock-failed + (do (op-mem-layer/rollback! repo) + nil) + ;; - :graph-lock-missing + ;; this case means something wrong in remote-graph data, + ;; nothing to do at client-side + :graph-lock-missing + (do (op-mem-layer/rollback! repo) + (throw r.ex/ex-remote-graph-lock-missing)) + ;; TODO: support read s3-obj when websocket return specific data + :get-s3-object-failed + (do (op-mem-layer/rollback! repo) + nil) + ;; else + (do (op-mem-layer/rollback! repo) + (throw (ex-info "Unavailable" {:remote-ex remote-ex}))))) + + (do (assert (pos? (:t r)) r) + (op-mem-layer/commit! repo) + (r.remote-update/apply-remote-update repo conn date-formatter r add-log-fn) + (add-log-fn {:type ::push-client-updates :remote-t (:t r)}))))))) diff --git a/src/main/frontend/worker/rtc/core.cljs b/src/main/frontend/worker/rtc/core.cljs index b1e3cb27057..01af1bcf796 100644 --- a/src/main/frontend/worker/rtc/core.cljs +++ b/src/main/frontend/worker/rtc/core.cljs @@ -30,7 +30,8 @@ [logseq.outliner.transaction :as outliner-tx] [malli.core :as m] [malli.util :as mu] - [frontend.worker.rtc.ws2])) + [frontend.worker.rtc.ws2] + [frontend.worker.rtc.core2])) ;; +-------------+ ;; | | diff --git a/src/main/frontend/worker/rtc/core2.cljs b/src/main/frontend/worker/rtc/core2.cljs new file mode 100644 index 00000000000..09b6b25c14b --- /dev/null +++ b/src/main/frontend/worker/rtc/core2.cljs @@ -0,0 +1,119 @@ +(ns frontend.worker.rtc.core2 + "Main(use missionary) ns for rtc related fns" + (:require [frontend.worker.rtc.client :as r.client] + [frontend.worker.rtc.remote-update :as r.remote-update] + [frontend.worker.rtc.ws2 :as ws] + [frontend.worker.state :as worker-state] + [goog.string :as gstring] + [logseq.common.missionary-util :as c.m] + [malli.core :as ma] + [missionary.core :as m])) + +(def ^:private rtc-state-schema + [:map + [:ws-state [:enum :open :connecting :cancelled]]]) +(def ^:private rtc-state-validator (ma/validator rtc-state-schema)) + +(defn- get-ws-url + [token] + (gstring/format @worker-state/*rtc-ws-url token)) + +(def ^:private sentinel (js-obj)) +(defn get-remote-updates + "Return a flow: receive messages from mws, and filter messages with :req-id=`push-updates`." + [get-mws-task] + (m/stream + (m/ap + (loop [] + (let [mws (m/? get-mws-task) + x (try + (m/?> (m/eduction + (filter (fn [data] (= "push-updates" (:req-id data)))) + (ws/recv-flow mws))) + (catch js/CloseEvent _ + sentinel))] + (if (identical? x sentinel) + (recur) + (m/amb x (recur)))))))) + +(defn- create-local-updates-check-flow + "Return a flow" + [*auto-push? interval-ms] + (let [auto-push-flow (m/watch *auto-push?) + clock-flow (c.m/clock interval-ms :clock) + merge-flow (m/latest vector auto-push-flow clock-flow)] + (m/eduction (filter first) + (map second) + merge-flow))) + +(comment + (def *push (atom true)) + (def f (create-local-updates-check-flow *push 2000)) + (def cancel ((m/reduce (fn [_ v] (prn :v v) v) f) #(js/console.log :s %) #(js/console.log :f %))) + (reset! *push not) + (cancel)) + +(defn- create-mixed-flow + "Return a flow that emits all kinds of events: + `:remote-update`: remote-updates data from server + `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote." + [get-mws-task *auto-push?] + (let [remote-updates-flow (m/eduction + (map (fn [data] {:type :remote-update :value data})) + (get-remote-updates get-mws-task)) + local-updates-check-flow (m/eduction + (map (fn [data] {:type :local-update-check :value data})) + (create-local-updates-check-flow *auto-push? 2000))] + (c.m/mix remote-updates-flow local-updates-check-flow))) + +(defn- wrap-set-rtc-ws-state + "Return a task" + [get-mws-task set-state-fn] + (m/sp + (let [mws (m/? (m/race + (m/sp (m/? (m/sleep 100)) + (set-state-fn :ws-state :connecting) + (m/? m/never)) + get-mws-task))] + (set-state-fn :ws-state :open) + mws))) + +(def send&recv r.client/send&recv) + +(defn create-rtc-loop + "Return a map with [:rtc-log-flow :*rtc-state :rtc-loop-task :*rtc-auto-push?] + TODO: auto refresh token if needed" + [user-uuid graph-uuid repo conn date-formatter token & {:keys [auto-push?] :or {auto-push? true}}] + (let [ws-url (get-ws-url token) + *auto-push? (atom auto-push?) + *log (atom nil) + add-log-fn #(reset! *log [(js/Date.) %]) + rtc-log-flow (m/buffer 100 (m/watch *log)) + *rtc-state (atom {} :validator rtc-state-validator) + set-state-fn (fn [k v] (swap! *rtc-state assoc k v)) + get-mws-task (wrap-set-rtc-ws-state + (r.client/ensure-register-graph-updates + (ws/get-mws-create ws-url) + graph-uuid) + set-state-fn) + mixed-flow (create-mixed-flow get-mws-task *auto-push?)] + {:rtc-log-flow rtc-log-flow + :*rtc-state *rtc-state + :*rtc-auto-push? *auto-push? + :rtc-loop-task + (m/sp + ;; init run to open a ws + (m/? get-mws-task) + (->> + (let [event (m/?> mixed-flow)] + (case (:type event) + :remote-update + (r.remote-update/apply-remote-update repo conn date-formatter event add-log-fn) + + :local-update-check + (m/? (r.client/create-push-local-ops-task + repo conn user-uuid graph-uuid date-formatter + get-mws-task add-log-fn)))) + (m/ap) + (m/reduce {}) + (m/?)))})) diff --git a/src/main/frontend/worker/rtc/exception.cljs b/src/main/frontend/worker/rtc/exception.cljs new file mode 100644 index 00000000000..456bb56dea0 --- /dev/null +++ b/src/main/frontend/worker/rtc/exception.cljs @@ -0,0 +1,11 @@ +(ns frontend.worker.rtc.exception + "Exception list") + +(def ex-remote-graph-not-exist + (ex-info "remote graph not exist" {:type ::remote-graph-not-exist})) + +(def ex-remote-graph-not-ready + (ex-info "remote graph still creating" {:type ::remote-graph-not-ready})) + +(def ex-remote-graph-lock-missing + (ex-info "remote graph lock missing(server error)" {:type ::remote-graph-lock-missing})) diff --git a/src/main/frontend/worker/rtc/remote_update.cljs b/src/main/frontend/worker/rtc/remote_update.cljs new file mode 100644 index 00000000000..e2f00860d17 --- /dev/null +++ b/src/main/frontend/worker/rtc/remote_update.cljs @@ -0,0 +1,561 @@ +(ns frontend.worker.rtc.remote-update + "Fns about applying remote updates" + (:require [cljs-time.coerce :as tc] + [cljs-time.core :as t] + [clojure.set :as set] + [clojure.string :as string] + [cognitect.transit :as transit] + [datascript.core :as d] + [frontend.schema-register :as sr] + [frontend.worker.batch-tx :as batch-tx] + [frontend.worker.handler.page :as worker-page] + [frontend.worker.handler.page.rename :as worker-page-rename] + [frontend.worker.rtc.const :as rtc-const] + [frontend.worker.rtc.op-mem-layer :as op-mem-layer] + [frontend.worker.state :as worker-state] + [frontend.worker.util :as worker-util] + [logseq.common.util :as common-util] + [logseq.db :as ldb] + [logseq.db.frontend.content :as db-content] + [logseq.db.frontend.property :as db-property] + [logseq.graph-parser.whiteboard :as gp-whiteboard] + [logseq.outliner.core :as outliner-core] + [logseq.outliner.transaction :as outliner-tx])) + +(sr/defkeyword ::need-pull-remote-data + "remote-update's :remote-t-before > :local-tx, + so need to pull earlier remote-data from websocket.") + +(def ^:private transit-r (transit/reader :json)) + +(defmulti ^:private transact-db! (fn [action & _args] action)) + +(defmethod transact-db! :delete-blocks [_ & args] + (outliner-tx/transact! + {:persist-op? false + :gen-undo-ops? false + :outliner-op :delete-blocks + :transact-opts {:repo (first args) + :conn (second args)}} + (apply outliner-core/delete-blocks! args))) + +(defmethod transact-db! :move-blocks [_ & args] + (outliner-tx/transact! + {:persist-op? false + :gen-undo-ops? false + :outliner-op :move-blocks + :transact-opts {:repo (first args) + :conn (second args)}} + (apply outliner-core/move-blocks! args))) + +(defmethod transact-db! :move-blocks&persist-op [_ & args] + (outliner-tx/transact! + {:persist-op? true + :gen-undo-ops? false + :outliner-op :move-blocks + :transact-opts {:repo (first args) + :conn (second args)}} + (apply outliner-core/move-blocks! args))) + +(defmethod transact-db! :insert-blocks [_ & args] + (outliner-tx/transact! + {:persist-op? false + :gen-undo-ops? false + :outliner-op :insert-blocks + :transact-opts {:repo (first args) + :conn (second args)}} + (apply outliner-core/insert-blocks! args))) + +(defmethod transact-db! :insert-no-order-blocks [_ conn block-uuids] + (ldb/transact! conn + (mapv (fn [block-uuid] + ;; add block/content block/format to satisfy the normal-block schema + {:block/uuid block-uuid + ;; NOTE: block without :block/left + ;; must be `logseq.db.frontend.malli-schema.closed-value-block` + :block/type #{"closed value"}}) + block-uuids) + {:persist-op? false + :gen-undo-ops? false})) + +(defmethod transact-db! :save-block [_ & args] + (outliner-tx/transact! + {:persist-op? false + :gen-undo-ops? false + :outliner-op :save-block + :transact-opts {:repo (first args) + :conn (second args)}} + (apply outliner-core/save-block! args))) + +(defmethod transact-db! :delete-whiteboard-blocks [_ conn block-uuids] + (ldb/transact! conn + (mapv (fn [block-uuid] [:db/retractEntity [:block/uuid block-uuid]]) block-uuids) + {:persist-op? false + :gen-undo-ops? false})) + +(defmethod transact-db! :upsert-whiteboard-block [_ conn blocks] + (ldb/transact! conn blocks {:persist-op? false + :gen-undo-ops? false})) + +(defn- whiteboard-page-block? + [block] + (contains? (set (:block/type block)) "whiteboard")) + +(defn- group-remote-remove-ops-by-whiteboard-block + "return {true [], false []}" + [db remote-remove-ops] + (group-by (fn [{:keys [block-uuid]}] + (boolean + (when-let [block (d/entity db [:block/uuid block-uuid])] + (whiteboard-page-block? (:block/parent block))))) + remote-remove-ops)) + +(defn- apply-remote-remove-ops-helper + [conn remove-ops] + (let [block-uuid->entity (into {} + (keep + (fn [op] + (when-let [block-uuid (:block-uuid op)] + (when-let [ent (d/entity @conn [:block/uuid block-uuid])] + [block-uuid ent]))) + remove-ops)) + block-uuid-set (set (keys block-uuid->entity)) + block-uuids-need-move + (set + (mapcat + (fn [[_block-uuid ent]] + (set/difference (set (map :block/uuid (:block/_parent ent))) block-uuid-set)) + block-uuid->entity))] + {:block-uuids-need-move block-uuids-need-move + :block-uuids-to-remove block-uuid-set})) + +(defn- apply-remote-remove-ops + [repo conn date-formatter remove-ops] + (let [{whiteboard-block-ops true other-ops false} (group-remote-remove-ops-by-whiteboard-block @conn remove-ops)] + (transact-db! :delete-whiteboard-blocks conn (map :block-uuid whiteboard-block-ops)) + + (let [{:keys [block-uuids-need-move block-uuids-to-remove]} + (apply-remote-remove-ops-helper conn other-ops)] + ;; move to page-block's first child + (doseq [block-uuid block-uuids-need-move] + (transact-db! :move-blocks&persist-op + repo conn + [(d/entity @conn [:block/uuid block-uuid])] + (d/entity @conn (:db/id (:block/page (d/entity @conn [:block/uuid block-uuid])))) + false)) + (doseq [block-uuid block-uuids-to-remove] + (transact-db! :delete-blocks + repo conn date-formatter + [(d/entity @conn [:block/uuid block-uuid])] + {}))))) + +(defn- insert-or-move-block + [repo conn block-uuid remote-parents remote-left-uuid move? op-value] + (when (seq remote-parents) + (let [first-remote-parent (first remote-parents) + local-parent (d/entity @conn [:block/uuid first-remote-parent]) + whiteboard-page-block? (whiteboard-page-block? local-parent) + ;; when insert blocks in whiteboard, local-left is ignored + ;; remote-left-uuid is nil when it's :no-order block + local-left (when-not whiteboard-page-block? + (when remote-left-uuid + (d/entity @conn [:block/uuid remote-left-uuid]))) + b (d/entity @conn [:block/uuid block-uuid])] + (case [whiteboard-page-block? (some? local-parent) (some? local-left) (some? remote-left-uuid)] + [false false true true] + (if move? + (transact-db! :move-blocks repo conn [b] local-left true) + (transact-db! :insert-blocks repo conn + [{:block/uuid block-uuid + :block/content "" + :block/format :markdown}] + local-left {:sibling? true :keep-uuid? true})) + + [false true true true] + (let [sibling? (not= (:block/uuid local-parent) (:block/uuid local-left))] + (if move? + (transact-db! :move-blocks repo conn [b] local-left sibling?) + (transact-db! :insert-blocks repo conn + [{:block/uuid block-uuid :block/content "" + :block/format :markdown}] + local-left {:sibling? sibling? :keep-uuid? true}))) + + [false true false true] + (if move? + (transact-db! :move-blocks repo conn [b] local-parent false) + (transact-db! :insert-blocks repo conn + [{:block/uuid block-uuid :block/content "" + :block/format :markdown}] + local-parent {:sibling? false :keep-uuid? true})) + + [false true false false] + (if move? + (transact-db! :move-blocks repo conn [b] local-parent false) + (transact-db! :insert-no-order-blocks conn [block-uuid])) + + ;; Don't need to insert-whiteboard-block here, + ;; will do :upsert-whiteboard-block in `update-block-attrs` + ([true true false true] [true true false false]) + (when (nil? (:properties op-value)) + ;; when :properties is nil, this block should be treat as normal block + (if move? + (transact-db! :move-blocks repo conn [b] local-parent false) + (transact-db! :insert-blocks repo conn [{:block/uuid block-uuid :block/content "" :block/format :markdown}] + local-parent {:sibling? false :keep-uuid? true}))) + ([true true true true] [true true true false]) + (when (nil? (:properties op-value)) + (let [sibling? (not= (:block/uuid local-parent) (:block/uuid local-left))] + (if move? + (transact-db! :move-blocks repo conn [b] local-left sibling?) + (transact-db! :insert-blocks repo conn [{:block/uuid block-uuid :block/content "" :block/format :markdown}] + local-left {:sibling? sibling? :keep-uuid? true})))) + + (throw (ex-info "Don't know where to insert" {:block-uuid block-uuid :remote-parents remote-parents + :remote-left remote-left-uuid})))))) + +(defn- move-ops-map->sorted-move-ops + [move-ops-map] + (let [uuid->dep-uuids (into {} (map (fn [[uuid env]] [uuid (set (conj (:parents env) (:left env)))]) move-ops-map)) + all-uuids (set (keys move-ops-map)) + sorted-uuids + (loop [r [] + rest-uuids all-uuids + uuid (first rest-uuids)] + (if-not uuid + r + (let [dep-uuids (uuid->dep-uuids uuid)] + (if-let [next-uuid (first (set/intersection dep-uuids rest-uuids))] + (recur r rest-uuids next-uuid) + (let [rest-uuids* (disj rest-uuids uuid)] + (recur (conj r uuid) rest-uuids* (first rest-uuids*)))))))] + (mapv move-ops-map sorted-uuids))) + +(defn- apply-remote-remove-page-ops + [repo conn remove-page-ops] + (doseq [op remove-page-ops] + (when-let [page-name (:block/name (d/entity @conn [:block/uuid (:block-uuid op)]))] + (worker-page/delete! repo conn page-name {:persist-op? false})))) + +(defn- filter-remote-data-by-local-unpushed-ops + "when remote-data request client to move/update/remove/... blocks, + these updates maybe not needed, because this client just updated some of these blocks, + so we need to filter these just-updated blocks out, according to the unpushed-local-ops" + [affected-blocks-map local-unpushed-ops] + ;; (assert (op-mem-layer/ops-coercer local-unpushed-ops) local-unpushed-ops) + (reduce + (fn [affected-blocks-map local-op] + (case (first local-op) + "move" + (let [block-uuid (:block-uuid (second local-op)) + remote-op (get affected-blocks-map block-uuid)] + (case (:op remote-op) + :remove (dissoc affected-blocks-map (:block-uuid remote-op)) + :move (dissoc affected-blocks-map (:self remote-op)) + ;; default + affected-blocks-map)) + + "update" + (let [block-uuid (:block-uuid (second local-op)) + local-updated-attr-set (set (keys (:updated-attrs (second local-op))))] + (if-let [remote-op (get affected-blocks-map block-uuid)] + (assoc affected-blocks-map block-uuid + (if (#{:update-attrs :move} (:op remote-op)) + (apply dissoc remote-op local-updated-attr-set) + remote-op)) + affected-blocks-map)) + ;;else + affected-blocks-map)) + affected-blocks-map local-unpushed-ops)) + +(defn- affected-blocks->diff-type-ops + [repo affected-blocks] + (let [unpushed-ops (op-mem-layer/get-all-ops repo) + affected-blocks-map* (if unpushed-ops + (filter-remote-data-by-local-unpushed-ops + affected-blocks unpushed-ops) + affected-blocks) + {remove-ops-map :remove move-ops-map :move update-ops-map :update-attrs + update-page-ops-map :update-page remove-page-ops-map :remove-page} + (update-vals + (group-by (fn [[_ env]] (get env :op)) affected-blocks-map*) + (partial into {}))] + {:remove-ops-map remove-ops-map + :move-ops-map move-ops-map + :update-ops-map update-ops-map + :update-page-ops-map update-page-ops-map + :remove-page-ops-map remove-page-ops-map})) + +(defn- empty-page? + "1. page has no child-block + 2. page has child-blocks and all these blocks only have empty :block/content" + [page-entity] + (not + (when-let [children-blocks (and page-entity + (seq (map #(into {} %) (:block/_parent page-entity))))] + (some + (fn [block] + (not= {:block/content ""} + (-> (apply dissoc block [:block/tx-id + :block/uuid + :block/updated-at + :block/left + :block/created-at + :block/format + :db/id + :block/parent + :block/page + :block/path-refs]) + (update :block/content string/trim)))) + children-blocks)))) + +(defn- check-block-pos + "NOTE: some blocks don't have :block/left (e.g. whiteboard blocks)" + [db block-uuid remote-parents remote-left-uuid] + (let [local-b (d/entity db [:block/uuid block-uuid]) + remote-parent-uuid (first remote-parents)] + (cond + (nil? local-b) + :not-exist + + (not= [remote-left-uuid remote-parent-uuid] + [(:block/uuid (:block/left local-b)) (:block/uuid (:block/parent local-b))]) + :wrong-pos + + :else nil))) + +(defn- upsert-whiteboard-block + [repo conn {:keys [parents properties] :as _op-value}] + (let [db @conn + first-remote-parent (first parents)] + (when-let [local-parent (d/entity db [:block/uuid first-remote-parent])] + (let [page-name (:block/name local-parent) + properties* (transit/read transit-r properties) + shape-property-id (db-property/get-pid repo db :logseq.property.tldraw/shape) + shape (and (map? properties*) + (get properties* shape-property-id))] + (assert (some? page-name) local-parent) + (assert (some? shape) properties*) + (transact-db! :upsert-whiteboard-block conn [(gp-whiteboard/shape->block repo db shape page-name)]))))) + +(defn- need-update-block? + [conn block-uuid op-value] + (let [ent (d/entity @conn [:block/uuid block-uuid])] + (worker-util/profile + :need-update-block? + (let [r (some (fn [[k v]] + (case k + :content (not= v (:block/raw-content ent)) + :updated-at (not= v (:block/updated-at ent)) + :created-at (not= v (:block/created-at ent)) + :alias (not= (set v) (set (map :block/uuid (:block/alias ent)))) + :type (not= (set v) (set (:block/type ent))) + :schema (not= (transit/read transit-r v) (:block/schema ent)) + :tags (not= (set v) (set (map :block/uuid (:block/tags ent)))) + :properties (not= (transit/read transit-r v) (:block/properties ent)) + :link (not= v (:block/uuid (:block/link ent))) + :journal-day (not= v (:block/journal-day ent)) + false)) + op-value)] + (prn :need-update-block? r) + r)))) + +(defn- update-block-attrs + [repo conn date-formatter block-uuid {:keys [parents properties _content] :as op-value}] + (let [key-set (set/intersection + (conj rtc-const/general-attr-set :content) + (set (keys op-value)))] + (when (seq key-set) + (let [first-remote-parent (first parents) + local-parent (d/entity @conn [:block/uuid first-remote-parent]) + whiteboard-page-block? (whiteboard-page-block? local-parent)] + (cond + (and whiteboard-page-block? properties) + (upsert-whiteboard-block repo conn op-value) + + (need-update-block? conn block-uuid op-value) + (let [b-ent (d/entity @conn [:block/uuid block-uuid]) + db-id (:db/id b-ent) + new-block + (cond-> b-ent + (and (contains? key-set :content) + (not= (:content op-value) + (:block/raw-content b-ent))) + (assoc :block/content + (db-content/db-special-id-ref->page @conn (:content op-value))) + + (contains? key-set :updated-at) (assoc :block/updated-at (:updated-at op-value)) + (contains? key-set :created-at) (assoc :block/created-at (:created-at op-value)) + (contains? key-set :alias) (assoc :block/alias (some->> (seq (:alias op-value)) + (map (partial vector :block/uuid)) + (d/pull-many @conn [:db/id]) + (keep :db/id))) + (contains? key-set :type) (assoc :block/type (:type op-value)) + (and (contains? key-set :schema) + (some? (:schema op-value))) + (assoc :block/schema (transit/read transit-r (:schema op-value))) + + (contains? key-set :tags) (assoc :block/tags (some->> (seq (:tags op-value)) + (map (partial vector :block/uuid)) + (d/pull-many @conn [:db/id]) + (keep :db/id))) + (contains? key-set :properties) (assoc :block/properties + (transit/read transit-r (:properties op-value))) + (and (contains? key-set :link) + (some? (:link op-value))) + (assoc :block/link (some->> (:link op-value) + (vector :block/uuid) + (d/pull @conn [:db/id]) + :db/id)) + + (and (contains? key-set :journal-day) + (some? (:journal-day op-value))) + (assoc :block/journal-day (:journal-day op-value) + :block/journal? true)) + *other-tx-data (atom [])] + ;; 'save-block' dont handle card-many attrs well? + (when (contains? key-set :alias) + (swap! *other-tx-data conj [:db/retract db-id :block/alias])) + (when (contains? key-set :tags) + (swap! *other-tx-data conj [:db/retract db-id :block/tags])) + (when (contains? key-set :type) + (swap! *other-tx-data conj [:db/retract db-id :block/type])) + (when (and (contains? key-set :link) (nil? (:link op-value))) + (swap! *other-tx-data conj [:db/retract db-id :block/link])) + (when (and (contains? key-set :schema) (nil? (:schema op-value))) + (swap! *other-tx-data conj [:db/retract db-id :block/schema])) + (when (and (contains? key-set :properties) (nil? (:properties op-value))) + (swap! *other-tx-data conj [:db/retract db-id :block/properties])) + (when (and (contains? key-set :journal-day) (nil? (:journal-day op-value))) + (swap! *other-tx-data conj + [:db/retract db-id :block/journal-day] + [:db/retract db-id :block/journal?])) + (when (seq @*other-tx-data) + (ldb/transact! conn @*other-tx-data {:persist-op? false + :gen-undo-ops? false})) + (transact-db! :save-block repo conn date-formatter new-block))))))) + +(defn- apply-remote-update-ops + [repo conn date-formatter update-ops] + (doseq [{:keys [parents left self] :as op-value} update-ops] + (when (and parents left) + (let [r (check-block-pos @conn self parents left)] + (case r + :not-exist + (insert-or-move-block repo conn self parents left false op-value) + :wrong-pos + (insert-or-move-block repo conn self parents left true op-value) + nil))) + (update-block-attrs repo conn date-formatter self op-value))) + +(defn- move-all-blocks-to-another-page + [repo conn from-page-name to-page-name] + (let [blocks (ldb/get-page-blocks @conn from-page-name {}) + from-page-block (some-> (first blocks) :block/page) + target-page-block (d/entity @conn [:block/name to-page-name])] + (when (and (seq blocks) target-page-block) + (let [blocks* (ldb/sort-by-left blocks from-page-block)] + (outliner-tx/transact! + {:persist-op? true + :gen-undo-ops? false + :transact-opts {:repo repo + :conn conn}} + (outliner-core/move-blocks! repo conn blocks* target-page-block false)))))) + +(defn- apply-remote-move-ops + [repo conn date-formatter sorted-move-ops] + (doseq [{:keys [parents left self] :as op-value} sorted-move-ops] + (let [r (check-block-pos @conn self parents left)] + (case r + :not-exist + (insert-or-move-block repo conn self parents left false op-value) + :wrong-pos + (insert-or-move-block repo conn self parents left true op-value) + nil ; do nothing + nil) + (update-block-attrs repo conn date-formatter self op-value)))) + +(defn- apply-remote-update-page-ops + [repo conn date-formatter update-page-ops] + (let [config (worker-state/get-config repo)] + (doseq [{:keys [self page-name original-name] :as op-value} update-page-ops] + (let [old-page-original-name (:block/original-name (d/entity @conn [:block/uuid self])) + exist-page (d/entity @conn [:block/name page-name]) + create-opts {:create-first-block? false + :uuid self :persist-op? false}] + (cond + ;; same name but different uuid, and local-existed-page is empty(`empty-page?`) + ;; just remove local-existed-page + (and exist-page + (not= (:block/uuid exist-page) self) + (empty-page? exist-page)) + (do (worker-page/delete! repo conn page-name {:persist-op? false}) + (worker-page/create! repo conn config original-name create-opts)) + + ;; same name but different uuid + ;; remote page has same block/name as local's, but they don't have same block/uuid. + ;; 1. rename local page's name to '--Conflict' + ;; 2. create page, name=, uuid=remote-uuid + (and exist-page + (not= (:block/uuid exist-page) self)) + (let [conflict-page-name (common-util/format "%s-%s-CONFLICT" original-name (tc/to-long (t/now)))] + (worker-page-rename/rename! repo conn config original-name conflict-page-name {:persist-op? false}) + (worker-page/create! repo conn config original-name create-opts) + (move-all-blocks-to-another-page repo conn conflict-page-name original-name)) + + ;; a client-page has same uuid as remote but different page-names, + ;; then we need to rename the client-page to remote-page-name + (and old-page-original-name (not= old-page-original-name original-name)) + (worker-page-rename/rename! repo conn config old-page-original-name original-name {:persist-op? false}) + + ;; no such page, name=remote-page-name, OR, uuid=remote-block-uuid + ;; just create-page + :else + (worker-page/create! repo conn config original-name create-opts)) + + (update-block-attrs repo conn date-formatter self op-value))))) + +(defn apply-remote-update + "Apply remote-update(`remote-update-event`)" + [repo conn date-formatter remote-update-event add-log-fn] + (let [remote-update-data (:value remote-update-event)] + (assert (rtc-const/data-from-ws-validator remote-update-data) remote-update-data) + (let [remote-t (:t remote-update-data) + remote-t-before (:t-before remote-update-data) + local-tx (op-mem-layer/get-local-tx repo)] + (cond + (not (and (pos? remote-t) + (pos? remote-t-before))) + (throw (ex-info "invalid remote-data" {:data remote-update-data})) + + (<= remote-t local-tx) + (add-log-fn {:type ::skip :remote-t remote-t :local-t local-tx}) + + (< local-tx remote-t-before) + (do (add-log-fn {:type ::need-pull-remote-data :remote-t remote-t :local-t local-tx}) + (throw (ex-info "need pull earlier remote-data" + {:type ::need-pull-remote-data + :local-tx local-tx}))) + + (<= remote-t-before local-tx remote-t) + (let [affected-blocks-map (:affected-blocks remote-update-data) + {:keys [remove-ops-map move-ops-map update-ops-map update-page-ops-map remove-page-ops-map]} + (affected-blocks->diff-type-ops repo affected-blocks-map) + remove-ops (vals remove-ops-map) + sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map) + update-ops (vals update-ops-map) + update-page-ops (vals update-page-ops-map) + remove-page-ops (vals remove-page-ops-map)] + + (batch-tx/with-batch-tx-mode conn {:rtc-tx? true} + (js/console.groupCollapsed "rtc/apply-remote-ops-log") + (worker-util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo conn date-formatter update-page-ops)) + (worker-util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo conn date-formatter remove-ops)) + (worker-util/profile :apply-remote-move-ops (apply-remote-move-ops repo conn date-formatter sorted-move-ops)) + (worker-util/profile :apply-remote-update-ops (apply-remote-update-ops repo conn date-formatter update-ops)) + (worker-util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo conn remove-page-ops)) + (js/console.groupEnd)) + + (op-mem-layer/update-local-tx! repo remote-t)) + :else (throw (ex-info "unreachable" {:remote-t remote-t + :remote-t-before remote-t-before + :local-t local-tx})))))) diff --git a/src/main/frontend/worker/rtc/ws2.cljs b/src/main/frontend/worker/rtc/ws2.cljs index 9763e3d4b7b..937e08dc8d0 100644 --- a/src/main/frontend/worker/rtc/ws2.cljs +++ b/src/main/frontend/worker/rtc/ws2.cljs @@ -2,7 +2,6 @@ "Websocket wrapped by missionary. based on https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs" - {:clj-kondo/ignore true} (:require [frontend.worker.rtc.const :as rtc-const] [logseq.common.missionary-util :as c.m] [missionary.core :as m])) @@ -54,35 +53,32 @@ (defn- create-mws* [url] (m/sp - (if-let [[mbx ws close-dfv] (m/? (open-ws-task url))] - {:raw-ws ws - :send (fn [data] - (m/sp - (handle-close - (m/? - (m/race close-dfv - (m/sp (while (< 4096 (.-bufferedAmount ws)) - (m/? (m/sleep 50))) - (.send ws data))))))) - :recv-flow - (m/stream - (m/ap - (loop [] - (m/amb - (handle-close - (m/? (m/race close-dfv mbx))) - (recur)))))} - (throw (ex-info "open ws timeout(10s)" {:missionary/retry true}))))) - - + (if-let [[mbx ws close-dfv] (m/? (open-ws-task url))] + {:raw-ws ws + :send (fn [data] + (m/sp + (handle-close + (m/? + (m/race close-dfv + (m/sp (while (< 4096 (.-bufferedAmount ws)) + (m/? (m/sleep 50))) + (.send ws data))))))) + :recv-flow + (m/stream + (m/ap + (loop [] + (m/amb + (handle-close + (m/? (m/race close-dfv mbx))) + (recur)))))} + (throw (ex-info "open ws timeout(10s)" {:missionary/retry true}))))) (defn- closed? [m-ws] (contains? #{:closing :closed} (get-state (:raw-ws m-ws)))) - (defn get-mws-create - "Returns a task :get a mws(missionary-websocket), creating one if needed. + "Returns a task to get a mws(missionary-websocket), creating one if needed. Always try to produce NOT-closed websocket. When failed to open websocket, retry with backoff. TODO: retry ASAP once network condition changed" @@ -109,10 +105,10 @@ (if (and m-ws (not (closed? m-ws))) m-ws (m/? backoff-create-ws-task)))))) - -(defn close - [m-ws] - (.close (:raw-ws m-ws))) +(comment + (defn close + [m-ws] + (.close (:raw-ws m-ws)))) (defn send "Returns a task: send message and return mws" @@ -159,7 +155,4 @@ (m/? (send&recv get-mws-task {:action "list-graphs"} :timeout-ms 1000)) (m/? (send&recv get-mws-task {:action "list-graphs"}))) #(prn :s %) #(js/console.log :f %))) - (cancel) - ) - -) + (cancel)))