"use strict"

/**
 * Glue between a peer connection transport and a CRDT.
 *
 * All outgoing and incoming traffic is routed through a Web Worker created
 * from "js/queue.js": this class posts `{ method: "send" | "broadcast" |
 * "received", ... }` requests to the worker and acts on the messages the
 * worker posts back. (The worker script is not part of this file —
 * presumably it orders and/or compresses messages; verify against queue.js.)
 *
 * A peer only becomes known to the CRDT once the "tw"/"ml" handshake on the
 * "tw-ml" channel has completed, or once any other message has been received
 * from it. Every known peer is watched by a periodic heartbeat.
 */
export default class P2PMesh {
  /**
   * @param {object} crdt - Object exposing getUserID, setUserID, userJoined,
   *   userLeft, receiveMessage and reportConnectionQuality.
   * @param {object} options - Configuration; missing sections are filled in
   *   with defaults IN PLACE. `options.connection` must be a constructor; it
   *   is instantiated with the same options object.
   * @throws {Error} If `options` is undefined or null.
   */
  constructor(crdt, options) {
    // `== null` covers both undefined and null, so a null options object
    // fails with the explicit message instead of an opaque TypeError below.
    if (options == null) {
      throw new Error("Options must not be undefined!")
    }

    this.crdt = crdt
    this.options = options

    // NOTE: `||` means a falsy value (e.g. 0) is replaced by the default.
    this.options.mesh = this.options.mesh || {}
    this.options.mesh.minPeers = this.options.mesh.minPeers || 4
    this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8

    this.options.handshake = this.options.handshake || {}
    this.options.handshake.initial = this.options.handshake.initial || 100
    this.options.handshake.interval = this.options.handshake.interval || 500

    this.options.heartbeat = this.options.heartbeat || {}
    this.options.heartbeat.interval = this.options.heartbeat.interval || 500
    this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
    this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000

    this.queue = new Worker("js/queue.js")
    this.queue.onmessage = (event) => {
      // After disconnect() the crdt is null; drop whatever the worker still
      // delivers.
      if (!this.crdt) {
        return
      }

      const { method, uid, channel, message } = event.data

      if (method === "send") {
        // CRDT (e.g. y-js db transactions) can send messages after a peer
        // has disconnected
        if (channel === "crdt" && !this.peers.has(uid)) {
          return
        }
        this.connection.send(uid, channel, message)
      } else if (method === "broadcast") {
        this.connection.broadcast(channel, message)
      } else if (method === "received") {
        if (channel !== "tw-ml") {
          // Any non-handshake message proves the channel works.
          this.checkAndInsertPeer(uid)
          if (channel === "crdt") {
            this.crdt.receiveMessage(uid, message)
          }
        } else if (message === "tw") {
          // Handshakes can only be sent and received directly.
          // Response message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "ml",
            compressed: false,
          })
        } else if (message === "ml") {
          // Handshake completed. Strict comparison: the payload comes from
          // a remote peer and must not match through type coercion.
          this.checkAndInsertPeer(uid)
        }
      }
    }

    this.initialiseConnection()
  }

  /**
   * Create the peer map and the connection (from `options.connection`) and
   * subscribe to the connection's events.
   */
  initialiseConnection() {
    this.peers = new Map()
    this.connection = new this.options.connection(this.options)

    this.connection.addEventListener("roomJoined", () => {
      this.checkAndEnsureUser()
    })

    this.connection.addEventListener("roomLeft", () => {
      console.log("TODO: LEFT ROOM")
    })

    this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
      this.checkAndEnsureUser()

      // Start a handshake to ensure both sides are able to use the channel.
      // It re-sends "tw" every handshake.interval ms until the peer is
      // registered, the peer handle changes, or the mesh is disconnected.
      const handshake = (peer) => {
        if (!this.crdt) {
          return
        }

        const current = this.connection.getPeerHandle(uid)
        if (!current || current !== peer) {
          return
        }

        if (this.peers.has(uid)) {
          return
        }

        // Initial message in the handshake
        this.queue.postMessage({
          method: "send",
          uid,
          channel: "tw-ml",
          message: "tw",
          compressed: false,
        })

        setTimeout(() => handshake(peer), this.options.handshake.interval)
      }

      const peer = this.connection.getPeerHandle(uid)
      setTimeout(() => handshake(peer), this.options.handshake.initial)
    })

    this.connection.addEventListener("channelError", ({ detail: uid }) =>
      console.log("TODO: CHANNEL ERROR", uid),
    )

    this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
      this.checkAndEnsureUser()
      this.checkAndRemovePeer(uid)
    })

    this.connection.addEventListener(
      "messageReceived",
      ({ detail: { uid, channel, message } }) => {
        this.checkAndEnsureUser()

        this.queue.postMessage({
          method: "received",
          uid,
          channel,
          message,
        })
      },
    )
  }

  /** Ensure that the crdt is up to date on the user's id. */
  checkAndEnsureUser() {
    if (!this.crdt) {
      return
    }

    const uid = this.connection.getUserID()

    // Loose comparison kept on purpose: the id types returned by the crdt
    // and by the connection are not visible here — TODO confirm they match
    // and switch to ===.
    if (this.crdt.getUserID() == uid) {
      return
    }

    this.crdt.setUserID(uid)
  }

  /**
   * Ensure that the crdt knows that the peer has joined, and start the
   * peer's heartbeat interval. Does nothing if the peer is already known.
   *
   * @param {*} uid - Peer identifier.
   */
  checkAndInsertPeer(uid) {
    if (!this.crdt) {
      return
    }

    if (this.peers.has(uid)) {
      return
    }

    const health = {
      lastFootprintResolved: true,
      lastFootprint: 0,
      lastFootprintTimestamp: Date.now(),
    }

    // The handle is captured now so the heartbeat can detect that the uid
    // was later re-bound to a different peer handle.
    const peer = this.connection.getPeerHandle(uid)
    health.cb = setInterval(
      () => this.heartbeat(uid, peer, health),
      this.options.heartbeat.interval,
    )

    this.peers.set(uid, health)
    this.crdt.userJoined(uid)
  }

  /**
   * One heartbeat tick for a peer.
   *
   * Stops its own interval when the mesh is disconnected, the peer handle
   * changed, or the peer was removed. Terminates the peer when the previous
   * footprint request is still pending, when the footprint request fails,
   * or when the footprint has not changed for longer than
   * heartbeat.timeout. Otherwise it reports a value to
   * crdt.reportConnectionQuality and, if the footprint has been unchanged
   * for more than heartbeat.interval, sends a message on the "heartbeat"
   * channel.
   *
   * @param {*} uid - Peer identifier.
   * @param {*} peer - Peer handle captured when the heartbeat was started.
   * @param {object} health - The peer's health record (see
   *   checkAndInsertPeer).
   */
  heartbeat(uid, peer, health) {
    if (!this.crdt) {
      clearInterval(health.cb)
      return
    }

    const current = this.connection.getPeerHandle(uid)
    if (!current || current !== peer || !this.peers.has(uid)) {
      clearInterval(health.cb)
      return
    }

    if (!health.lastFootprintResolved) {
      return this.connection.terminatePeer(uid)
    }

    health.lastFootprintResolved = false

    this.connection
      .getPeerFootprint(uid)
      .then((footprint) => {
        // disconnect() may have run while the request was in flight.
        if (!this.crdt) {
          return
        }

        health.lastFootprintResolved = true

        const { interval, minimum, timeout } = this.options.heartbeat
        const timeSinceLastFootprint =
          Date.now() - health.lastFootprintTimestamp

        // Loose comparison kept on purpose: the footprint's type is not
        // visible here — TODO confirm and switch to !==.
        if (footprint != health.lastFootprint) {
          health.lastFootprint = footprint
          health.lastFootprintTimestamp = Date.now()
        } else if (timeSinceLastFootprint > timeout) {
          return this.connection.terminatePeer(uid)
        } else if (timeSinceLastFootprint > interval) {
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "heartbeat",
          })
        }

        // Yields 0 while the footprint changed within `minimum` ms and
        // grows linearly to 1 as the idle time approaches `timeout`.
        // NOTE(review): divides by zero when timeout === minimum.
        this.crdt.reportConnectionQuality(
          uid,
          1.0 -
            (timeout - Math.max(timeSinceLastFootprint, minimum)) /
              (timeout - minimum),
        )
      })
      .catch(() => {
        if (!this.crdt) {
          return
        }
        return this.connection.terminatePeer(uid)
      })
  }

  /**
   * Ensure that the crdt knows that the peer has left, and stop the peer's
   * heartbeat interval. Does nothing if the peer is not known.
   *
   * @param {*} uid - Peer identifier.
   */
  checkAndRemovePeer(uid) {
    if (!this.crdt) {
      return
    }

    const health = this.peers.get(uid)
    if (!health) {
      return
    }

    // Stop the timer right away instead of waiting for the next tick, so a
    // quick re-insert of the same uid cannot end up with two intervals.
    clearInterval(health.cb)

    this.peers.delete(uid)
    this.crdt.userLeft(uid)
  }

  /**
   * Shut the mesh down: stop every heartbeat, terminate the queue worker,
   * destroy the connection and drop the crdt reference.
   */
  disconnect() {
    for (const health of this.peers.values()) {
      clearInterval(health.cb)
    }
    this.peers.clear()

    this.queue.terminate()
    this.connection.destructor()
    this.crdt = null
  }

  /**
   * Queue a message for one peer on the "crdt" channel.
   *
   * @param {*} uid - Target peer identifier.
   * @param {*} message - Payload handed to the queue worker.
   * @param {boolean} [compressed=true] - Forwarded to the queue worker.
   */
  send(uid, message, compressed = true) {
    this.queue.postMessage({
      method: "send",
      uid,
      channel: "crdt",
      message,
      compressed,
    })
  }

  /**
   * Queue a message for all peers on the "crdt" channel.
   *
   * @param {*} message - Payload handed to the queue worker.
   * @param {boolean} [compressed=true] - Forwarded to the queue worker.
   */
  broadcast(message, compressed = true) {
    this.queue.postMessage({
      method: "broadcast",
      channel: "crdt",
      message,
      compressed,
    })
  }
}