Skip to content
Snippets Groups Projects
p2p-mesh.js 6.99 KiB
Newer Older
"use strict"

export default class P2PMesh {
  constructor(crdt, options) {
    if (options === undefined) {
      throw new Error("Options must not be undefined!")
    }

    this.crdt = crdt

    this.options = options

    this.options.mesh = this.options.mesh || {}
    this.options.mesh.minPeers = this.options.mesh.minPeers || 4
    this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8

    this.options.handshake = this.options.handshake || {}
    this.options.handshake.initial = this.options.handshake.initial || 100
    this.options.handshake.interval = this.options.handshake.interval || 500

    this.options.heartbeat = this.options.heartbeat || {}
    this.options.heartbeat.interval = this.options.heartbeat.interval || 500
    this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
    this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000

    this.queue = new Worker("js/queue.js")

    this.queue.onmessage = (event) => {
      if (!this.crdt) {
        return
      }

      const method = event.data.method

      if (method == "send") {
        const { uid, channel, message } = event.data

        // CRDT (e.g. y-js db transactions) can send messages after a peer has disconnected
        if (channel == "crdt" && !this.peers.has(uid)) {
          return
        }

        this.connection.send(uid, channel, message)
      } else if (method == "broadcast") {
        const { channel, message } = event.data

        return this.connection.broadcast(channel, message)
      } else if (method == "received") {
        const { uid, channel, message } = event.data

        if (channel === "tw-ml") {
          // Handshakes can only be sent and received directly
          if (message === "tw") {
            // Response message in the handshake
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "tw-ml",
              message: "ml",
            })
          } else if (message == "ml") {
            // Handshake completed
            this.checkAndInsertPeer(uid)
          }
        } else {
          this.checkAndInsertPeer(uid)

          if (channel === "crdt") {
            this.checkAndInsertPeer(uid)

            this.crdt.receiveMessage(uid, message)
          }
        }
      }
    }

    this.initialiseConnection()
  }

  initialiseConnection() {
    this.peers = new Map()

    this.connection = new this.options.connection(this.options)

    this.connection.addEventListener("roomJoined", () => {
      this.checkAndEnsureUser()
    })

    this.connection.addEventListener("roomLeft", () => {
      console.log("TODO: LEFT ROOM")
    })

    this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
      this.checkAndEnsureUser()

      // Start a handshake to ensure both sides are able to use the channel
      function handshake(peer) {
        const _peer = this.connection.getPeerHandle(uid)

        if (!_peer || _peer !== peer) {
          return
        }

        if (this.peers.has(uid)) {
          return
        }

        // Initial message in the handshake
        this.queue.postMessage({
          method: "send",
          uid,
          channel: "tw-ml",
          message: "tw",
        })

        setTimeout(handshake.bind(this, peer), this.options.handshake.interval)
      }

      setTimeout(
        handshake.bind(this, this.connection.getPeerHandle(uid)),
        this.options.handshake.initial,
      )
    })

    this.connection.addEventListener("channelError", ({ detail: uid }) =>
      console.log("TODO: CHANNEL ERROR", uid),
    )

    this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
      this.checkAndEnsureUser()
      this.checkAndRemovePeer(uid)
    })

    this.connection.addEventListener(
      "messageReceived",
      ({ detail: { uid, channel, message } }) => {
        this.checkAndEnsureUser()

        this.queue.postMessage({
          method: "received",
          uid,
          channel,
          message,
        })
      },
    )
  }

  // Ensure that the crdt is up to date on the user's id
  checkAndEnsureUser() {
    if (!this.crdt) {
      return
    }

    const uid = this.connection.getUserID()

    if (this.crdt.getUserID() == uid) {
      return
    }

    this.crdt.setUserID(uid)
  }

  // Ensure that the crdt knows that the peer has joined
  checkAndInsertPeer(uid) {
    if (!this.crdt) {
      return
    }

    if (this.peers.has(uid)) {
      return
    }

    const health = {
      lastFootprintResolved: true,
      lastFootprint: 0,
      lastFootprintTimestamp: Date.now(),
    }
    health.cb = setInterval(
      this.heartbeat.bind(
        this,
        uid,
        this.connection.getPeerHandle(uid),
        health,
      ),
      this.options.heartbeat.interval,
    )

    this.peers.set(uid, health)

    this.crdt.userJoined(uid)
  }

  heartbeat(uid, peer, health) {
    const _peer = this.connection.getPeerHandle(uid)

    if (!_peer || _peer !== peer || !this.peers.has(uid)) {
      clearInterval(health.cb)

      return
    }

    if (!health.lastFootprintResolved) {
      return this.connection.terminatePeer(uid)
    }
    health.lastFootprintResolved = false

    const self = this

    this.connection
      .getPeerFootprint(uid)
      .then((footprint) => {
        health.lastFootprintResolved = true

        const timeSinceLastFootprint =
          Date.now() - health.lastFootprintTimestamp

        if (footprint != health.lastFootprint) {
          health.lastFootprint = footprint
          health.lastFootprintTimestamp = Date.now()
        } else if (timeSinceLastFootprint > self.options.heartbeat.timeout) {
          return this.connection.terminatePeer(uid)
        } else if (timeSinceLastFootprint > self.options.heartbeat.interval) {
          self.queue.postMessage({
            method: "send",
            uid,
            channel: "heartbeat",
          })
        }

        this.crdt.reportConnectionQuality(
          uid,
          1.0 -
            (self.options.heartbeat.timeout -
              Math.max(
                timeSinceLastFootprint,
                self.options.heartbeat.minimum,
              )) /
              (self.options.heartbeat.timeout - self.options.heartbeat.minimum),
        )
      })
      .catch(() => {
        return this.connection.terminatePeer(uid)
      })
  }

  // Ensure that the crdt knows that the peer has left
  checkAndRemovePeer(uid) {
    if (!this.crdt) {
      return
    }

    if (!this.peers.has(uid)) {
      return
    }

    this.peers.delete(uid)

    this.crdt.userLeft(uid)
  }

  disconnect() {
    this.queue.terminate()

    this.connection.destructor()

    this.crdt = null
  }

  send(uid, message, compressed = true) {
    this.queue.postMessage({
      method: "send",
      uid,
      channel: "crdt",
      message,
      compressed,
    })
  broadcast(message, compressed = true) {
    this.queue.postMessage({
      method: "broadcast",
      channel: "crdt",
      message,
      compressed,
    })