/* global Y */
"use strict"

/* webpack should NOT import Worker */
// #!if false
import path from "path"
import Worker from "tiny-worker"
// #!endif

/**
 * Cancel the heartbeat timer of every entry in a peer map.
 *
 * @param {Map<*, {cb: *}>|undefined} peers - Map of uid -> health record as
 *   built by `checkAndInsertPeer`; a missing map is ignored.
 */
function stopHeartbeats(peers) {
  if (!peers) {
    return
  }
  for (const health of peers.values()) {
    clearInterval(health.cb)
  }
}

/**
 * Register the "p2p-mesh" connector on the given Y instance.
 *
 * @param {object} Y - Must expose `AbstractConnector` and `extend`.
 */
function extend(Y) {
  /**
   * Connector that relays y-js traffic over `options.connection`, routing
   * every outgoing and incoming message through a queue worker.
   */
  class P2PMesh extends Y.AbstractConnector {
    /**
     * @param {object} y - Forwarded to the base connector.
     * @param {object} options - Connector options. `options.connection` is
     *   used as a constructor in `initialiseConnection`; the `mesh`,
     *   `handshake` and `heartbeat` sub-objects are filled with defaults
     *   in place. `options.onUserEvent`, if given, is registered as a
     *   user event listener.
     * @throws {Error} When `options` is undefined.
     */
    constructor(y, options) {
      if (options === undefined) {
        throw new Error("Options must not be undefined!")
      }
      options.role = "slave"
      super(y, options)
      this.options = options

      // NOTE: `||` means a falsy value such as 0 also falls back to the default.
      this.options.mesh = this.options.mesh || {}
      this.options.mesh.minPeers = this.options.mesh.minPeers || 4
      this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8

      this.options.handshake = this.options.handshake || {}
      this.options.handshake.initial = this.options.handshake.initial || 100
      this.options.handshake.interval = this.options.handshake.interval || 500

      this.options.heartbeat = this.options.heartbeat || {}
      this.options.heartbeat.interval = this.options.heartbeat.interval || 500
      this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
      this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000

      this.initialiseQueue()

      if (options.onUserEvent) {
        this.onUserEvent(options.onUserEvent)
      }

      this.initialiseConnection()
    }

    /**
     * Create the queue worker and install its message handler.
     *
     * The worker posts back objects with a `method` of "send", "broadcast"
     * or "received"; each is dispatched to the connection or to y-js here.
     */
    initialiseQueue() {
      /* webpack should use the packaged queue.js path */
      let workerPath = "js/queue.js"
      // #!if false
      workerPath = path.join(__dirname, "./queue.js")
      // #!endif

      this.queue = new Worker(workerPath, [], { esm: true })
      // Lets reconnect() know whether disconnect() has killed this worker.
      this.queueTerminated = false

      this.queue.onmessage = (event) => {
        const { method, uid, channel, message } = event.data

        if (method === "send") {
          // y-js db transactions can send messages after a peer has disconnected
          if (channel === "y-js" && !this.peers.has(uid)) {
            return
          }
          this.connection.send(uid, channel, message)
          return
        }

        if (method === "broadcast") {
          this.connection.broadcast(channel, message)
          return
        }

        if (method !== "received") {
          return
        }

        if (channel === "tw-ml") {
          // Handshakes can only be sent and received directly
          if (message === "tw") {
            // Response message in the handshake
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "tw-ml",
              message: "ml",
            })
          } else if (message === "ml") {
            // Handshake completed
            this.checkAndInsertPeer(uid)
          }
          return
        }

        // Any other traffic from a peer proves the channel works.
        this.checkAndInsertPeer(uid)

        if (channel === "y-js") {
          if (message.type === "sync done") {
            this.raiseUserEvent("peerSyncedWithUs", { user: uid })
          }
          this.receiveMessage(uid, message)
        }
      }
    }

    /**
     * Create a fresh connection from `options.connection`, reset the peer
     * map and subscribe to the connection's events.
     */
    initialiseConnection() {
      // Timers belonging to a previous connection must not outlive it.
      stopHeartbeats(this.peers)
      this.peers = new Map()

      this.connection = new this.options.connection(this.options)

      this.connection.addEventListener("roomJoined", () => {
        this.checkAndEnsureUser()
      })

      this.connection.addEventListener("roomLeft", () => {
        console.log("TODO: LEFT ROOM")
      })

      this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
        this.checkAndEnsureUser()

        // Start a handshake to ensure both sides are able to use the channel.
        // It repeats until the peer is registered or its handle changes.
        const handshake = (peer) => {
          const currentPeer = this.connection.getPeerHandle(uid)

          if (!currentPeer || currentPeer !== peer) {
            return
          }
          if (this.peers.has(uid)) {
            return
          }

          // Initial message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "tw",
          })

          setTimeout(() => handshake(peer), this.options.handshake.interval)
        }

        const openedPeer = this.connection.getPeerHandle(uid)
        setTimeout(() => handshake(openedPeer), this.options.handshake.initial)
      })

      this.connection.addEventListener("channelError", ({ detail: uid }) =>
        console.log("TODO: CHANNEL ERROR", uid),
      )

      this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
        this.checkAndEnsureUser()
        this.checkAndRemovePeer(uid)
      })

      this.connection.addEventListener(
        "messageReceived",
        ({ detail: { uid, channel, message } }) => {
          this.checkAndEnsureUser()

          this.queue.postMessage({
            method: "received",
            uid,
            channel,
            message,
          })
        },
      )
    }

    // Ensure that y-js is up to date on the user's id
    checkAndEnsureUser() {
      const id = this.connection.getUserID()

      if (this.y.db.userId === id) {
        return
      }

      this.raiseUserEvent("userID", { user: id })
      this.setUserId(id)
    }

    /**
     * Ensure that y-js knows that the peer has joined, and start its
     * heartbeat timer. Does nothing when the peer is already registered.
     *
     * @param {*} uid - Peer identifier as reported by the connection.
     */
    checkAndInsertPeer(uid) {
      if (this.peers.has(uid)) {
        return
      }

      const health = {
        lastFootprintResolved: true,
        lastFootprint: 0,
        lastFootprintTimestamp: Date.now(),
      }

      // Capture the handle now so the timer can detect a replaced peer.
      const peer = this.connection.getPeerHandle(uid)
      health.cb = setInterval(
        () => this.heartbeat(uid, peer, health),
        this.options.heartbeat.interval,
      )

      this.peers.set(uid, health)
      this.userJoined(uid, "master")
    }

    /**
     * One heartbeat tick for a peer: stop if the peer is stale, terminate it
     * if the previous footprint query never settled or the footprint has not
     * changed within the timeout, otherwise report the connection quality.
     *
     * @param {*} uid - Peer identifier.
     * @param {*} peer - Peer handle captured when the timer was started.
     * @param {object} health - Health record created in `checkAndInsertPeer`.
     * @returns {*} The result of `terminatePeer` when the previous query is
     *   still pending, otherwise undefined.
     */
    heartbeat(uid, peer, health) {
      const currentPeer = this.connection.getPeerHandle(uid)

      // Also stops timers whose record is no longer the registered one.
      if (
        !currentPeer ||
        currentPeer !== peer ||
        this.peers.get(uid) !== health
      ) {
        clearInterval(health.cb)
        return
      }

      if (!health.lastFootprintResolved) {
        return this.connection.terminatePeer(uid)
      }

      health.lastFootprintResolved = false

      this.connection
        .getPeerFootprint(uid)
        .then((footprint) => {
          health.lastFootprintResolved = true

          const { interval, minimum, timeout } = this.options.heartbeat
          const timeSinceLastFootprint =
            Date.now() - health.lastFootprintTimestamp

          // Loose comparison kept on purpose: the footprint's type is not
          // visible here — TODO confirm before tightening to `!==`.
          if (footprint != health.lastFootprint) {
            health.lastFootprint = footprint
            health.lastFootprintTimestamp = Date.now()
          } else if (timeSinceLastFootprint > timeout) {
            return this.connection.terminatePeer(uid)
          } else if (timeSinceLastFootprint > interval) {
            // No traffic seen recently: nudge the peer.
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "heartbeat",
            })
          }

          // NOTE(review): as written this is 0 right after activity and
          // approaches 1 as the peer nears the timeout — confirm that
          // listeners expect that orientation.
          this.raiseUserEvent("userConnection", {
            id: uid,
            quality:
              1.0 -
              (timeout - Math.max(timeSinceLastFootprint, minimum)) /
                (timeout - minimum),
          })
        })
        .catch(() => {
          return this.connection.terminatePeer(uid)
        })
    }

    /**
     * Ensure that y-js knows that the peer has left, and stop its heartbeat
     * timer. Does nothing when the peer is not registered.
     *
     * @param {*} uid - Peer identifier.
     */
    checkAndRemovePeer(uid) {
      const health = this.peers.get(uid)

      if (!this.peers.has(uid)) {
        return
      }

      // Without this the timer would survive until its next tick, and a
      // quick re-insert of the same peer would leave two timers running.
      if (health) {
        clearInterval(health.cb)
      }

      this.peers.delete(uid)
      this.userLeft(uid)
    }

    connectToPeer(/*uid*/) {
      // currently deprecated
    }

    /**
     * Stop all heartbeat timers, terminate the queue worker and tear down
     * the connection before delegating to the base connector.
     */
    disconnect() {
      stopHeartbeats(this.peers)

      this.queue.terminate()
      this.queueTerminated = true

      this.connection.destructor()

      super.disconnect()
    }

    /**
     * Rebuild the connection (and the queue worker, if `disconnect`
     * terminated it) before delegating to the base connector.
     */
    reconnect() {
      if (this.queueTerminated) {
        this.initialiseQueue()
      }

      this.initialiseConnection()

      super.reconnect()
    }

    /**
     * Notify every registered user event listener.
     *
     * @param {string} action - Event name, stored as `action` on the event.
     * @param {object} data - Extra fields; a same-named field overrides
     *   `action` because it is assigned last.
     */
    raiseUserEvent(action, data) {
      const event = Object.assign({ action }, data)

      for (const listener of this.userEventListeners) {
        listener(event)
      }
    }

    /**
     * Queue a y-js message for one peer, raising a user event for the
     * "sync step 1" and "sync done" message types.
     *
     * @param {*} uid - Target peer identifier.
     * @param {object} message - y-js message; only `type` is inspected here.
     */
    send(uid, message) {
      if (message.type === "sync step 1") {
        this.raiseUserEvent("waitingForSyncStep", { user: uid })
      } else if (message.type === "sync done") {
        this.raiseUserEvent("weSyncedWithPeer", { user: uid })
      }

      this.queue.postMessage({ method: "send", uid, channel: "y-js", message })
    }

    /**
     * Queue a y-js message for broadcast on the connection.
     *
     * @param {object} message - y-js message.
     */
    broadcast(message) {
      this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
    }

    /** @returns {boolean} Always false. */
    isDisconnected() {
      return false
    }
  }

  Y.extend("p2p-mesh", P2PMesh)
}

export default extend

// Self-register when a global Y is present.
if (typeof Y !== "undefined") {
  extend(Y)
}