Skip to content
Snippets Groups Projects
p2p-mesh.js 6.81 KiB
Newer Older
  • Learn to ignore specific revisions
  • "use strict"
    
    /**
     * Connects a CRDT instance to a peer-to-peer connection.
     *
     * Outgoing and incoming messages are routed through a Worker ("js/queue.js",
     * not visible here — presumably it echoes each posted message back through
     * `onmessage`; confirm against that file). Peers are only announced to the
     * CRDT after a "tw"/"ml" handshake on the "tw-ml" channel, or after any other
     * message is received from them. Every known peer gets a periodic heartbeat
     * that watches the peer's footprint and terminates peers that stall.
     *
     * A `null` crdt marks the mesh as disconnected; every entry point bails out
     * in that state.
     */
    export default class P2PMesh {
      /**
       * @param {object} crdt - Object providing getUserID, setUserID, userJoined,
       *   userLeft, receiveMessage and reportConnectionQuality.
       * @param {object} options - Configuration; mutated in place to fill in
       *   defaults and then passed to the connection constructor.
       * @param {Function} options.connection - Constructor of the connection.
       * @param {object} [options.mesh] - minPeers (default 4), maxPeers (default 8).
       * @param {object} [options.handshake] - initial (default 100) and interval
       *   (default 500) delays, passed to setTimeout.
       * @param {object} [options.heartbeat] - interval (default 500), minimum
       *   (default 1000) and timeout (default 10000).
       * @throws {Error} If options is undefined.
       */
      constructor(crdt, options) {
        if (options === undefined) {
          throw new Error("Options must not be undefined!")
        }

        this.crdt = crdt
        this.options = options

        // NOTE: `||` is kept on purpose, so falsy values such as 0 fall back to the default
        const mesh = (options.mesh = options.mesh || {})
        mesh.minPeers = mesh.minPeers || 4
        mesh.maxPeers = mesh.maxPeers || 8

        const handshake = (options.handshake = options.handshake || {})
        handshake.initial = handshake.initial || 100
        handshake.interval = handshake.interval || 500

        const heartbeat = (options.heartbeat = options.heartbeat || {})
        heartbeat.interval = heartbeat.interval || 500
        heartbeat.minimum = heartbeat.minimum || 1000
        heartbeat.timeout = heartbeat.timeout || 10000

        this.queue = new Worker("js/queue.js")

        this.queue.onmessage = (event) => {
          // Disconnected: drop whatever the queue still delivers
          if (!this.crdt) {
            return
          }

          const { method, uid, channel, message } = event.data

          if (method === "send") {
            // CRDT (e.g. y-js db transactions) can send messages after a peer has disconnected
            if (channel === "crdt" && !this.peers.has(uid)) {
              return
            }

            this.connection.send(uid, channel, message)
          } else if (method === "broadcast") {
            this.connection.broadcast(channel, message)
          } else if (method === "received") {
            if (channel === "tw-ml") {
              // Handshakes can only be sent and received directly
              if (message === "tw") {
                // Response message in the handshake
                this.queue.postMessage({
                  method: "send",
                  uid,
                  channel: "tw-ml",
                  message: "ml",
                })
              } else if (message === "ml") {
                // Handshake completed
                this.checkAndInsertPeer(uid)
              }

              return
            }

            // Any non-handshake message proves the channel works, so the peer
            // is registered (once) before the message is handed to the CRDT
            this.checkAndInsertPeer(uid)

            if (channel === "crdt") {
              this.crdt.receiveMessage(uid, message)
            }
          }
        }

        this.initialiseConnection()
      }

      /**
       * Creates the peer map and the connection, and subscribes to the
       * connection's room, channel and message events.
       */
      initialiseConnection() {
        // uid -> health record ({ lastFootprintResolved, lastFootprint, lastFootprintTimestamp, cb })
        this.peers = new Map()

        this.connection = new this.options.connection(this.options)

        this.connection.addEventListener("roomJoined", () => {
          this.checkAndEnsureUser()
        })

        this.connection.addEventListener("roomLeft", () => {
          console.log("TODO: LEFT ROOM")
        })

        this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
          this.checkAndEnsureUser()

          // Handle of the peer this handshake belongs to; a different handle
          // under the same uid later on means this handshake is stale
          const peer = this.connection.getPeerHandle(uid)

          // Start a handshake to ensure both sides are able to use the channel.
          // Re-sends "tw" every handshake.interval until the peer is registered.
          const handshake = () => {
            // Stop retrying once the mesh has been disconnected
            if (!this.crdt) {
              return
            }

            const current = this.connection.getPeerHandle(uid)

            if (!current || current !== peer) {
              return
            }

            if (this.peers.has(uid)) {
              return
            }

            // Initial message in the handshake
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "tw-ml",
              message: "tw",
            })

            setTimeout(handshake, this.options.handshake.interval)
          }

          setTimeout(handshake, this.options.handshake.initial)
        })

        this.connection.addEventListener("channelError", ({ detail: uid }) =>
          console.log("TODO: CHANNEL ERROR", uid),
        )

        this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
          this.checkAndEnsureUser()
          this.checkAndRemovePeer(uid)
        })

        this.connection.addEventListener(
          "messageReceived",
          ({ detail: { uid, channel, message } }) => {
            this.checkAndEnsureUser()

            this.queue.postMessage({
              method: "received",
              uid,
              channel,
              message,
            })
          },
        )
      }

      /**
       * Ensure that the crdt is up to date on the user's id.
       */
      checkAndEnsureUser() {
        if (!this.crdt) {
          return
        }

        const uid = this.connection.getUserID()

        // Loose comparison kept as in the original; the id types are not visible here
        if (this.crdt.getUserID() == uid) {
          return
        }

        this.crdt.setUserID(uid)
      }

      /**
       * Ensure that the crdt knows that the peer has joined. Starts the
       * peer's heartbeat timer; does nothing for an already known peer.
       *
       * @param {*} uid - Identifier of the peer.
       */
      checkAndInsertPeer(uid) {
        if (!this.crdt) {
          return
        }

        if (this.peers.has(uid)) {
          return
        }

        const health = {
          lastFootprintResolved: true,
          lastFootprint: 0,
          lastFootprintTimestamp: Date.now(),
        }
        const peer = this.connection.getPeerHandle(uid)

        // The interval id is stored on the record so it can be cancelled later
        health.cb = setInterval(
          () => this.heartbeat(uid, peer, health),
          this.options.heartbeat.interval,
        )

        this.peers.set(uid, health)

        this.crdt.userJoined(uid)
      }

      /**
       * One heartbeat tick for a peer.
       *
       * Cancels itself when the mesh is disconnected, the health record is no
       * longer the registered one, or the peer handle changed. Terminates the
       * peer when the previous footprint request is still pending, when the
       * request fails, or when the footprint has not changed for longer than
       * heartbeat.timeout. Otherwise reports a connection quality to the crdt.
       *
       * @param {*} uid - Identifier of the peer.
       * @param {*} peer - Peer handle captured when the peer was inserted.
       * @param {object} health - Health record created by checkAndInsertPeer.
       * @returns {*} Result of terminatePeer when the previous request is still
       *   pending, otherwise undefined.
       */
      heartbeat(uid, peer, health) {
        // Checked before touching the connection, which may already be destructed
        if (!this.crdt || this.peers.get(uid) !== health) {
          clearInterval(health.cb)

          return
        }

        const current = this.connection.getPeerHandle(uid)

        if (!current || current !== peer) {
          clearInterval(health.cb)

          return
        }

        // The previous footprint request has not settled within one interval
        if (!health.lastFootprintResolved) {
          return this.connection.terminatePeer(uid)
        }
        health.lastFootprintResolved = false

        this.connection
          .getPeerFootprint(uid)
          .then((footprint) => {
            health.lastFootprintResolved = true

            // The mesh may have been disconnected, or the peer removed, while
            // the request was pending; there is nobody left to report to
            if (!this.crdt || this.peers.get(uid) !== health) {
              return
            }

            const { interval, minimum, timeout } = this.options.heartbeat
            const timeSinceLastFootprint =
              Date.now() - health.lastFootprintTimestamp

            // Loose comparison kept as in the original; the footprint type is not visible here
            if (footprint != health.lastFootprint) {
              health.lastFootprint = footprint
              health.lastFootprintTimestamp = Date.now()
            } else if (timeSinceLastFootprint > timeout) {
              return this.connection.terminatePeer(uid)
            } else if (timeSinceLastFootprint > interval) {
              // Footprint is stale: send a message on the "heartbeat" channel
              this.queue.postMessage({
                method: "send",
                uid,
                channel: "heartbeat",
              })
            }

            // Evaluates to 0 while the footprint is younger than `minimum` and
            // grows linearly to 1 as its age approaches `timeout`.
            // NOTE(review): whether the crdt reads a higher value as better or
            // worse is not visible here, and timeout === minimum divides by zero.
            this.crdt.reportConnectionQuality(
              uid,
              1.0 -
                (timeout - Math.max(timeSinceLastFootprint, minimum)) /
                  (timeout - minimum),
            )
          })
          .catch(() => {
            // Nothing to terminate on a connection that was already destructed
            if (!this.crdt) {
              return
            }

            return this.connection.terminatePeer(uid)
          })
      }

      /**
       * Ensure that the crdt knows that the peer has left. Cancels the peer's
       * heartbeat timer; does nothing for an unknown peer.
       *
       * @param {*} uid - Identifier of the peer.
       */
      checkAndRemovePeer(uid) {
        if (!this.crdt) {
          return
        }

        if (!this.peers.has(uid)) {
          return
        }

        // Cancel right away instead of waiting for the next tick, so a quick
        // re-insert of the same uid cannot leave two timers running
        clearInterval(this.peers.get(uid).cb)

        this.peers.delete(uid)

        this.crdt.userLeft(uid)
      }

      /**
       * Shuts the mesh down: terminates the queue worker, destructs the
       * connection, cancels all remaining heartbeat timers and drops the crdt.
       */
      disconnect() {
        this.queue.terminate()

        // Destruct first, so peers closed synchronously by the connection are
        // still reported to the crdt through the channelClosed listener
        this.connection.destructor()

        for (const health of this.peers.values()) {
          clearInterval(health.cb)
        }
        this.peers.clear()

        this.crdt = null
      }

      /**
       * Queues a message for a single peer on the "crdt" channel.
       *
       * @param {*} uid - Identifier of the receiving peer.
       * @param {*} message - Payload to send.
       */
      send(uid, message) {
        this.queue.postMessage({ method: "send", uid, channel: "crdt", message })
      }

      /**
       * Queues a message for broadcast on the "crdt" channel.
       *
       * @param {*} message - Payload to broadcast.
       */
      broadcast(message) {
        this.queue.postMessage({ method: "broadcast", channel: "crdt", message })
      }
    }