Skip to content

Commit

Permalink
enh(y-websocket): always send full diff to server state
Browse files Browse the repository at this point in the history
Keep an internal ydoc tracking updates that came from the server.
Send updates, that would sync this doc with the current doc state.

Signed-off-by: Max <[email protected]>
  • Loading branch information
max-nextcloud committed Nov 14, 2024
1 parent 0f54bc0 commit 2cfb371
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/helpers/yjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export function applyUpdateMessage(ydoc, updateMessage, origin = 'origin') {
export function getSteps(queue) {
return queue.map(s => encodeArrayBuffer(s))
.filter(s => s < 'AQ')
.slice(-1)
}

/**
Expand Down
32 changes: 28 additions & 4 deletions src/services/y-websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,31 @@ messageHandlers[messageSync] = (
_messageType,
) => {
encoding.writeVarUint(encoder, messageSync)
const decoderForRemote = decoding.clone(decoder)
const syncMessageType = syncProtocol.readSyncMessage(
decoder,
encoder,
provider.doc,
provider,
)
// Message came from the broadcast channel
// Do not track in this.remote and do not emit sync.
if (!emitSynced) {
return
}
if (
emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2
syncMessageType === syncProtocol.messageYjsSyncStep2
|| syncMessageType === syncProtocol.messageYjsUpdate
) {
syncProtocol.readSyncMessage(
decoderForRemote,
encoding.createEncoder(),
provider.remote,
provider,
)
}
if (
syncMessageType === syncProtocol.messageYjsSyncStep2
&& !provider.synced
) {
provider.synced = true
Expand Down Expand Up @@ -289,6 +306,10 @@ export class WebsocketProvider extends Observable {
this.disableBc = disableBc
this.wsUnsuccessfulReconnects = 0
this.messageHandlers = messageHandlers.slice()
/**
* @type {Y.Doc}
*/
this.remote = new Y.Doc()
/**
* @type {boolean}
*/
Expand Down Expand Up @@ -334,14 +355,17 @@ export class WebsocketProvider extends Observable {
}
/**
* Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
* @param {Uint8Array} update
* @param {Uint8Array} _update
* @param {any} origin
* @param {Y.Doc} doc
*/
this._updateHandler = (update, origin) => {
this._updateHandler = (_update, origin, doc) => {
if (origin !== this) {
const from = Y.encodeStateVector(this.remote)
const fullUpdate = Y.encodeStateAsUpdate(doc, from)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
syncProtocol.writeUpdate(encoder, fullUpdate)
broadcastMessage(this, encoding.toUint8Array(encoder))
}
}
Expand Down

0 comments on commit 2cfb371

Please sign in to comment.