diff --git a/package-lock.json b/package-lock.json index c9fc18114add6658b91187915966d9037935199f..51e5ec1581f8c44133cb5bedfe5574245bddc23b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -732,9 +732,9 @@ "dev": true }, "abab": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/abab/-/abab-2.0.2.tgz", - "integrity": "sha512-2scffjvioEmNz0OyDSLGWDfKCVwaKc6l9Pm9kOIREU13ClXZvHpg/nRL5xyjSSSLhOnXqft2HpsAzNEEA8cFFg==", + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/abab/-/abab-2.0.3.tgz", + "integrity": "sha512-tsFzPpcttalNjFBCFMqsKYQcWxxen1pgJR56by//QwvJc4/OUS3kPOOttx2tSIfjsylB0pYu7f5D3K1RCxUnUg==", "dev": true }, "accepts": { @@ -821,9 +821,9 @@ } }, "ansi-regex": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-4.1.0.tgz", - "integrity": "sha512-1apePfXM1UOSqw0o9IiFAovVz9M5S1Dg+4TrDwfMewQ6p/rmMueb7tWZjQ1rx4Loy1ArBggoqGpfqqdI4rondg==", + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.0.tgz", + "integrity": "sha512-bY6fj56OUQ0hU1KjFNDQuJFezqKdrAyFdIevADiqrWHwSlbmBNMHp5ak2f40Pm8JTFyM2mqxkG6ngkHO11f/lg==", "dev": true }, "ansi-styles": { @@ -2034,9 +2034,9 @@ "integrity": "sha1-m81S4UwJd2PnSbJ0xDRu0uVgtak=" }, "des.js": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/des.js/-/des.js-1.0.0.tgz", - "integrity": "sha1-wHTS4qpqipoH29YfmhXCzYPsjsw=", + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/des.js/-/des.js-1.0.1.tgz", + "integrity": "sha512-Q0I4pfFrv2VPd34/vfLrFOoRmlYj3OV50i7fskps1jZWK1kApMWWT9G6RRUeYedLcBDIhnSDaUvJMb3AhUlaEA==", "dev": true, "requires": { "inherits": "^2.0.1", @@ -3904,9 +3904,9 @@ "dev": true }, "import-fresh": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/import-fresh/-/import-fresh-3.1.0.tgz", - "integrity": "sha512-PpuksHKGt8rXfWEr9m9EHIpgyyaltBy8+eF6GJM0QCAxMgxCfucMF3mjecK2QsJr0amJW7gTqh5/wht0z2UhEQ==", + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/import-fresh/-/import-fresh-3.2.1.tgz", + "integrity": "sha512-6e1q1cnWP2RXD9/keSkxHScg508CdXqXWgWBaETNhyuBFz+kUZlKboh+ISK+bU++DmbHimVBrOz/zzPe0sZ3sQ==", "dev": true, "requires": { "parent-module": "^1.0.0", @@ -5183,16 +5183,16 @@ "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==" }, "mime-db": { - "version": "1.40.0", - "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.40.0.tgz", - "integrity": "sha512-jYdeOMPy9vnxEqFRRo6ZvTZ8d9oPb+k18PKoYNYUe2stVEBPPwsln/qWzdbmaIvnhZ9v2P+CuecK+fpUfsV2mA==" + "version": "1.42.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.42.0.tgz", + "integrity": "sha512-UbfJCR4UAVRNgMpfImz05smAXK7+c+ZntjaA26ANtkXLlOe947Aag5zdIcKQULAiF9Cq4WxBi9jUs5zkA84bYQ==" }, "mime-types": { - "version": "2.1.24", - "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.24.tgz", - "integrity": "sha512-WaFHS3MCl5fapm3oLxU4eYDw77IQM2ACcxQ9RIxfaC3ooc6PFuBMGZZsYpvoXS5D5QTWPieo1jjLdAm3TBP3cQ==", + "version": "2.1.25", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.25.tgz", + "integrity": "sha512-5KhStqB5xpTAeGqKBAMgwaYMnQik7teQN4IAzC7npDv6kzeU6prfkR67bc87J1kWMPGkoaZSq1npmexMgkmEVg==", "requires": { - "mime-db": "1.40.0" + "mime-db": "1.42.0" } }, "mimic-fn": { @@ -5949,6 +5949,14 @@ "ansi-regex": "^4.0.0", "ansi-styles": "^3.2.0", "react-is": "^16.8.4" + }, + "dependencies": { + "ansi-regex": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-4.1.0.tgz", + "integrity": "sha512-1apePfXM1UOSqw0o9IiFAovVz9M5S1Dg+4TrDwfMewQ6p/rmMueb7tWZjQ1rx4Loy1ArBggoqGpfqqdI4rondg==", + "dev": true + } } }, "process": { @@ -7121,14 +7129,25 @@ } }, "string-width": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.1.0.tgz", - "integrity": "sha512-NrX+1dVVh+6Y9dnQ19pR0pP4FiEIlUvdTGn8pw6CKTNq5sgib2nIhmUNT5TAmhWmvKr3WcxBcP3E8nWezuipuQ==", + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.0.tgz", + "integrity": "sha512-zUz5JD+tgqtuDjMhwIg5uFVV3dtqZ9yQJlZVfq4I01/K5Paj5UHj7VyrQOJvzawSVlKpObApbfD0Ed6yJc+1eg==", "dev": true, "requires": { "emoji-regex": "^8.0.0", "is-fullwidth-code-point": "^3.0.0", - "strip-ansi": "^5.2.0" + "strip-ansi": "^6.0.0" + }, + "dependencies": { + "strip-ansi": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.0.tgz", + "integrity": "sha512-AuvKTrTfQNYNIctbR1K/YGTR1756GycPsg7b9bdV9Duqur4gv6aKqHXah67Z8ImS7WEz5QVcOtlfW2rZEugt6w==", + "dev": true, + "requires": { + "ansi-regex": "^5.0.0" + } + } } }, "string.prototype.trimleft": { @@ -7167,6 +7186,14 @@ "dev": true, "requires": { "ansi-regex": "^4.1.0" + }, + "dependencies": { + "ansi-regex": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-4.1.0.tgz", + "integrity": "sha512-1apePfXM1UOSqw0o9IiFAovVz9M5S1Dg+4TrDwfMewQ6p/rmMueb7tWZjQ1rx4Loy1ArBggoqGpfqqdI4rondg==", + "dev": true + } } }, "strip-bom": { @@ -7484,9 +7511,9 @@ "dev": true }, "uglify-js": { - "version": "3.6.8", - "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.6.8.tgz", - "integrity": "sha512-XhHJ3S3ZyMwP8kY1Gkugqx3CJh2C3O0y8NPiSxtm1tyD/pktLAkFZsFGpuNfTZddKDQ/bbDBLAd2YyA1pbi8HQ==", + "version": "3.6.9", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.6.9.tgz", + "integrity": "sha512-pcnnhaoG6RtrvHJ1dFncAe8Od6Nuy30oaJ82ts6//sGSXOP5UjBMEthiProjXmMNHOfd93sqlkztifFMcb+4yw==", "dev": true, "optional": true, "requires": { diff --git a/src/app.js b/src/app.js index 26d218099ef4feb70d030d1de7a2f050f3276949..f817c586ab12bde72f72650dab55d765eca859ac 100644 --- a/src/app.js +++ b/src/app.js @@ -287,4 +287,10 @@ canvas.input.addEventListener("strokemove", ({ detail: e }) => { } }) +window.addEventListener("unload", () => { + if (room) { + room.disconnect() + } +}) + tryRoomConnect(DEFAULT_ROOM) diff --git a/src/connection/Connection.js b/src/connection/Connection.js new file mode 100644 index 0000000000000000000000000000000000000000..df2e376703cad8a5e1a7868e1cf9b7a83dd3555e --- /dev/null +++ b/src/connection/Connection.js @@ -0,0 +1,45 @@ +export default class AbstractConnection extends EventTarget { + constructor(options) { + super() + + this.options = options + } + + /* + Supported events: + - roomJoined => () + - roomLeft => () + - channelOpened => ({detail: uid}) + - channelError => ({detail: uid}) + - channelClosed => ({detail: uid}) + - messageReceived => ({detail: {uid, channel, message}}) + */ + + getUserID() { + // => int + } + + getPeerHandle(/*uid*/) { + // => opaque + } + + getPeerFootprint(/*uid*/) { + // => Promise => int + } + + send(/*uid, channel, message*/) { + // => void + } + + broadcast(/*channel, message*/) { + // => void + } + + terminatePeer(/*uid*/) { + // => void + } + + destructor() { + // => void + } +} diff --git a/src/connection/WebRTC.js b/src/connection/WebRTC.js new file mode 100644 index 0000000000000000000000000000000000000000..97adf9177d885044a4fd99357c142085e8eb9542 --- /dev/null +++ b/src/connection/WebRTC.js @@ -0,0 +1,116 @@ +import AbstractConnection from "./Connection.js" + +import LioWebRTC from "liowebrtc" + +export default class WebRTCConnection extends AbstractConnection { + constructor(options) { + super(options) + + this.webrtc = new LioWebRTC({ + url: this.options.url, + dataOnly: true, + constraints: { + minPeers: this.options.mesh.minPeers, + maxPeers: this.options.mesh.maxPeers, + }, + }) + + this.webrtc.on("ready", () => { + this.webrtc.joinRoom(this.options.room) + }) + + this.webrtc.on("joinedRoom", () => { + this.dispatchEvent(new CustomEvent("roomJoined")) + }) + + this.webrtc.on("leftRoom", () => { + this.dispatchEvent(new CustomEvent("roomLeft")) + }) + + this.webrtc.on("channelOpen", (dataChannel, peer) => { + this.dispatchEvent(new CustomEvent("channelOpened", { detail: peer.id })) + }) + + this.webrtc.on("channelError", (dataChannel, peer) => { + this.dispatchEvent(new CustomEvent("channelError", { detail: peer.id })) + }) + + this.webrtc.on("channelClose", (dataChannel, peer) => { + this.dispatchEvent(new CustomEvent("channelClosed", { detail: peer.id })) + }) + + this.webrtc.on("receivedPeerData", (channel, message, peer) => { + // Message could have been forwarded but interface only needs to know about directly connected peers + this.dispatchEvent( + new CustomEvent("messageReceived", { + detail: { + uid: peer.forwardedBy ? peer.forwardedBy.id : peer.id, + channel, + message, + }, + }), + ) + }) + } + + getUserID() { + return this.webrtc.getMyId() + } + + getPeerHandle(uid) { + return this.webrtc.getPeerById(uid) + } + + getPeerFootprint(uid) { + const peer = this.webrtc.getPeerById(uid) + + if (!peer) return Promise.reject() + + return new Promise(function(resolve, reject) { + peer.getStats(null).then((stats) => { + let footprint = -1 + + stats.forEach((report) => { + if ( + report.type == "candidate-pair" && + report.bytesSent > 0 && + report.bytesReceived > 0 && + report.writable + ) { + footprint = Math.max(footprint, report.bytesReceived) + } + }) + + if (footprint != -1) { + resolve(footprint) + } else { + reject() + } + }) + }) + } + + send(uid, channel, message) { + const peer = this.webrtc.getPeerById(uid) + + if (!peer) return + + this.webrtc.whisper(peer, channel, message) + } + + broadcast(channel, message) { + this.webrtc.shout(channel, message) + } + + terminatePeer(uid) { + const peer = this.webrtc.getPeerById(uid) + + if (!peer) return + + peer.end() + } + + destructor() { + this.webrtc.quit() + } +} diff --git a/src/queue.js b/src/queue.js index 6aeed4438621d58337dab675e90ae85bef79c7a5..82473f424d6d0ea1e6155bd7774b5349875b98b3 100644 --- a/src/queue.js +++ b/src/queue.js @@ -46,10 +46,10 @@ self.onmessage = (event) => { let message = event.data.message.message if (event.data.message.length > 1) { - let messages = buffer[event.data.peer.id] + let messages = buffer[event.data.uid] if (!messages) { messages = {} - buffer[event.data.peer.id] = messages + buffer[event.data.uid] = messages } let slices = messages[event.data.message.uuid] diff --git a/src/room.js b/src/room.js index 84c1928e87125292f9f858e9dd04c0916ac46861..f44fe68b5804b3c629aefcc42eebd30d5d314e5a 100644 --- a/src/room.js +++ b/src/room.js @@ -4,12 +4,13 @@ import yMap from "y-map" import yMemory from "y-memory" import Y from "yjs" -import yWebrtc from "./y-webrtc/index.js" +import yP2PMesh from "./y-p2p-mesh.js" +import WebRTCConnection from "./connection/WebRTC.js" yMemory(Y) yMap(Y) yArray(Y) -yWebrtc(Y) +yP2PMesh(Y) import { combineErasureIntervals, @@ -102,9 +103,14 @@ class Room extends EventTarget { name: "memory", }, connector: { - name: "webrtc", + name: "p2p-mesh", + connection: WebRTCConnection, url: "/", room: this.name, + mesh: { + minPeers: 4, + maxPeers: 8, + }, handshake: { initial: 100, interval: 500, diff --git a/src/y-p2p-mesh.js b/src/y-p2p-mesh.js new file mode 100644 index 0000000000000000000000000000000000000000..9cdf2234defca2df16ccefc04508955ac1213c40 --- /dev/null +++ b/src/y-p2p-mesh.js @@ -0,0 +1,313 @@ +/* global Y */ +"use strict" + +function extend(Y) { + class P2PMesh extends Y.AbstractConnector { + constructor(y, options) { + if (options === undefined) { + throw new Error("Options must not be undefined!") + } + + options.role = "slave" + super(y, options) + this.options = options + + this.options.mesh = this.options.mesh || {} + this.options.mesh.minPeers = this.options.mesh.minPeers || 4 + this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8 + + this.options.handshake = this.options.handshake || {} + this.options.handshake.initial = this.options.handshake.initial || 100 + this.options.handshake.interval = this.options.handshake.interval || 500 + + this.options.heartbeat = this.options.heartbeat || {} + this.options.heartbeat.interval = this.options.heartbeat.interval || 500 + this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000 + this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000 + + this.queue = new Worker("js/queue.js") + this.queue.onmessage = (event) => { + const method = event.data.method + + if (method == "send") { + const { uid, channel, message } = event.data + + // y-js db transactions can send messages after a peer has disconnected + if (channel == "y-js" && !this.peers.has(uid)) { + return + } + + this.connection.send(uid, channel, message) + } else if (method == "broadcast") { + const { channel, message } = event.data + + return this.connection.broadcast(channel, message) + } else if (method == "received") { + const { uid, channel, message } = event.data + + if (channel === "tw-ml") { + // Handshakes can only be sent and received directly + if (message === "tw") { + // Response message in the handshake + this.queue.postMessage({ + method: "send", + uid, + channel: "tw-ml", + message: "ml", + }) + } else if (message == "ml") { + // Handshake completed + this.checkAndInsertPeer(uid) + } + } else { + this.checkAndInsertPeer(uid) + + if (channel === "y-js") { + this.checkAndInsertPeer(uid) + + if (message.type === "sync done") { + this.raiseUserEvent("peerSyncedWithUs", { user: uid }) + } + + this.receiveMessage(uid, message) + } + } + } + } + + if (options.onUserEvent) { + this.onUserEvent(options.onUserEvent) + } + + this.initialiseConnection() + } + + initialiseConnection() { + this.peers = new Map() + + this.connection = new this.options.connection(this.options) + + this.connection.addEventListener("roomJoined", () => { + this.checkAndEnsureUser() + }) + + this.connection.addEventListener("roomLeft", () => { + console.log("TODO: LEFT ROOM") + }) + + this.connection.addEventListener("channelOpened", ({ detail: uid }) => { + this.checkAndEnsureUser() + + // Start a handshake to ensure both sides are able to use the channel + function handshake(peer) { + const _peer = this.connection.getPeerHandle(uid) + + if (!_peer || _peer !== peer) { + return + } + + if (this.peers.has(uid)) { + return + } + + // Initial message in the handshake + this.queue.postMessage({ + method: "send", + uid, + channel: "tw-ml", + message: "tw", + }) + + setTimeout( + handshake.bind(this, peer), + this.options.handshake.interval, + ) + } + + setTimeout( + handshake.bind(this, this.connection.getPeerHandle(uid)), + this.options.handshake.initial, + ) + }) + + this.connection.addEventListener("channelError", ({ detail: uid }) => + console.log("TODO: CHANNEL ERROR", uid), + ) + + this.connection.addEventListener("channelClosed", ({ detail: uid }) => { + this.checkAndEnsureUser() + this.checkAndRemovePeer(uid) + }) + + this.connection.addEventListener( + "messageReceived", + ({ detail: { uid, channel, message } }) => { + this.checkAndEnsureUser() + + this.queue.postMessage({ + method: "received", + uid, + channel, + message, + }) + }, + ) + } + + // Ensure that y-js is up to date on the user's id + checkAndEnsureUser() { + const id = this.connection.getUserID() + + if (this.y.db.userId === id) { + return + } + + this.raiseUserEvent("userID", { user: id }) + + this.setUserId(id) + } + + // Ensure that y-js knows that the peer has joined + checkAndInsertPeer(uid) { + if (this.peers.has(uid)) { + return + } + + const health = { + lastFootprintResolved: true, + lastFootprint: 0, + lastFootprintTimestamp: Date.now(), + } + health.cb = setInterval( + this.heartbeat.bind( + this, + uid, + this.connection.getPeerHandle(uid), + health, + ), + this.options.heartbeat.interval, + ) + + this.peers.set(uid, health) + + this.userJoined(uid, "master") + } + + heartbeat(uid, peer, health) { + const _peer = this.connection.getPeerHandle(uid) + + if (!_peer || _peer !== peer || !this.peers.has(uid)) { + clearInterval(health.cb) + + return + } + + if (!health.lastFootprintResolved) { + return this.connection.terminatePeer(uid) + } + health.lastFootprintResolved = false + + const self = this + + this.connection + .getPeerFootprint(uid) + .then((footprint) => { + health.lastFootprintResolved = true + + const timeSinceLastFootprint = + Date.now() - health.lastFootprintTimestamp + + if (footprint != health.lastFootprint) { + health.lastFootprint = footprint + health.lastFootprintTimestamp = Date.now() + } else if (timeSinceLastFootprint > self.options.heartbeat.timeout) { + return this.connection.terminatePeer(uid) + } else if (timeSinceLastFootprint > self.options.heartbeat.interval) { + self.queue.postMessage({ + method: "send", + uid, + channel: "heartbeat", + }) + } + + this.raiseUserEvent("userConnection", { + id: uid, + quality: + 1.0 - + (self.options.heartbeat.timeout - + Math.max( + timeSinceLastFootprint, + self.options.heartbeat.minimum, + )) / + (self.options.heartbeat.timeout - + self.options.heartbeat.minimum), + }) + }) + .catch(() => { + return this.connection.terminatePeer(uid) + }) + } + + // Ensure that y-js knows that the peer has left + checkAndRemovePeer(uid) { + if (!this.peers.has(uid)) { + return + } + + this.peers.delete(uid) + + this.userLeft(uid) + } + + connectToPeer(/*uid*/) { + // currently deprecated + } + + disconnect() { + this.queue.terminate() + + this.connection.destructor() + + super.disconnect() + } + + reconnect() { + this.initialiseConnection() + + super.reconnect() + } + + raiseUserEvent(action, data) { + const event = Object.assign({ action }, data) + + for (const f of this.userEventListeners) { + f(event) + } + } + + send(uid, message) { + if (message.type === "sync step 1") { + this.raiseUserEvent("waitingForSyncStep", { user: uid }) + } else if (message.type === "sync done") { + this.raiseUserEvent("weSyncedWithPeer", { user: uid }) + } + + this.queue.postMessage({ method: "send", uid, channel: "y-js", message }) + } + + broadcast(message) { + this.queue.postMessage({ method: "broadcast", channel: "y-js", message }) + } + + isDisconnected() { + return false + } + } + + Y.extend("p2p-mesh", P2PMesh) +} + +export default extend +if (typeof Y !== "undefined") { + extend(Y) +} diff --git a/src/y-webrtc/LICENSE b/src/y-webrtc/LICENSE deleted file mode 100644 index fa48ced3a719e6b62dc246b53d96b75334197d16..0000000000000000000000000000000000000000 --- a/src/y-webrtc/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2014 Kevin Jahns <kevin.jahns@rwth-aachen.de>. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/src/y-webrtc/index.js b/src/y-webrtc/index.js deleted file mode 100644 index 3ef2c68d222b15dd7d18d028e451dad5b83d0be6..0000000000000000000000000000000000000000 --- a/src/y-webrtc/index.js +++ /dev/null @@ -1,342 +0,0 @@ -/* global Y */ -"use strict" - -import LioWebRTC from "liowebrtc" - -function extend(Y) { - class WebRTC extends Y.AbstractConnector { - constructor(y, options) { - if (options === undefined) { - throw new Error("Options must not be undefined!") - } - - options.role = "slave" - super(y, options) - this.webrtcOptions = options - - this.webrtcOptions.handshake = this.webrtcOptions.handshake || {} - this.webrtcOptions.handshake.initial = - this.webrtcOptions.handshake.initial || 100 - this.webrtcOptions.handshake.interval = - this.webrtcOptions.handshake.interval || 500 - - this.webrtcOptions.heartbeat = this.webrtcOptions.heartbeat || {} - this.webrtcOptions.heartbeat.interval = - this.webrtcOptions.heartbeat.interval || 500 - this.webrtcOptions.heartbeat.minimum = - this.webrtcOptions.heartbeat.minimum || 1000 - this.webrtcOptions.heartbeat.timeout = - this.webrtcOptions.heartbeat.timeout || 10000 - - this.queue = new Worker("js/queue.js") - this.queue.onmessage = (event) => { - const method = event.data.method - - if (method == "send") { - const { uid, channel, message } = event.data - - // y-js db transactions can send messages after a peer has disconnected - if ( - (channel == "y-js" && !this.peers.has(uid)) || - !this.webrtc.getPeerById(uid) - ) { - return - } - - this.webrtc.whisper(this.webrtc.getPeerById(uid), channel, message) - } else if (method == "broadcast") { - const { channel, message } = event.data - - return this.webrtc.shout(channel, message) - } else if (method == "received") { - const { type, message, peer } = event.data - - if (type === "tw-ml") { - // Handshakes can only be sent and received directly - if (message === "tw") { - // Response message in the handshake - this.queue.postMessage({ - method: "send", - uid: peer.id, - channel: "tw-ml", - message: "ml", - }) - } else if (message == "ml") { - // Handshake completed - this.checkAndInsertPeer(peer.id) - } - } else { - this.checkAndInsertPeer(peer.id) - - if (type === "y-js") { - this.checkAndInsertPeer(peer.id) - - if (message.type === "sync done") { - this.raiseUserEvent("peerSyncedWithUs", { user: peer.id }) - } - - this.receiveMessage(peer.id, message) - } - } - } - } - - if (options.onUserEvent) { - this.onUserEvent(options.onUserEvent) - } - this.initialiseConnection() - - window.addEventListener("unload", () => { - this.y.destroy() - }) - } - - initialiseConnection() { - this.webrtc = new LioWebRTC({ - url: this.webrtcOptions.url, - dataOnly: true, - constraints: { - minPeers: 4, - maxPeers: 8, - }, - }) - - this.peers = new Map() - - this.webrtc.on("ready", () => { - this.webrtc.joinRoom(this.webrtcOptions.room) - }) - - this.webrtc.on("joinedRoom", () => { - this.checkAndEnsureUser() - }) - - this.webrtc.on("leftRoom", () => { - console.log("TODO: LEFT ROOM") - }) - - this.webrtc.on("channelError", (a, b, c, d) => - console.log("TODO: CHANNEL ERROR", a, b, c, d), - ) - - this.webrtc.on("channelOpen", (dataChannel, peer) => { - this.checkAndEnsureUser() - - // Start a handshake to ensure both sides are able to use the channel - function handshake(peer) { - const _peer = this.webrtc.getPeerById(peer.id) - - if (!_peer || _peer !== peer) { - return - } - - if (this.peers.has(peer.id)) { - return - } - - // Initial message in the handshake - this.queue.postMessage({ - method: "send", - uid: peer.id, - channel: "tw-ml", - message: "tw", - }) - - setTimeout( - handshake.bind(this, peer), - this.webrtcOptions.handshake.interval, - ) - } - - setTimeout( - handshake.bind(this, peer), - this.webrtcOptions.handshake.initial, - ) - }) - - this.webrtc.on("receivedPeerData", (type, message, peer) => { - this.checkAndEnsureUser() - - // Message could have been forwarded but yjs only needs to know about directly connected peers - this.queue.postMessage({ - method: "received", - type, - message, - peer: { id: peer.forwardedBy ? peer.forwardedBy.id : peer.id }, - }) - }) - - this.webrtc.on("channelClose", (dataChannel, peer) => { - this.checkAndEnsureUser() - this.checkAndRemovePeer(peer.id) - }) - } - - // Ensure that y-js is up to date on the user's id - checkAndEnsureUser() { - const id = this.webrtc.getMyId() - - if (this.y.db.userId === id) { - return - } - - this.raiseUserEvent("userID", { user: id }) - - this.setUserId(id) - } - - // Ensure that y-js knows that the peer has joined - checkAndInsertPeer(uid) { - if (this.peers.has(uid)) { - return - } - - const health = { - lastStatsResolved: true, - lastReceivedBytes: 0, - lastReceivedTimestamp: Date.now(), - } - health.cb = setInterval( - this.heartbeat.bind(this, this.webrtc.getPeerById(uid), health), - this.webrtcOptions.heartbeat.interval, - ) - - this.peers.set(uid, health) - - this.userJoined(uid, "master") - } - - heartbeat(peer, health) { - const _peer = this.webrtc.getPeerById(peer.id) - - if (!_peer || _peer !== peer || !this.peers.has(peer.id)) { - clearInterval(health.cb) - - return - } - - if (!health.lastStatsResolved) { - return peer.end(true) - } - health.lastStatsResolved = false - - const self = this - - peer.getStats(null).then((stats) => { - health.lastStatsResolved = true - - let disconnect = true - - stats.forEach((report) => { - if ( - report.type == "candidate-pair" && - report.bytesSent > 0 && - report.bytesReceived > 0 && - report.writable - ) { - const timeSinceLastReceived = - Date.now() - health.lastReceivedTimestamp - - if (report.bytesReceived != health.lastReceivedBytes) { - health.lastReceivedBytes = report.bytesReceived - health.lastReceivedTimestamp = Date.now() - } else if ( - timeSinceLastReceived > self.webrtcOptions.heartbeat.timeout - ) { - return - } else if ( - timeSinceLastReceived > self.webrtcOptions.heartbeat.interval - ) { - self.queue.postMessage({ - method: "send", - uid: peer.id, - channel: "heartbeat", - }) - } - - this.raiseUserEvent("userConnection", { - id: peer.id, - quality: - 1.0 - - (self.webrtcOptions.heartbeat.timeout - - Math.max( - timeSinceLastReceived, - self.webrtcOptions.heartbeat.minimum, - )) / - (self.webrtcOptions.heartbeat.timeout - - self.webrtcOptions.heartbeat.minimum), - }) - - disconnect = false - } - }) - - if (disconnect) { - peer.end(true) - } - }) - } - - // Ensure that y-js knows that the peer has left - checkAndRemovePeer(uid) { - if (!this.peers.has(uid)) { - return - } - - this.peers.delete(uid) - - this.userLeft(uid) - } - - connectToPeer(/*uid*/) { - // currently deprecated - } - - disconnect() { - this.queue.terminate() - - this.webrtc.quit() - - super.disconnect() - } - - reconnect() { - this.initialiseConnection() - - super.reconnect() - } - - raiseUserEvent(action, data) { - const event = Object.assign({ action }, data) - - for (const f of this.userEventListeners) { - f(event) - } - } - - send(uid, message) { - if (message.type === "sync step 1") { - this.raiseUserEvent("waitingForSyncStep", { user: uid }) - } else if (message.type === "sync done") { - this.raiseUserEvent("weSyncedWithPeer", { user: uid }) - } - - this.queue.postMessage({ method: "send", channel: "y-js", uid, message }) - } - - broadcast(message) { - this.queue.postMessage({ method: "broadcast", channel: "y-js", message }) - } - - isDisconnected() { - return false - } - } - - Y.extend("webrtc", WebRTC) -} - -export default extend -if (typeof Y !== "undefined") { - extend(Y) -}