/* global Y */
"use strict"

import LioWebRTC from "liowebrtc"

/**
 * Registers the "webrtc" connector on the given Y namespace.
 *
 * The connector wraps a LioWebRTC data-only connection and routes all
 * outgoing and incoming traffic through a Web Worker ("js/queue.js").
 *
 * @param {object} Y - Namespace providing `AbstractConnector` and `extend`.
 */
function extend(Y) {
  class WebRTC extends Y.AbstractConnector {
    /**
     * @param {object} y - Instance handed through to the base connector.
     * @param {object} options - Connector options. This object is mutated:
     *   `role` is forced to "slave" and missing handshake/heartbeat settings
     *   are filled in with defaults.
     * @param {string} options.url - Passed to LioWebRTC as `url`.
     * @param {string} options.room - Room joined once LioWebRTC is ready.
     * @param {object} [options.handshake] - `initial` (default 100) and
     *   `interval` (default 500), both used as timer delays in milliseconds.
     * @param {object} [options.heartbeat] - `interval` (default 500),
     *   `minimum` (default 1000) and `timeout` (default 10000), milliseconds.
     * @param {Function} [options.onUserEvent] - Registered through
     *   `onUserEvent` when present.
     * @throws {Error} When `options` is undefined.
     */
    constructor(y, options) {
      if (options === undefined) {
        throw new Error("Options must not be undefined!")
      }
      options.role = "slave"
      super(y, options)

      this.webrtcOptions = options

      // Falsy values (including 0) fall back to the defaults below.
      const handshake = (this.webrtcOptions.handshake =
        this.webrtcOptions.handshake || {})
      handshake.initial = handshake.initial || 100
      handshake.interval = handshake.interval || 500

      const heartbeat = (this.webrtcOptions.heartbeat =
        this.webrtcOptions.heartbeat || {})
      heartbeat.interval = heartbeat.interval || 500
      heartbeat.minimum = heartbeat.minimum || 1000
      heartbeat.timeout = heartbeat.timeout || 10000

      this.initialiseQueue()

      if (options.onUserEvent) {
        this.onUserEvent(options.onUserEvent)
      }

      this.initialiseConnection()

      window.addEventListener("unload", () => {
        this.y.destroy()
      })
    }

    /**
     * Creates the queue worker and wires its messages to
     * `handleQueueMessage`. Called from the constructor and again from
     * `reconnect()` when `disconnect()` has terminated the previous worker.
     */
    initialiseQueue() {
      this.queue = new Worker("js/queue.js")
      this.queueTerminated = false
      this.queue.onmessage = (event) => {
        this.handleQueueMessage(event.data)
      }
    }

    /**
     * Dispatches one message coming back from the queue worker.
     *
     * @param {object} data - Worker payload; `data.method` is one of
     *   "send", "broadcast" or "received". Other methods are ignored.
     */
    handleQueueMessage(data) {
      const { method } = data

      if (method === "send") {
        const { uid, channel, message } = data
        const target = this.webrtc.getPeerById(uid)
        // y-js db transactions can send messages after a peer has disconnected
        if ((channel === "y-js" && !this.peers.has(uid)) || !target) {
          return
        }
        this.webrtc.whisper(target, channel, message)
        return
      }

      if (method === "broadcast") {
        const { channel, message } = data
        this.webrtc.shout(channel, message)
        return
      }

      if (method !== "received") {
        return
      }

      const { type, message, peer } = data

      if (type === "tw-ml") {
        // Handshakes can only be sent and received directly
        if (message === "tw") {
          // Response message in the handshake
          this.queue.postMessage({
            method: "send",
            uid: peer.id,
            channel: "tw-ml",
            message: "ml",
          })
        } else if (message === "ml") {
          // Handshake completed
          this.checkAndInsertPeer(peer.id)
        }
        return
      }

      // Any other traffic from a peer also counts as proof that it is there.
      this.checkAndInsertPeer(peer.id)

      if (type === "y-js") {
        if (message.type === "sync done") {
          this.raiseUserEvent("peerSyncedWithUs", { user: peer.id })
        }
        this.receiveMessage(peer.id, message)
      }
    }

    /**
     * Creates a fresh LioWebRTC instance, resets the peer table and
     * registers all event handlers.
     */
    initialiseConnection() {
      this.webrtc = new LioWebRTC({
        url: this.webrtcOptions.url,
        dataOnly: true,
        constraints: {
          minPeers: 4,
          maxPeers: 8,
        },
      })

      // uid -> health record created by checkAndInsertPeer
      this.peers = new Map()

      this.webrtc.on("ready", () => {
        this.webrtc.joinRoom(this.webrtcOptions.room)
      })

      this.webrtc.on("joinedRoom", () => {
        this.checkAndEnsureUser()
      })

      this.webrtc.on("leftRoom", () => {
        console.log("TODO: LEFT ROOM")
      })

      this.webrtc.on("channelError", (a, b, c, d) =>
        console.log("TODO: CHANNEL ERROR", a, b, c, d),
      )

      this.webrtc.on("channelOpen", (dataChannel, peer) => {
        this.checkAndEnsureUser()

        // Start a handshake to ensure both sides are able to use the channel.
        // The loop stops once the peer is registered, has gone away, or has
        // been replaced by a different peer object with the same id.
        const handshake = () => {
          const current = this.webrtc.getPeerById(peer.id)
          if (!current || current !== peer) {
            return
          }
          if (this.peers.has(peer.id)) {
            return
          }

          // Initial message in the handshake
          this.queue.postMessage({
            method: "send",
            uid: peer.id,
            channel: "tw-ml",
            message: "tw",
          })

          setTimeout(handshake, this.webrtcOptions.handshake.interval)
        }

        setTimeout(handshake, this.webrtcOptions.handshake.initial)
      })

      this.webrtc.on("receivedPeerData", (type, message, peer) => {
        this.checkAndEnsureUser()

        // Message could have been forwarded but yjs only needs to know about
        // directly connected peers
        const id = peer.forwardedBy ? peer.forwardedBy.id : peer.id
        this.queue.postMessage({
          method: "received",
          type,
          message,
          peer: { id },
        })
      })

      this.webrtc.on("channelClose", (dataChannel, peer) => {
        this.checkAndEnsureUser()
        this.checkAndRemovePeer(peer.id)
      })
    }

    /** Ensure that y-js is up to date on the user's id. */
    checkAndEnsureUser() {
      const id = this.webrtc.getMyId()
      if (this.y.db.userId === id) {
        return
      }

      this.raiseUserEvent("userID", { user: id })
      this.setUserId(id)
    }

    /**
     * Ensure that y-js knows that the peer has joined. Does nothing when the
     * peer is already registered; otherwise starts its heartbeat timer and
     * reports it through `userJoined`.
     *
     * @param {string} uid - Peer id.
     */
    checkAndInsertPeer(uid) {
      if (this.peers.has(uid)) {
        return
      }

      const health = {
        lastStatsResolved: true,
        lastReceivedBytes: 0,
        lastReceivedTimestamp: Date.now(),
      }

      // The peer object is captured once; heartbeat() copes with it being
      // missing or having been replaced in the meantime.
      const peer = this.webrtc.getPeerById(uid)
      health.cb = setInterval(
        () => this.heartbeat(peer, health),
        this.webrtcOptions.heartbeat.interval,
      )

      this.peers.set(uid, health)
      this.userJoined(uid, "master")
    }

    /**
     * One heartbeat tick for a peer: inspects the connection statistics,
     * nudges an idle peer with a "heartbeat" message, reports a
     * "userConnection" event and ends the peer when no usable candidate pair
     * is found or the previous statistics request never resolved.
     *
     * @param {object} peer - Peer object captured when the peer was inserted.
     * @param {object} health - Health record created by checkAndInsertPeer.
     */
    heartbeat(peer, health) {
      // The peer may already have been gone when it was registered (the
      // "received" path is asynchronous); without this guard every tick would
      // throw on `peer.id` and the interval would never be cleared.
      if (!peer) {
        clearInterval(health.cb)
        return
      }

      const current = this.webrtc.getPeerById(peer.id)
      if (!current || current !== peer || !this.peers.has(peer.id)) {
        clearInterval(health.cb)
        return
      }

      // The previous getStats() call has not resolved within one interval.
      if (!health.lastStatsResolved) {
        return peer.end(true)
      }

      health.lastStatsResolved = false

      peer
        .getStats(null)
        .then((stats) => {
          health.lastStatsResolved = true

          const settings = this.webrtcOptions.heartbeat
          let disconnect = true

          stats.forEach((report) => {
            const usable =
              report.type === "candidate-pair" &&
              report.bytesSent > 0 &&
              report.bytesReceived > 0 &&
              report.writable
            if (!usable) {
              return
            }

            const timeSinceLastReceived =
              Date.now() - health.lastReceivedTimestamp

            if (report.bytesReceived !== health.lastReceivedBytes) {
              health.lastReceivedBytes = report.bytesReceived
              health.lastReceivedTimestamp = Date.now()
            } else if (timeSinceLastReceived > settings.timeout) {
              // Timed out: this pair does not keep the peer alive.
              return
            } else if (timeSinceLastReceived > settings.interval) {
              // Nothing received recently: send a message to the peer.
              this.queue.postMessage({
                method: "send",
                uid: peer.id,
                channel: "heartbeat",
              })
            }

            // NOTE(review): as written this yields 0 right after data was
            // received and approaches 1 close to the timeout — confirm that
            // consumers of "userConnection" expect that orientation.
            const quality =
              1.0 -
              (settings.timeout -
                Math.max(timeSinceLastReceived, settings.minimum)) /
                (settings.timeout - settings.minimum)

            this.raiseUserEvent("userConnection", { id: peer.id, quality })

            disconnect = false
          })

          if (disconnect) {
            peer.end(true)
          }
        })
        .catch((error) => {
          // If getStats() itself rejected, lastStatsResolved is still false,
          // so the next tick ends the peer.
          console.error("WebRTC heartbeat failed", error)
        })
    }

    /**
     * Ensure that y-js knows that the peer has left. Does nothing for an
     * unknown peer; otherwise stops its heartbeat timer and reports it
     * through `userLeft`.
     *
     * @param {string} uid - Peer id.
     */
    checkAndRemovePeer(uid) {
      const health = this.peers.get(uid)
      if (health === undefined) {
        return
      }

      // Stop the timer right away instead of waiting for its next tick.
      clearInterval(health.cb)

      this.peers.delete(uid)
      this.userLeft(uid)
    }

    connectToPeer(/*uid*/) {
      // currently deprecated
    }

    /** Terminates the queue worker, quits LioWebRTC and disconnects y-js. */
    disconnect() {
      this.queue.terminate()
      // Keep the reference so late postMessage calls stay harmless no-ops,
      // but remember that reconnect() has to create a new worker.
      this.queueTerminated = true
      this.webrtc.quit()
      super.disconnect()
    }

    /** Re-creates the worker (if terminated) and the LioWebRTC connection. */
    reconnect() {
      if (this.queueTerminated) {
        this.initialiseQueue()
      }
      this.initialiseConnection()
      super.reconnect()
    }

    /**
     * Calls every registered user event listener with `{ action, ...data }`.
     *
     * @param {string} action - Event name.
     * @param {object} data - Extra fields merged into the event object.
     */
    raiseUserEvent(action, data) {
      const event = Object.assign({ action }, data)
      for (const f of this.userEventListeners) {
        f(event)
      }
    }

    /**
     * Queues a y-js message for one peer and raises the matching sync event.
     *
     * @param {string} uid - Target peer id.
     * @param {object} message - y-js message; `message.type` is inspected.
     */
    send(uid, message) {
      if (message.type === "sync step 1") {
        this.raiseUserEvent("waitingForSyncStep", { user: uid })
      } else if (message.type === "sync done") {
        this.raiseUserEvent("weSyncedWithPeer", { user: uid })
      }
      this.queue.postMessage({ method: "send", channel: "y-js", uid, message })
    }

    /**
     * Queues a y-js message for all peers.
     *
     * @param {object} message - y-js message.
     */
    broadcast(message) {
      this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
    }

    /** @returns {boolean} Always false. */
    isDisconnected() {
      return false
    }
  }

  Y.extend("webrtc", WebRTC)
}

export default extend

// Register automatically when a global Y is available.
if (typeof Y !== "undefined") {
  extend(Y)
}