From 9d3642201c5b16668c61aed274243033cf844f63 Mon Sep 17 00:00:00 2001 From: Moritz Langenstein <ml5717@ic.ac.uk> Date: Fri, 3 Jan 2020 09:27:34 +0000 Subject: [PATCH] (ml5717) Parameterise wasm fetch interval, sync done now implicit in sync step 2 --- __benchmarks__/benchmarks.js | 33 +--------- src/app.js | 4 +- src/room.js | 46 ++++++++------ src/wasm-crdt.js | 114 +++-------------------------------- src/y-crdt.js | 110 +++------------------------------ 5 files changed, 49 insertions(+), 258 deletions(-) diff --git a/__benchmarks__/benchmarks.js b/__benchmarks__/benchmarks.js index 2b4b264..9135459 100644 --- a/__benchmarks__/benchmarks.js +++ b/__benchmarks__/benchmarks.js @@ -2,15 +2,12 @@ import { test, equal, ok, notOk } from "zora" import chalk from "chalk" -import MessagePack from "what-the-pack" -const { decode } = MessagePack.initialize(2 ** 11) - import { connect } from "../src/room.js" //import CRDT, { benchmark } from "../src/wasm-crdt.js" import CRDT, { benchmark } from "../src/y-crdt.js" -const { blocksize, eventsGC, syncStep1, syncDone } = benchmark +const { blocksize, eventsGC, syncStep1 } = benchmark import MockConnection, { userID, @@ -1118,30 +1115,6 @@ function eraseOnEventGroupParallel( resolve() } -function syncOnSendGroupVerify(syncPackets, resolve) { - let syncDonePacketIndex = -1 - - const syncDonePacket = decode(MessagePack.Buffer.from(syncDone)) - - syncPackets.forEach((packet, i) => { - packet = decode(MessagePack.Buffer.from(packet)) - - if ( - packet.message.length == syncDonePacket.message.length && - stringify(Object.assign({}, packet, { uuid: undefined })) == - stringify(Object.assign({}, syncDonePacket, { uuid: undefined })) - ) { - equal(syncDonePacketIndex, -1) - - syncDonePacketIndex = i - } - }) - - equal(syncDonePacketIndex, syncPackets.length - 1) - - resolve() -} - function addOnEventGroupVerify( addPackets, ITERATIONS, @@ -1273,7 +1246,7 @@ test("benchmark", async (t) => { eraseOnBroadcastGroupParallel /* eraseOnBroadcastGroup */, ".dot-ver-erase-packets.json" /* erasePacketsFilename */, 1000 /* syncSendGroupTimeout */, - syncOnSendGroupVerify /* syncOnSendGroup */, + syncOnSendGroup /* syncOnSendGroup */, ".dot-ver-sync-packets.json" /* syncPacketsFilename */, addOnInitBackendSequential /* addOnInitBackend */, 1000 /* addEventGroupTimeout */, @@ -1387,7 +1360,7 @@ test("benchmark", async (t) => { eraseOnBroadcastGroupParallel /* eraseOnBroadcastGroup */, ".path-ver-erase-packets.json" /* erasePacketsFilename */, 1000 /* syncSendGroupTimeout */, - syncOnSendGroupVerify /* syncOnSendGroup */, + syncOnSendGroup /* syncOnSendGroup */, ".path-ver-sync-packets.json" /* syncPacketsFilename */, addOnInitBackendSequential /* addOnInitBackend */, 1000 /* addEventGroupTimeout */, diff --git a/src/app.js b/src/app.js index 55bd310..ce24f00 100644 --- a/src/app.js +++ b/src/app.js @@ -357,7 +357,9 @@ function drawRecognized(pathID, points) { } const tryRoomConnect = async (roomID) => { - return await connect(roomID, CRDT, WebRTCConnection) + return await connect(roomID, CRDT, WebRTCConnection, { + wasm: { interval: 16 }, + }) .then(onRoomConnect) .catch((err) => alert(`Error connecting to a room:\n${err}`)) } diff --git a/src/room.js b/src/room.js index 7b32dbe..4c8f258 100644 --- a/src/room.js +++ b/src/room.js @@ -112,27 +112,35 @@ class Room extends EventTarget { } } -export const connect = async (roomName, CRDT, connection) => { +export const connect = async ( + roomName, + CRDT, + connection, + options = undefined, +) => { const room = new Room(roomName) - await CRDT.initialise(room, { - connection, - url: "/", - room: room.name, - mesh: { - minPeers: 4, - maxPeers: 8, - }, - handshake: { - initial: 100, - interval: 500, - }, - heartbeat: { - interval: 500, - minimum: 1000, - timeout: 10000, - }, - }) + await CRDT.initialise( + room, + Object.assign({}, options, { + connection, + url: "/", + room: room.name, + mesh: { + minPeers: 4, + maxPeers: 8, + }, + handshake: { + initial: 100, + interval: 500, + }, + heartbeat: { + interval: 500, + minimum: 1000, + timeout: 10000, + }, + }), + ) return room } diff --git a/src/wasm-crdt.js b/src/wasm-crdt.js index ad5d8a1..4709f54 100644 --- a/src/wasm-crdt.js +++ b/src/wasm-crdt.js @@ -14,7 +14,7 @@ Array.prototype.remove = function(elem) { } export default class WasmCRDTWrapper { - constructor(WasmCRDT, room) { + constructor(WasmCRDT, room, interval) { this.room = room this.mesh = null @@ -56,8 +56,6 @@ export default class WasmCRDTWrapper { true, ) - this.mesh.send(uid, { type: "sync done" }, false) - this.room.dispatchEvent( new CustomEvent("weSyncedWithPeer", { detail: uid }), ) @@ -67,8 +65,7 @@ export default class WasmCRDTWrapper { this.interval = setInterval(() => { this.crdt.fetch_events() this.crdt.fetch_deltas() - // TODO: pass an option here - }, 0) + }, interval) } destroy() { @@ -87,7 +84,11 @@ export default class WasmCRDTWrapper { static async initialise(room, options) { const { WasmCRDT } = await WasmCRDTAsync - room.crdt = new WasmCRDTWrapper(WasmCRDT, room) + room.crdt = new WasmCRDTWrapper( + WasmCRDT, + room, + (options.wasm && options.wasm.interval) || 0, + ) room.crdt.mesh = new P2PMesh(room.crdt, options) } @@ -182,7 +183,7 @@ export default class WasmCRDTWrapper { this.crdt.fetch_deltas_from_state_vector(uid, _message) } else if (type == "sync step 2" && _message instanceof Uint8Array) { this.users.check = this.crdt.apply_deltas(_message) - } else if (type == "sync done") { + if (this.users.syncing == uid) { this.users.syncing = null @@ -366,103 +367,4 @@ export const benchmark = { 100, 194, ), - syncDone: Uint8Array.of( - 133, - 164, - 117, - 117, - 105, - 100, - 217, - 36, - 54, - 101, - 53, - 97, - 49, - 52, - 50, - 102, - 45, - 98, - 99, - 99, - 102, - 45, - 52, - 49, - 99, - 98, - 45, - 98, - 48, - 99, - 51, - 45, - 49, - 53, - 102, - 97, - 51, - 49, - 98, - 54, - 98, - 100, - 49, - 54, - 167, - 109, - 101, - 115, - 115, - 97, - 103, - 101, - 196, - 16, - 129, - 164, - 116, - 121, - 112, - 101, - 169, - 115, - 121, - 110, - 99, - 32, - 100, - 111, - 110, - 101, - 165, - 115, - 108, - 105, - 99, - 101, - 0, - 166, - 108, - 101, - 110, - 103, - 116, - 104, - 16, - 170, - 99, - 111, - 109, - 112, - 114, - 101, - 115, - 115, - 101, - 100, - 194, - ), } diff --git a/src/y-crdt.js b/src/y-crdt.js index b408fb2..c1d77de 100644 --- a/src/y-crdt.js +++ b/src/y-crdt.js @@ -205,7 +205,10 @@ export default class YjsCRDTWrapper extends Y.AbstractConnector { receiveMessage(uid, message) { super.receiveMessage(uid, message) - if (message && message.type === "sync done") { + if (message && message.type === "sync step 2") { + // We emulate the sync done message as it is not sent + super.receiveMessage(uid, { type: "sync done" }) + this.room.dispatchEvent( new CustomEvent("peerSyncedWithUs", { detail: uid }), ) @@ -238,11 +241,13 @@ export default class YjsCRDTWrapper extends Y.AbstractConnector { new CustomEvent("waitingForSyncStep", { detail: uid }), ) } else if (message.type === "sync done") { - compressed = false - this.room.dispatchEvent( new CustomEvent("weSyncedWithPeer", { detail: uid }), ) + + // We supress the sync done message as it is emulated on receival of sync step 2 + + return } else if (message.type === "sync check") { compressed = false } @@ -409,103 +414,4 @@ export const benchmark = { 100, 194, ), - syncDone: Uint8Array.of( - 133, - 164, - 117, - 117, - 105, - 100, - 217, - 36, - 101, - 97, - 99, - 101, - 101, - 97, - 99, - 53, - 45, - 49, - 100, - 101, - 52, - 45, - 52, - 51, - 50, - 98, - 45, - 56, - 54, - 55, - 99, - 45, - 101, - 57, - 52, - 100, - 57, - 57, - 98, - 99, - 97, - 99, - 52, - 50, - 167, - 109, - 101, - 115, - 115, - 97, - 103, - 101, - 196, - 16, - 129, - 164, - 116, - 121, - 112, - 101, - 169, - 115, - 121, - 110, - 99, - 32, - 100, - 111, - 110, - 101, - 165, - 115, - 108, - 105, - 99, - 101, - 0, - 166, - 108, - 101, - 110, - 103, - 116, - 104, - 16, - 170, - 99, - 111, - 109, - 112, - 114, - 101, - 115, - 115, - 101, - 100, - 194, - ), } -- GitLab