// y-p2p-mesh.js
// Author: Moritz Langenstein
/* global Y */
"use strict"
/* webpack should NOT import Worker */
// #!if false
import Worker from "tiny-worker"
// #!endif
/**
 * Registers the "p2p-mesh" connector class on the given y-js namespace.
 *
 * @param {object} Y - y-js namespace; must expose `AbstractConnector` (used as
 *   the base class) and `extend(name, connectorClass)`.
 */
function extend(Y) {
  /**
   * Connector that routes every outgoing and incoming message through a
   * worker-backed queue and tracks peers with a handshake plus a periodic
   * heartbeat.
   *
   * The transport itself is supplied by the caller as `options.connection`
   * (a constructor). This class only relies on the methods it calls on it:
   * `addEventListener`, `getUserID`, `getPeerHandle`, `getPeerFootprint`,
   * `terminatePeer`, `send`, `broadcast` and `destructor`.
   */
  class P2PMesh extends Y.AbstractConnector {
    /**
     * @param {object} y - y-js instance; `y.db.userId` is read by
     *   `checkAndEnsureUser`.
     * @param {object} options - Connector options. The object is mutated in
     *   place: `role` is forced to "slave" and missing `mesh`, `handshake`
     *   and `heartbeat` settings are filled with defaults. The same object is
     *   later handed to the `options.connection` constructor.
     * @throws {Error} If `options` is undefined.
     */
    constructor(y, options) {
      if (options === undefined) {
        throw new Error("Options must not be undefined!")
      }
      options.role = "slave"
      super(y, options)
      this.options = options
      // NOTE: `||` means falsy values (e.g. 0) are replaced by the defaults.
      this.options.mesh = this.options.mesh || {}
      this.options.mesh.minPeers = this.options.mesh.minPeers || 4
      this.options.mesh.maxPeers = this.options.mesh.maxPeers || 8
      this.options.handshake = this.options.handshake || {}
      this.options.handshake.initial = this.options.handshake.initial || 100
      this.options.handshake.interval = this.options.handshake.interval || 500
      this.options.heartbeat = this.options.heartbeat || {}
      this.options.heartbeat.interval = this.options.heartbeat.interval || 500
      this.options.heartbeat.minimum = this.options.heartbeat.minimum || 1000
      this.options.heartbeat.timeout = this.options.heartbeat.timeout || 10000
      this.initialiseQueue()
      if (options.onUserEvent) {
        this.onUserEvent(options.onUserEvent)
      }
      this.initialiseConnection()
    }

    /**
     * Creates the queue worker and wires its replies to
     * `handleQueueMessage`. Called from the constructor and again from
     * `reconnect()` after the previous worker has been terminated.
     */
    initialiseQueue() {
      /* webpack should use the packaged queue.js path */
      let path = "js/queue.js"
      // #!if false
      path = "./src/queue.js"
      // #!endif
      const queue = new Worker(path)
      queue.onmessage = (event) => this.handleQueueMessage(event.data)
      this.queue = queue
      // Remembered so reconnect() knows whether a fresh worker is needed.
      this.queueTerminated = false
    }

    /**
     * Dispatches one message coming back from the queue worker.
     *
     * @param {object} data - Worker payload with a `method` of "send",
     *   "broadcast" or "received" plus `uid`, `channel` and `message` as
     *   applicable. Other methods are ignored.
     */
    handleQueueMessage(data) {
      const { method, uid, channel, message } = data
      if (method === "send") {
        // y-js db transactions can send messages after a peer has disconnected
        if (channel === "y-js" && !this.peers.has(uid)) {
          return
        }
        this.connection.send(uid, channel, message)
      } else if (method === "broadcast") {
        this.connection.broadcast(channel, message)
      } else if (method === "received") {
        this.handleReceived(uid, channel, message)
      }
    }

    /**
     * Handles a message that arrived from a peer (after passing the queue).
     *
     * @param {*} uid - Sender id.
     * @param {string} channel - "tw-ml" for handshakes, "y-js" for y-js
     *   traffic; any other channel only marks the peer as present.
     * @param {*} message - Channel payload.
     */
    handleReceived(uid, channel, message) {
      if (channel === "tw-ml") {
        // Handshakes can only be sent and received directly
        if (message === "tw") {
          // Response message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "ml",
          })
        } else if (message === "ml") {
          // Handshake completed
          this.checkAndInsertPeer(uid)
        }
        return
      }
      // Any non-handshake traffic proves the peer is reachable.
      this.checkAndInsertPeer(uid)
      if (channel === "y-js") {
        if (message.type === "sync done") {
          this.raiseUserEvent("peerSyncedWithUs", { user: uid })
        }
        this.receiveMessage(uid, message)
      }
    }

    /**
     * Builds a new connection from `options.connection`, resets the peer
     * table and subscribes to the connection's events.
     */
    initialiseConnection() {
      // Stop timers that belong to the peer table about to be replaced.
      this.clearHeartbeats()
      this.peers = new Map()
      this.connection = new this.options.connection(this.options)
      this.connection.addEventListener("roomJoined", () => {
        this.checkAndEnsureUser()
      })
      this.connection.addEventListener("roomLeft", () => {
        console.log("TODO: LEFT ROOM")
      })
      this.connection.addEventListener("channelOpened", ({ detail: uid }) => {
        this.checkAndEnsureUser()
        // Start a handshake to ensure both sides are able to use the channel
        const expectedPeer = this.connection.getPeerHandle(uid)
        const handshake = () => {
          const currentPeer = this.connection.getPeerHandle(uid)
          // Stop once the handle is gone or has been replaced ...
          if (!currentPeer || currentPeer !== expectedPeer) {
            return
          }
          // ... or once the peer has been accepted.
          if (this.peers.has(uid)) {
            return
          }
          // Initial message in the handshake
          this.queue.postMessage({
            method: "send",
            uid,
            channel: "tw-ml",
            message: "tw",
          })
          setTimeout(handshake, this.options.handshake.interval)
        }
        setTimeout(handshake, this.options.handshake.initial)
      })
      this.connection.addEventListener("channelError", ({ detail: uid }) =>
        console.log("TODO: CHANNEL ERROR", uid),
      )
      this.connection.addEventListener("channelClosed", ({ detail: uid }) => {
        this.checkAndEnsureUser()
        this.checkAndRemovePeer(uid)
      })
      this.connection.addEventListener(
        "messageReceived",
        ({ detail: { uid, channel, message } }) => {
          this.checkAndEnsureUser()
          this.queue.postMessage({
            method: "received",
            uid,
            channel,
            message,
          })
        },
      )
    }

    // Ensure that y-js is up to date on the user's id
    checkAndEnsureUser() {
      const id = this.connection.getUserID()
      if (this.y.db.userId === id) {
        return
      }
      this.raiseUserEvent("userID", { user: id })
      this.setUserId(id)
    }

    // Ensure that y-js knows that the peer has joined
    checkAndInsertPeer(uid) {
      if (this.peers.has(uid)) {
        return
      }
      const health = {
        lastFootprintResolved: true,
        lastFootprint: 0,
        lastFootprintTimestamp: Date.now(),
      }
      // The handle is captured now so the heartbeat can detect a replaced peer.
      health.cb = setInterval(
        this.heartbeat.bind(
          this,
          uid,
          this.connection.getPeerHandle(uid),
          health,
        ),
        this.options.heartbeat.interval,
      )
      this.peers.set(uid, health)
      this.userJoined(uid, "master")
    }

    /**
     * One heartbeat tick for a peer.
     *
     * @param {*} uid - Peer id.
     * @param {*} peer - Peer handle captured when the peer was inserted.
     * @param {object} health - Mutable health record created by
     *   `checkAndInsertPeer` (`cb` holds the interval id).
     */
    heartbeat(uid, peer, health) {
      const currentPeer = this.connection.getPeerHandle(uid)
      if (!currentPeer || currentPeer !== peer || !this.peers.has(uid)) {
        clearInterval(health.cb)
        return
      }
      // The previous footprint request has not settled within one interval.
      if (!health.lastFootprintResolved) {
        return this.connection.terminatePeer(uid)
      }
      health.lastFootprintResolved = false
      const { interval, minimum, timeout } = this.options.heartbeat
      this.connection
        .getPeerFootprint(uid)
        .then((footprint) => {
          health.lastFootprintResolved = true
          const timeSinceLastFootprint =
            Date.now() - health.lastFootprintTimestamp
          // NOTE(review): loose comparison kept on purpose; the footprint's
          // type is not visible here and the initial value is the number 0.
          if (footprint != health.lastFootprint) {
            health.lastFootprint = footprint
            health.lastFootprintTimestamp = Date.now()
          } else if (timeSinceLastFootprint > timeout) {
            return this.connection.terminatePeer(uid)
          } else if (timeSinceLastFootprint > interval) {
            // Footprint is stale: poke the peer on the "heartbeat" channel.
            this.queue.postMessage({
              method: "send",
              uid,
              channel: "heartbeat",
            })
          }
          // As written, the value is 0 while the footprint is at most
          // `minimum` ms old and reaches 1 at `timeout` ms.
          this.raiseUserEvent("userConnection", {
            id: uid,
            quality:
              1.0 -
              (timeout - Math.max(timeSinceLastFootprint, minimum)) /
                (timeout - minimum),
          })
        })
        .catch(() => {
          return this.connection.terminatePeer(uid)
        })
    }

    // Stop the heartbeat timer of every currently tracked peer
    clearHeartbeats() {
      if (!this.peers) {
        return
      }
      for (const health of this.peers.values()) {
        clearInterval(health.cb)
      }
    }

    // Ensure that y-js knows that the peer has left
    checkAndRemovePeer(uid) {
      if (!this.peers.has(uid)) {
        return
      }
      // Cancel the heartbeat right away instead of waiting for its next tick,
      // so a quick re-join cannot end up with two timers for the same peer.
      clearInterval(this.peers.get(uid).cb)
      this.peers.delete(uid)
      this.userLeft(uid)
    }

    connectToPeer(/*uid*/) {
      // currently deprecated
    }

    /**
     * Stops heartbeats, terminates the queue worker and destroys the
     * connection before delegating to the base class.
     *
     * @returns {*} Whatever the base class `disconnect()` returns.
     */
    disconnect() {
      this.clearHeartbeats()
      this.queue.terminate()
      this.queueTerminated = true
      this.connection.destructor()
      return super.disconnect()
    }

    /**
     * Rebuilds the connection, and the queue worker if `disconnect()`
     * terminated it, before delegating to the base class.
     *
     * @returns {*} Whatever the base class `reconnect()` returns.
     */
    reconnect() {
      // A terminated worker can no longer relay messages.
      if (this.queueTerminated) {
        this.initialiseQueue()
      }
      this.initialiseConnection()
      return super.reconnect()
    }

    /**
     * Notifies every registered user-event listener.
     *
     * @param {string} action - Event name, stored as `action` on the event.
     * @param {object} data - Extra fields merged into the event object.
     */
    raiseUserEvent(action, data) {
      const event = Object.assign({ action }, data)
      for (const f of this.userEventListeners) {
        f(event)
      }
    }

    /**
     * Queues a y-js message for one peer and raises sync progress events.
     *
     * @param {*} uid - Target peer id.
     * @param {object} message - y-js message; `message.type` is inspected.
     */
    send(uid, message) {
      if (message.type === "sync step 1") {
        this.raiseUserEvent("waitingForSyncStep", { user: uid })
      } else if (message.type === "sync done") {
        this.raiseUserEvent("weSyncedWithPeer", { user: uid })
      }
      this.queue.postMessage({ method: "send", uid, channel: "y-js", message })
    }

    // Queue a y-js message for all peers
    broadcast(message) {
      this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
    }

    isDisconnected() {
      return false
    }
  }
  Y.extend("p2p-mesh", P2PMesh)
}
export default extend
// Self-register the connector when a global `Y` is already present
// (e.g. y-js loaded via a plain script tag); otherwise the importer is
// expected to call the exported `extend` itself.
const globalYAvailable = typeof Y !== "undefined"
if (globalYAvailable) {
  extend(Y)
}