// y-p2p-mesh.js
/* global Y */
"use strict"

/**
 * Defines the `P2PMesh` connector class and registers it on the given y-js
 * object under the name "p2p-mesh".
 *
 * @param {object} Y - Object exposing `AbstractConnector` (used as the base
 *   class) and `extend(name, connectorClass)` (used for registration).
 */
function extend(Y) {
  /**
   * Connector that routes all traffic through a `Worker` queue
   * ("js/queue.js") and a connection object built from `options.connection`.
   *
   * Messages travel on named channels:
   *  - "tw-ml": two-message handshake ("tw" request, "ml" reply)
   *  - "y-js": payloads passed to / received from the base connector
   *  - "heartbeat": keep-alive messages sent by `heartbeat()`
   */
  class P2PMesh extends Y.AbstractConnector {
    /**
     * @param {object} y - Instance handed to the base connector; its
     *   `db.userId` is read by `checkAndEnsureUser()`.
     * @param {object} options - Connector options. Mutated in place: `role`
     *   is forced to "slave" and missing `mesh`, `handshake` and `heartbeat`
     *   settings are filled with defaults. `options.connection` must be a
     *   constructor; it is called with the options object.
     * @throws {Error} When `options` is undefined.
     */
    constructor(y, options) {
      if (options === undefined) {
        throw new Error("Options must not be undefined!")
      }

      options.role = "slave"
      super(y, options)
      this.options = options

      // NOTE(review): minPeers/maxPeers are defaulted here but not read
      // anywhere else in this class; presumably the connection uses them.
      this.options.mesh = this.options.mesh || {}
      this.options.mesh.minPeers = this.options.mesh.minPeers || 4
      this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8

      // Delays (ms) before the first handshake message and between retries
      this.options.handshake = this.options.handshake || {}
      this.options.handshake.initial = this.options.handshake.initial || 100
      this.options.handshake.interval = this.options.handshake.interval || 500

      // Heartbeat timer period and thresholds (ms), see heartbeat()
      this.options.heartbeat = this.options.heartbeat || {}
      this.options.heartbeat.interval = this.options.heartbeat.interval || 500
      this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
      this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000

      this.initialiseQueue()

      if (options.onUserEvent) {
        this.onUserEvent(options.onUserEvent)
      }

      this.initialiseConnection()
    }

    // Create the worker-backed message queue and route its output to
    // handleQueueMessage(). Also used by reconnect() to replace a queue that
    // disconnect() has terminated.
    initialiseQueue() {
      this.queue = new Worker("js/queue.js")
      this.queueTerminated = false
      this.queue.onmessage = (event) => {
        this.handleQueueMessage(event.data)
      }
    }

    // Dispatch one message emitted by the queue worker, based on its `method`
    handleQueueMessage(data) {
      const { method, uid, channel, message } = data

      if (method === "send") {
        // y-js db transactions can send messages after a peer has disconnected
        if (channel === "y-js" && !this.peers.has(uid)) {
          return
        }

        this.connection.send(uid, channel, message)
      } else if (method === "broadcast") {
        this.connection.broadcast(channel, message)
      } else if (method === "received") {
        this.handleReceived(uid, channel, message)
      }
    }

    // Process a message that arrived from peer `uid` on `channel`
    handleReceived(uid, channel, message) {
      if (channel === "tw-ml") {
        // Handshakes can only be sent and received directly
        if (message === "tw") {
          // Response message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "ml",
          })
        } else if (message === "ml") {
          // Handshake completed
          this.checkAndInsertPeer(uid)
        }

        return
      }

      // Any non-handshake message proves the channel works in our direction
      this.checkAndInsertPeer(uid)

      if (channel === "y-js") {
        if (message.type === "sync done") {
          this.raiseUserEvent("peerSyncedWithUs", { user: uid })
        }

        this.receiveMessage(uid, message)
      }
    }

    // Build a fresh connection from `options.connection`, reset the peer
    // table and subscribe to the connection's events.
    initialiseConnection() {
      // On re-initialisation, stop timers that belong to the old peer table
      if (this.peers) {
        this.stopHeartbeats()
      }

      this.peers = new Map()

      this.connection = new this.options.connection(this.options)

      this.connection.addEventListener("roomJoined", () => {
        this.checkAndEnsureUser()
      })

      this.connection.addEventListener("roomLeft", () => {
        console.log("TODO: LEFT ROOM")
      })

      this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
        this.checkAndEnsureUser()

        // Handle of the peer at the time the channel opened; the handshake
        // stops as soon as the connection reports a different handle.
        const peer = this.connection.getPeerHandle(uid)

        // Start a handshake to ensure both sides are able to use the channel.
        // It keeps re-sending "tw" until the peer is registered or replaced.
        const handshake = () => {
          const currentPeer = this.connection.getPeerHandle(uid)

          if (!currentPeer || currentPeer !== peer) {
            return
          }

          if (this.peers.has(uid)) {
            return
          }

          // Initial message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "tw",
          })

          setTimeout(handshake, this.options.handshake.interval)
        }

        setTimeout(handshake, this.options.handshake.initial)
      })

      this.connection.addEventListener("channelError", ({ detail: uid }) =>
        console.log("TODO: CHANNEL ERROR", uid),
      )

      this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
        this.checkAndEnsureUser()
        this.checkAndRemovePeer(uid)
      })

      this.connection.addEventListener(
        "messageReceived",
        ({ detail: { uid, channel, message } }) => {
          this.checkAndEnsureUser()

          // Incoming messages also pass through the queue worker
          this.queue.postMessage({
            method: "received",
            uid,
            channel,
            message,
          })
        },
      )
    }

    // Ensure that y-js is up to date on the user's id
    checkAndEnsureUser() {
      const id = this.connection.getUserID()

      if (this.y.db.userId === id) {
        return
      }

      this.raiseUserEvent("userID", { user: id })

      this.setUserId(id)
    }

    // Ensure that y-js knows that the peer has joined
    checkAndInsertPeer(uid) {
      if (this.peers.has(uid)) {
        return
      }

      // Per-peer heartbeat state; `cb` holds the interval id so the timer
      // can be cancelled when the peer goes away.
      const health = {
        lastFootprintResolved: true,
        lastFootprint: 0,
        lastFootprintTimestamp: Date.now(),
      }
      health.cb = setInterval(
        this.heartbeat.bind(
          this,
          uid,
          this.connection.getPeerHandle(uid),
          health,
        ),
        this.options.heartbeat.interval,
      )

      this.peers.set(uid, health)

      this.userJoined(uid, "master")
    }

    /**
     * One heartbeat tick for a peer.
     *
     * Cancels its own timer when the peer handle changed or the peer is no
     * longer registered. Terminates the peer when the previous footprint
     * request is still pending, when the request rejects, or when the
     * footprint has not changed for longer than `heartbeat.timeout`. Sends a
     * message on the "heartbeat" channel when the footprint has not changed
     * for longer than `heartbeat.interval`.
     *
     * @param {*} uid - Peer id.
     * @param {*} peer - Peer handle captured when the timer was created.
     * @param {object} health - State object created by checkAndInsertPeer().
     */
    heartbeat(uid, peer, health) {
      const currentPeer = this.connection.getPeerHandle(uid)

      if (!currentPeer || currentPeer !== peer || !this.peers.has(uid)) {
        clearInterval(health.cb)

        return
      }

      if (!health.lastFootprintResolved) {
        return this.connection.terminatePeer(uid)
      }
      health.lastFootprintResolved = false

      this.connection
        .getPeerFootprint(uid)
        .then((footprint) => {
          health.lastFootprintResolved = true

          const { interval, minimum, timeout } = this.options.heartbeat
          const timeSinceLastFootprint =
            Date.now() - health.lastFootprintTimestamp

          // Loose comparison kept on purpose: the footprint's type is not
          // visible here and the initial value is the number 0.
          if (footprint != health.lastFootprint) {
            health.lastFootprint = footprint
            health.lastFootprintTimestamp = Date.now()
          } else if (timeSinceLastFootprint > timeout) {
            return this.connection.terminatePeer(uid)
          } else if (timeSinceLastFootprint > interval) {
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "heartbeat",
            })
          }

          // NOTE(review): this evaluates to 0 while the time since the last
          // footprint change is <= `minimum` and reaches 1 at `timeout`, i.e.
          // it grows as the peer goes quiet. Confirm that listeners expect
          // this orientation for "quality".
          this.raiseUserEvent("userConnection", {
            id: uid,
            quality:
              1.0 -
              (timeout - Math.max(timeSinceLastFootprint, minimum)) /
                (timeout - minimum),
          })
        })
        .catch(() => {
          return this.connection.terminatePeer(uid)
        })
    }

    // Cancel the heartbeat timers of all currently registered peers
    stopHeartbeats() {
      for (const health of this.peers.values()) {
        clearInterval(health.cb)
      }
    }

    // Ensure that y-js knows that the peer has left
    checkAndRemovePeer(uid) {
      if (!this.peers.has(uid)) {
        return
      }

      // Stop the timer right away instead of waiting for its next tick, so a
      // peer that re-joins quickly never ends up with two running timers.
      clearInterval(this.peers.get(uid).cb)

      this.peers.delete(uid)

      this.userLeft(uid)
    }

    connectToPeer(/*uid*/) {
      // currently deprecated
    }

    // Stop heartbeats, terminate the queue worker and destroy the connection.
    // Returns whatever the base class' disconnect() returns.
    disconnect() {
      this.stopHeartbeats()

      this.queue.terminate()
      // Remembered so that reconnect() knows it has to create a new worker
      this.queueTerminated = true

      this.connection.destructor()

      return super.disconnect()
    }

    // Re-create the queue (if disconnect() terminated it) and the connection.
    // Returns whatever the base class' reconnect() returns.
    reconnect() {
      if (this.queueTerminated) {
        this.initialiseQueue()
      }

      this.initialiseConnection()

      return super.reconnect()
    }

    // Call every registered user-event listener with `{ action, ...data }`
    raiseUserEvent(action, data) {
      const event = Object.assign({ action }, data)

      for (const f of this.userEventListeners) {
        f(event)
      }
    }

    // Queue a y-js message for one peer, reporting sync progress to listeners
    send(uid, message) {
      if (message.type === "sync step 1") {
        this.raiseUserEvent("waitingForSyncStep", { user: uid })
      } else if (message.type === "sync done") {
        this.raiseUserEvent("weSyncedWithPeer", { user: uid })
      }

      this.queue.postMessage({ method: "send", uid, channel: "y-js", message })
    }

    // Queue a y-js message for all peers
    broadcast(message) {
      this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
    }

    isDisconnected() {
      return false
    }
  }

  Y.extend("p2p-mesh", P2PMesh)
}

// Default export: call `extend(Y)` to register the "p2p-mesh" connector on a
// y-js object supplied by the importer.
export default extend
// When a global `Y` already exists at load time, register on it immediately.
if (typeof Y !== "undefined") {
  extend(Y)
}