Skip to content
Snippets Groups Projects
y-p2p-mesh.js 8.36 KiB
Newer Older
  • Learn to ignore specific revisions
    /* webpack should NOT import Worker */
    // #!if false
    import Worker from "tiny-worker"
    // #!endif
    
    
    /**
     * Registers the "p2p-mesh" connector on the given Y object.
     *
     * @param {object} Y - Must expose `AbstractConnector` (used as the base class)
     *   and `extend(name, connector)` (used to register the connector).
     */
    function extend(Y) {
      /**
       * Connector that exchanges y-js messages through `options.connection`.
       * Every inbound and outbound message is first posted to a queue worker and
       * only acted upon once the worker posts it back.
       */
      class P2PMesh extends Y.AbstractConnector {
        /**
         * @param {object} y - Forwarded to the base connector.
         * @param {object} options - Connector options. Mutated in place: `role`
         *   is forced to "slave" and missing `mesh`, `handshake` and `heartbeat`
         *   settings are filled with defaults. `options.connection` must be a
         *   constructor; `options.onUserEvent` is an optional listener.
         * @throws {Error} If `options` is undefined.
         */
        constructor(y, options) {
          if (options === undefined) {
            throw new Error("Options must not be undefined!")
          }

          options.role = "slave"
          super(y, options)
          this.options = options

          // NOTE: `||` means falsy values (e.g. 0) are replaced by the defaults
          this.options.mesh = this.options.mesh || {}
          this.options.mesh.minPeers = this.options.mesh.minPeers || 4
          this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8

          this.options.handshake = this.options.handshake || {}
          this.options.handshake.initial = this.options.handshake.initial || 100
          this.options.handshake.interval = this.options.handshake.interval || 500

          this.options.heartbeat = this.options.heartbeat || {}
          this.options.heartbeat.interval = this.options.heartbeat.interval || 500
          this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
          this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000

          this.initialiseQueue()

          if (options.onUserEvent) {
            this.onUserEvent(options.onUserEvent)
          }

          this.initialiseConnection()
        }

        /**
         * Starts the queue worker and routes everything it posts back to
         * `handleQueueMessage`. Also called by `reconnect()` after the previous
         * worker has been terminated by `disconnect()`.
         */
        initialiseQueue() {
          /* webpack should use the packaged queue.js path */
          let path = "js/queue.js"
          // #!if false
          path = "./src/queue.js"
          // #!endif
          this.queue = new Worker(path)
          this.queueTerminated = false

          this.queue.onmessage = (event) => {
            this.handleQueueMessage(event.data)
          }
        }

        /**
         * Acts on a message posted back by the queue worker.
         *
         * @param {object} data - `{ method, uid, channel, message }` where
         *   `method` is "send", "broadcast" or "received"; other methods are
         *   ignored.
         */
        handleQueueMessage(data) {
          const { method, uid, channel, message } = data

          if (method === "send") {
            // y-js db transactions can send messages after a peer has disconnected
            if (channel === "y-js" && !this.peers.has(uid)) {
              return
            }

            this.connection.send(uid, channel, message)
            return
          }

          if (method === "broadcast") {
            this.connection.broadcast(channel, message)
            return
          }

          if (method !== "received") {
            return
          }

          if (channel === "tw-ml") {
            // Handshakes can only be sent and received directly
            if (message === "tw") {
              // Response message in the handshake
              this.queue.postMessage({
                method: "send",
                uid,
                channel: "tw-ml",
                message: "ml",
              })
            } else if (message === "ml") {
              // Handshake completed
              this.checkAndInsertPeer(uid)
            }

            return
          }

          // Any non-handshake traffic proves that the peer is reachable
          this.checkAndInsertPeer(uid)

          if (channel === "y-js") {
            if (message.type === "sync done") {
              this.raiseUserEvent("peerSyncedWithUs", { user: uid })
            }

            this.receiveMessage(uid, message)
          }
        }

        /**
         * Creates a fresh peer map and a new `options.connection` instance, then
         * subscribes to its room, channel and message events.
         */
        initialiseConnection() {
          this.peers = new Map()

          this.connection = new this.options.connection(this.options)

          this.connection.addEventListener("roomJoined", () => {
            this.checkAndEnsureUser()
          })

          this.connection.addEventListener("roomLeft", () => {
            console.log("TODO: LEFT ROOM")
          })

          this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
            this.checkAndEnsureUser()

            // Start a handshake to ensure both sides are able to use the channel.
            // It repeats until the peer is registered or its handle changes.
            const handshake = (peer) => {
              const currentPeer = this.connection.getPeerHandle(uid)

              if (!currentPeer || currentPeer !== peer) {
                return
              }

              if (this.peers.has(uid)) {
                return
              }

              // Initial message in the handshake
              this.queue.postMessage({
                method: "send",
                uid,
                channel: "tw-ml",
                message: "tw",
              })

              setTimeout(() => handshake(peer), this.options.handshake.interval)
            }

            // Capture the handle now so a replaced peer aborts the handshake
            const openedPeer = this.connection.getPeerHandle(uid)

            setTimeout(
              () => handshake(openedPeer),
              this.options.handshake.initial,
            )
          })

          this.connection.addEventListener("channelError", ({ detail: uid }) =>
            console.log("TODO: CHANNEL ERROR", uid),
          )

          this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
            this.checkAndEnsureUser()
            this.checkAndRemovePeer(uid)
          })

          this.connection.addEventListener(
            "messageReceived",
            ({ detail: { uid, channel, message } }) => {
              this.checkAndEnsureUser()

              this.queue.postMessage({
                method: "received",
                uid,
                channel,
                message,
              })
            },
          )
        }

        // Ensure that y-js is up to date on the user's id
        checkAndEnsureUser() {
          const id = this.connection.getUserID()

          if (this.y.db.userId === id) {
            return
          }

          this.raiseUserEvent("userID", { user: id })

          this.setUserId(id)
        }

        // Ensure that y-js knows that the peer has joined
        checkAndInsertPeer(uid) {
          if (this.peers.has(uid)) {
            return
          }

          const health = {
            lastFootprintResolved: true,
            lastFootprint: 0,
            lastFootprintTimestamp: Date.now(),
          }
          // The timer id is kept on the record so that it can be cancelled later
          health.cb = setInterval(
            this.heartbeat.bind(
              this,
              uid,
              this.connection.getPeerHandle(uid),
              health,
            ),
            this.options.heartbeat.interval,
          )

          this.peers.set(uid, health)

          this.userJoined(uid, "master")
        }

        /**
         * Periodic liveness check for one peer, scheduled by
         * `checkAndInsertPeer`.
         *
         * @param {*} uid - Id of the peer.
         * @param {*} peer - Peer handle captured when the check was scheduled.
         * @param {object} health - The peer's health record (see
         *   `checkAndInsertPeer`).
         */
        heartbeat(uid, peer, health) {
          const currentPeer = this.connection.getPeerHandle(uid)

          // The peer is gone or has been replaced: stop this timer
          if (!currentPeer || currentPeer !== peer || !this.peers.has(uid)) {
            clearInterval(health.cb)

            return
          }

          // The previous footprint request is still pending one interval later
          if (!health.lastFootprintResolved) {
            return this.connection.terminatePeer(uid)
          }
          health.lastFootprintResolved = false

          this.connection
            .getPeerFootprint(uid)
            .then((footprint) => {
              health.lastFootprintResolved = true

              const { interval, minimum, timeout } = this.options.heartbeat
              const timeSinceLastFootprint =
                Date.now() - health.lastFootprintTimestamp

              if (footprint != health.lastFootprint) {
                health.lastFootprint = footprint
                health.lastFootprintTimestamp = Date.now()
              } else if (timeSinceLastFootprint > timeout) {
                return this.connection.terminatePeer(uid)
              } else if (timeSinceLastFootprint > interval) {
                // The footprint is unchanged: send a heartbeat message
                this.queue.postMessage({
                  method: "send",
                  uid,
                  channel: "heartbeat",
                })
              }

              // As written this is 0 while the footprint changed less than
              // `minimum` ms ago and approaches 1 as the silence nears `timeout`.
              // NOTE(review): confirm listeners expect this orientation.
              const quality =
                1.0 -
                (timeout - Math.max(timeSinceLastFootprint, minimum)) /
                  (timeout - minimum)

              this.raiseUserEvent("userConnection", { id: uid, quality })
            })
            .catch(() => {
              // Also reached when the handler above (or a listener) throws
              return this.connection.terminatePeer(uid)
            })
        }

        // Ensure that y-js knows that the peer has left
        checkAndRemovePeer(uid) {
          if (!this.peers.has(uid)) {
            return
          }

          // Stop the heartbeat right away instead of waiting for its next tick
          clearInterval(this.peers.get(uid).cb)

          this.peers.delete(uid)

          this.userLeft(uid)
        }

        connectToPeer(/*uid*/) {
          // currently deprecated
        }

        /**
         * Stops all heartbeats, terminates the queue worker and destroys the
         * connection.
         *
         * @returns {*} Whatever the base connector's `disconnect()` returns.
         */
        disconnect() {
          // Heartbeats must not keep polling a destroyed connection
          for (const health of this.peers.values()) {
            clearInterval(health.cb)
          }

          if (!this.queueTerminated) {
            this.queue.terminate()
            this.queueTerminated = true
          }

          this.connection.destructor()

          return super.disconnect()
        }

        /**
         * Re-creates the connection and, when `disconnect()` terminated the queue
         * worker, a new worker as well (a terminated worker never replies).
         *
         * @returns {*} Whatever the base connector's `reconnect()` returns.
         */
        reconnect() {
          if (this.queueTerminated) {
            this.initialiseQueue()
          }

          this.initialiseConnection()

          return super.reconnect()
        }

        /**
         * Calls every registered user event listener with
         * `{ action, ...data }`.
         *
         * @param {string} action - Name of the event.
         * @param {object} data - Extra properties copied onto the event.
         */
        raiseUserEvent(action, data) {
          const event = Object.assign({ action }, data)

          for (const f of this.userEventListeners) {
            f(event)
          }
        }

        /**
         * Queues a y-js message for one peer and reports sync progress to the
         * user event listeners.
         *
         * @param {*} uid - Id of the receiving peer.
         * @param {object} message - y-js message; only `message.type` is
         *   inspected here.
         */
        send(uid, message) {
          if (message.type === "sync step 1") {
            this.raiseUserEvent("waitingForSyncStep", { user: uid })
          } else if (message.type === "sync done") {
            this.raiseUserEvent("weSyncedWithPeer", { user: uid })
          }

          this.queue.postMessage({ method: "send", uid, channel: "y-js", message })
        }

        /**
         * Queues a y-js message for broadcasting through the connection.
         *
         * @param {object} message - y-js message.
         */
        broadcast(message) {
          this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
        }

        // Always reports the connector as connected
        isDisconnected() {
          return false
        }
      }

      Y.extend("p2p-mesh", P2PMesh)
    }
    
    // Register the connector immediately when a global `Y` is already defined;
    // otherwise consumers call the exported `extend` themselves.
    if (typeof Y !== "undefined") {
      extend(Y)
    }

    export default extend