diff --git a/.gitignore b/.gitignore index 66c5fc96b346fa69bfbbefce0b84adae5e9f1434..323adaf0cfd00df2d7f6c9e9c85c3de616009fbe 100644 --- a/.gitignore +++ b/.gitignore @@ -9,7 +9,6 @@ src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs -src/tiny-worker src/drawing-crdt # Temporary benchmark dump files diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d8a38bb3c2930ab4e8d821f90aa41809d6894dbd..7bf1a27421042facbbf19c17c83ee62c247c8cf4 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,7 +22,6 @@ submodule_fetch: - src/rtcpeerconnection - src/signalbuddy - src/yjs - - src/tiny-worker - src/drawing-crdt/pkg npm_install_prod: @@ -38,7 +37,6 @@ npm_install_prod: - src/rtcpeerconnection - src/signalbuddy - src/yjs - - src/tiny-worker - src/drawing-crdt/pkg npm_install: @@ -54,7 +52,6 @@ npm_install: - src/rtcpeerconnection - src/signalbuddy - src/yjs - - src/tiny-worker - src/drawing-crdt/pkg format_check: diff --git a/.gitmodules b/.gitmodules index 568fd96df5b220ae0534a1ce7d538bee95835282..21732462748f75ea453040276f3be91637ea88b0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -10,9 +10,6 @@ [submodule "src/yjs"] path = src/yjs url = git@gitlab.doc.ic.ac.uk:sweng-group-15/yjs.git -[submodule "src/tiny-worker"] - path = src/tiny-worker - url = git@gitlab.doc.ic.ac.uk:sweng-group-15/tiny-worker.git [submodule "src/drawing-crdt"] path = src/drawing-crdt url = git@gitlab.doc.ic.ac.uk:sweng-group-15/drawing-crdt.git diff --git a/package-lock.json b/package-lock.json index b66d0a859832b123ccd00d31fc8439b64af48fbf..82226db7a931df2c2478ee670c82dad165d0b60e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4083,12 +4083,6 @@ "integrity": "sha512-8y9YjtM1JBJU/A9Kc+SbaOV4y29sSWckBwMHa+FGtVj5gN/sbnKDf6xJUl+8g7FAij9LVaP8C24DUiH/f/2Z9A==", "dev": true }, - "esm": { - "version": "3.2.25", - "resolved": "https://registry.npmjs.org/esm/-/esm-3.2.25.tgz", - "integrity": "sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==", - "dev": true - }, "esotope-hammerhead": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/esotope-hammerhead/-/esotope-hammerhead-0.5.1.tgz", @@ -4251,12 +4245,6 @@ "jest-regex-util": "^24.9.0" } }, - "expose-gc": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/expose-gc/-/expose-gc-1.0.0.tgz", - "integrity": "sha512-ecOHrdm+zyOCGIwX18/1RHkUWgxDqGGRiGhaNC+42jReTtudbm2ID/DMa/wpaHwqy5YQHPZvsDqRM2F2iZ0uVA==", - "dev": true - }, "express": { "version": "4.17.1", "resolved": "https://registry.npmjs.org/express/-/express-4.17.1.tgz", @@ -10804,13 +10792,6 @@ "setimmediate": "^1.0.4" } }, - "tiny-worker": { - "version": "file:src/tiny-worker", - "dev": true, - "requires": { - "esm": "^3.2.25" - } - }, "tmp": { "version": "0.0.33", "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.0.33.tgz", @@ -11764,12 +11745,6 @@ "integrity": "sha512-r9S/ZyXu/Xu9q1tYlpsLIsa3EeLXXk0VwlxqTcFRfg9EhMW+17kbt9G0NrgCmhGb5vT2hyhJZLfDGx+7+5Uj/w==", "dev": true }, - "yaeti": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/yaeti/-/yaeti-1.0.2.tgz", - "integrity": "sha512-sc1JByruVRqL6GYdIKbcvYw8PRmYeuwtSd376fM13DNE+JjBh37qIlKjCtqg9mKV2N2+xCfyil3Hd6BXN9W1uQ==", - "dev": true - }, "yallist": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/yallist/-/yallist-3.1.1.tgz", diff --git a/package.json b/package.json index b19b828f1a7325efbdef25d2a9dcbb097f367fc3..8366d079f027ee7c964a47939e601df6dc9318b1 100644 --- a/package.json +++ b/package.json @@ -16,9 +16,9 @@ "build:bench": "webpack --config webpack.bench.js", "watch": "webpack --watch --config webpack.dev.js", "start": "node --experimental-modules src/server.js", - "test": "jest --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/tiny-worker", - "test-changed": "jest --only-changed --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/tiny-worker src/drawing-crdt", - "test-coverage": "jest --coverage --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/tiny-worker src/drawing-crdt", + "test": "jest --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/drawing-crdt", + "test-changed": "jest --only-changed --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/drawing-crdt", + "test-coverage": "jest --coverage --testPathIgnorePatterns src/liowebrtc src/rtcpeerconnection src/signalbuddy src/yjs src/drawing-crdt", "benchmarks": "node --experimental-modules __benchmarks__/puppeteer.js | npx tap-summary", "test-e2e:peer1": "testcafe chrome:headless __e2e_tests__/peer1.e2e.js", "test-e2e:peer2": "testcafe chrome:headless __e2e_tests__/peer2.e2e.js", @@ -36,6 +36,7 @@ "array-flat-polyfill": "^1.0.1", "d3-shape": "^1.3.5", "dotenv": "^8.2.0", + "drawing-crdt": "file:src/drawing-crdt/pkg", "express": "^4.17.1", "humanhash": "^1.0.4", "jdenticon": "^2.2.0", @@ -49,8 +50,7 @@ "y-array": "^10.1.4", "y-map": "^10.1.3", "y-memory": "^8.0.9", - "yjs": "file:src/yjs", - "drawing-crdt": "file:src/drawing-crdt/pkg" + "yjs": "file:src/yjs" }, "devDependencies": { "@babel/plugin-transform-modules-commonjs": "^7.6.0", @@ -59,20 +59,17 @@ "eslint": "^6.5.1", "eslint-config-prettier": "^6.5.0", "eslint-plugin-testcafe": "^0.2.1", - "expose-gc": "^1.0.0", "jest": "^24.9.0", "npm-run-all": "^4.1.5", "prettier": "^1.18.2", "puppeteer-core": "^2.0.0", "tap-summary": "^4.0.0", "testcafe": "^1.5.0", - "tiny-worker": "file:src/tiny-worker", "webpack": "^4.41.0", "webpack-bundle-analyzer": "^3.6.0", "webpack-cli": "^3.3.9", "webpack-merge": "^4.2.2", "webpack-preprocessor-loader": "^1.1.2", - "yaeti": "^1.0.2", "zora": "^3.1.8" }, "pre-commit": [ diff --git a/src/app.js b/src/app.js index 0b5087a8c7d29f62f09dd280f702c07ab015e08e..55bd310546c7b71f099f5b39668e370f3d8f2098 100644 --- a/src/app.js +++ b/src/app.js @@ -7,7 +7,10 @@ import * as canvas from "./canvas.js" import * as HTML from "./elements.js" import { computeErasureIntervals } from "./erasure.js" import { connect } from "./room.js" -import WasmCRDT from "./wasm-crdt.js" + +//import CRDT from "./wasm-crdt.js" +import CRDT from "./y-crdt.js" + import WebRTCConnection from "./connection/WebRTC.js" import * as toolSelection from "./tool-selection.js" import recognizeFromPoints, { Shapes } from "./shapes.js" @@ -354,7 +357,7 @@ function drawRecognized(pathID, points) { } const tryRoomConnect = async (roomID) => { - return await connect(roomID, WasmCRDT, WebRTCConnection) + return await connect(roomID, CRDT, WebRTCConnection) .then(onRoomConnect) .catch((err) => alert(`Error connecting to a room:\n${err}`)) } diff --git a/src/p2p-mesh.js b/src/p2p-mesh.js index 573805b4a5242ed65aa8a2a01cc37e1928b6a0bf..2c1a7b3bb3af94045511153f8db09593b08dcea2 100644 --- a/src/p2p-mesh.js +++ b/src/p2p-mesh.js @@ -1,11 +1,5 @@ "use strict" -/* webpack should NOT import Worker */ -// #!if false -import path from "path" -import Worker from "tiny-worker" -// #!endif - export default class P2PMesh { constructor(crdt, options) { if (options === undefined) { @@ -29,12 +23,7 @@ export default class P2PMesh { this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000 this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000 - /* webpack should use the packaged queue.js path */ - let workerPath = "js/queue.js" - // #!if false - workerPath = path.join(__dirname, "./queue.js") - // #!endif - this.queue = new Worker(workerPath, [], { esm: true }) + this.queue = new Worker("js/queue.js") this.queue.onmessage = (event) => { if (!this.crdt) { diff --git a/src/queue.js b/src/queue.js index b728c51dace01cd362934861af9efa34730f8afd..c4c2303feb91da1194c58596126a3bb059cd3a2f 100644 --- a/src/queue.js +++ b/src/queue.js @@ -4,8 +4,6 @@ import MessagePack from "what-the-pack" import pako from "pako" import uuidv4 from "uuid/v4" -const UintBuffer = MessagePack.Buffer || Buffer - const MESSAGE_BUFFER_SIZE = 2 ** 25 // 32MB const MESSAGE_SLICE_SIZE = 2 ** 10 // 1KB @@ -99,11 +97,7 @@ onmessage = (event) => { message = pako.inflate(Uint8Array.from(message)) } - if (!UintBuffer.isBuffer(message)) { - message = UintBuffer.from(message) - } - - message = decode(message) + message = decode(MessagePack.Buffer.from(message)) event.data.message = message diff --git a/src/room.js b/src/room.js index e14291c0da096bf1871d4c928f9995ad3bed6a30..7b32dbe251a43dc60540d73e1ec1668a317cae5e 100644 --- a/src/room.js +++ b/src/room.js @@ -1,10 +1,5 @@ import { spreadErasureIntervals, flattenErasureIntervals } from "./erasure.js" -/* webpack should NOT import the yaeti NodeJS polyfill */ -// #!if false -import { EventTarget } from "yaeti" -// #!endif - class Room extends EventTarget { constructor(name) { super() @@ -121,7 +116,6 @@ export const connect = async (roomName, CRDT, connection) => { const room = new Room(roomName) await CRDT.initialise(room, { - name: "p2p-mesh", connection, url: "/", room: room.name, diff --git a/src/tiny-worker b/src/tiny-worker deleted file mode 160000 index 60d95bad97b43bb73dda1ef86b605b1114ae1294..0000000000000000000000000000000000000000 --- a/src/tiny-worker +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 60d95bad97b43bb73dda1ef86b605b1114ae1294 diff --git a/src/y-crdt.js b/src/y-crdt.js index 1f898d753b62d2e497c1cf28f6fb72aa44889d52..8db882d7ecd93af32d9325707ace5e05c4fa215a 100644 --- a/src/y-crdt.js +++ b/src/y-crdt.js @@ -1,3 +1,5 @@ +import P2PMesh from "./p2p-mesh.js" + import uuidv4 from "uuid/v4" import yArray from "y-array" import yMap from "y-map" @@ -5,256 +7,222 @@ import yUnion, { Union } from "./y-union.js" import yMemory from "y-memory" import Y from "yjs" -import yP2PMesh from "./y-p2p-mesh.js" - yMemory(Y) Y.Struct.Union = Union yUnion(Y) yMap(Y) yArray(Y) -yP2PMesh(Y) - -import { spreadErasureIntervals, flattenErasureIntervals } from "./erasure.js" - -/* webpack should NOT import the yaeti NodeJS polyfill */ -// #!if false -import { EventTarget } from "yaeti" -// #!endif - -class Room extends EventTarget { - constructor(name) { - super() - this.name = name - this._y = null - this.ownID = null - this.undoStack = [] - } - disconnect() { - this._y.destroy() +export default class YjsCRDTWrapper extends Y.AbstractConnector { + constructor(y, options) { + if (options === undefined) { + throw new Error("Options must not be undefined!") + } + + options.role = "slave" + super(y, options) + + this.y = y + this.room = null + this.mesh = new P2PMesh(this, options) } - addPath([x, y, w, colour]) { - const id = uuidv4() + _initialise(room) { + this.room = room - this.shared.strokePoints.set(id, Y.Array).push([[x, y, w, colour]]) - this.shared.eraseIntervals.set(id, Y.Union) + super.onUserEvent((event) => { + if (event.action == "userJoined") { + const { user: id } = event + this.room.dispatchEvent(new CustomEvent("userJoin", { detail: id })) + } else if (event.action == "userLeft") { + const { user: id } = event + this.room.dispatchEvent(new CustomEvent("userLeave", { detail: id })) + } + }) - this.undoStack.push([id, 0, 0]) + const dispatchPathUpdateEvent = (lineEvent) => { + const pathID = lineEvent.name + const points = this.room.getPathPoints(pathID) - this.dispatchEvent(new CustomEvent("undoEnabled")) + const detail = { id: pathID, points } - return id - } + this.room.dispatchEvent(new CustomEvent("addOrUpdatePath", { detail })) + } - extendPath(id, [x, y, w, colour]) { - const path = this.shared.strokePoints.get(id) + const dispatchRemovedIntervalsEvent = (lineEvent) => { + const pathID = lineEvent.name + const intervals = this.room.getErasureIntervals(pathID) - path.push([[x, y, w, colour]]) + const detail = { id: pathID, intervals } - if (path.length == 2) { - this.undoStack[this.undoStack.length - 1] = [id, 0, 1] - } else { - this.undoStack.push([id, path.length - 2, path.length - 1]) + this.room.dispatchEvent( + new CustomEvent("removedIntervalsChange", { + detail, + }), + ) } - this.dispatchEvent(new CustomEvent("undoEnabled")) - } + this.y.share.strokePoints.observe((lineEvent) => { + if (lineEvent.type == "add") { + dispatchPathUpdateEvent(lineEvent) - extendErasureIntervals(pathID, pointID, newIntervals) { - this.shared.eraseIntervals - .get(pathID) - .merge(flattenErasureIntervals({ [pointID]: newIntervals })) - } + lineEvent.value.observe((pointEvent) => { + if (pointEvent.type == "insert") { + dispatchPathUpdateEvent(lineEvent) + } + }) + } + }) - replacePath(pathID, newPoints) { - this.fastUndo(true) - newPoints.forEach((point) => this.extendPath(pathID, point)) - this.undoStack.splice(this.undoStack.length - newPoints.length, 1) + this.y.share.eraseIntervals.observe((lineEvent) => { + if (lineEvent.type == "add") { + dispatchRemovedIntervalsEvent(lineEvent) + + lineEvent.value.observe(() => { + dispatchRemovedIntervalsEvent(lineEvent) + }) + } + }) } - undo() { - const operation = this.undoStack.pop() + destroy() { + // yjs connectors have an optional destroy() method that is called on y.destroy() + if (this.mesh == null) return - if (!operation) return + this.mesh.disconnect() + this.mesh = null - const [id, ...interval] = operation + this.y.destroy() + this.y = null - this.shared.eraseIntervals.get(id).merge([interval]) + this.room = null } - fastUndo(forReplacing = false) { - let from = this.undoStack.length - 1 + static async initialise(room, options) { + const y = await Y({ + db: { + name: "memory", + }, + connector: Object.assign({}, options, { name: "y-crdt" }), + share: { + strokePoints: "Map", + eraseIntervals: "Map", + }, + }) - if (from < 0) return + y.connector._initialise(room) - // eslint-disable-next-line no-unused-vars - const [id, _, end] = this.undoStack[from] - const endErasing = forReplacing ? end + 1 : end + room.crdt = y.connector + } - for (; from >= 0; from--) { - if (this.undoStack[from][0] != id) { - from++ - break - } - } + getUserID() { + return this.y.db.userId + } - this.undoStack = this.undoStack.slice(0, Math.max(0, from)) - this.shared.eraseIntervals.get(id).merge([[0, endErasing]]) + setUserID(uid) { + return super.setUserId(uid) } - canUndo() { - return this.undoStack.length > 0 + fetchDrawingEvents() { + // NOOP: twiddle thumbs } - getPaths() { - const paths = new Map() + addPath([x, y, w, colour]) { + const id = uuidv4() - for (const id of this.shared.strokePoints.keys()) { - paths.set(id, this._generatePath(id)) - } + this.y.share.strokePoints.set(id, Y.Array).push([[x, y, w, colour]]) + this.y.share.eraseIntervals.set(id, Y.Union) - return paths + return id } - getErasureIntervals(pathID) { - return this._generateRemovedIntervals(pathID) + extendPath(pathID, [x, y, w, colour]) { + const path = this.y.share.strokePoints.get(pathID) + + path.push([[x, y, w, colour]]) + + return path.length } - getPathPoints(pathID) { - return this._generatePath(pathID) + endPath(/*pathID*/) { + // NOOP: twiddle thumbs } - get shared() { - return this._y.share + extendErasureIntervals(pathID, newIntervals) { + this.y.share.eraseIntervals.get(pathID).merge(newIntervals) } - _generatePath(id) { - const points = this.shared.strokePoints.get(id) + getPathIDs() { + return this.y.share.strokePoints.keys() + } + + getPathPoints(pathID) { + const points = this.y.share.strokePoints.get(pathID) if (!points) return [] return points.toArray() } - _generateRemovedIntervals(id) { - const intervals = this.shared.eraseIntervals.get(id) + getErasureIntervals(pathID) { + const intervals = this.y.share.eraseIntervals.get(pathID) if (!intervals) return [] - return spreadErasureIntervals(intervals.get()) + return intervals.get() } - inviteUser(id) { - this._y.connector.connectToPeer(id) + userJoined(uid) { + super.userJoined(uid, "master") } - async _initialise(connection) { - this._y = await Y({ - db: { - name: "memory", - }, - connector: { - name: "p2p-mesh", - connection, - url: "/", - room: this.name, - mesh: { - minPeers: 4, - maxPeers: 8, - }, - handshake: { - initial: 100, - interval: 500, - }, - heartbeat: { - interval: 500, - minimum: 1000, - timeout: 10000, - }, - onUserEvent: (event) => { - if (event.action == "userConnection") { - const { id, quality } = event - this.dispatchEvent( - new CustomEvent("userConnection", { detail: { id, quality } }), - ) - } else if (event.action == "userID") { - const { user: id } = event - this.ownID = id - this.dispatchEvent(new CustomEvent("allocateOwnID", { detail: id })) - } else if (event.action == "userJoined") { - const { user: id } = event - this.dispatchEvent(new CustomEvent("userJoin", { detail: id })) - } else if (event.action == "userLeft") { - const { user: id } = event - this.dispatchEvent(new CustomEvent("userLeave", { detail: id })) - } else if (event.action === "peerSyncedWithUs") { - const { user: id } = event - this.dispatchEvent( - new CustomEvent("peerSyncedWithUs", { detail: id }), - ) - } else if (event.action === "waitingForSyncStep") { - const { user: id } = event - this.dispatchEvent( - new CustomEvent("waitingForSyncStep", { detail: id }), - ) - } else if (event.action === "weSyncedWithPeer") { - const { user: id } = event - this.dispatchEvent( - new CustomEvent("weSyncedWithPeer", { detail: id }), - ) - } - }, - }, - share: { - strokePoints: "Map", - eraseIntervals: "Map", - }, - }) + userLeft(uid) { + super.userLeft(uid) + } - const dispatchRemovedIntervalsEvent = (lineEvent) => { - const id = lineEvent.name - const intervals = this._generateRemovedIntervals(id) - const detail = { id, intervals } - this.dispatchEvent( - new CustomEvent("removedIntervalsChange", { - detail, - }), - ) - } + receiveMessage(uid, message) { + super.receiveMessage(uid, message) - const dispatchPathUpdateEvent = (lineEvent) => { - const id = lineEvent.name - const points = this._generatePath(id) - const detail = { id, points } - this.dispatchEvent(new CustomEvent("addOrUpdatePath", { detail })) + this.room.dispatchEvent( + new CustomEvent("peerSyncedWithUs", { detail: uid }), + ) + } + + reportConnectionQuality(uid, quality) { + this.room.dispatchEvent( + new CustomEvent("userConnection", { detail: { id: uid, quality } }), + ) + } + + disconnect() { + super.disconnect() + } + + reconnect() { + throw "Unsupported operation reconnect()" + } + + send(uid, message) { + if (message.type === "sync step 1") { + this.room.dispatchEvent( + new CustomEvent("waitingForSyncStep", { detail: uid }), + ) + } else if (message.type === "sync done") { + this.room.dispatchEvent( + new CustomEvent("weSyncedWithPeer", { detail: uid }), + ) } - this.shared.strokePoints.observe((lineEvent) => { - if (lineEvent.type == "add") { - dispatchPathUpdateEvent(lineEvent) + this.mesh.send(uid, message) + } - lineEvent.value.observe((pointEvent) => { - if (pointEvent.type == "insert") { - dispatchPathUpdateEvent(lineEvent) - } - }) - } - }) - this.shared.eraseIntervals.observe((lineEvent) => { - if (lineEvent.type == "add") { - dispatchRemovedIntervalsEvent(lineEvent) + broadcast(message) { + this.mesh.broadcast(message) + } - lineEvent.value.observe(() => { - dispatchRemovedIntervalsEvent(lineEvent) - }) - } - }) + isDisconnected() { + return false } } -export const connect = async (roomName, connection) => { - const room = new Room(roomName) - await room._initialise(connection) - return room -} +Y.extend("y-crdt", YjsCRDTWrapper)