// WebRTC connector for y-js, built on top of LioWebRTC.
import LioWebRTC from "liowebrtc"
/**
 * Registers the "webrtc" connector class on the given y-js namespace.
 *
 * @param {object} Y - y-js namespace; must expose `AbstractConnector` (used as
 *   the base class) and `extend(name, connectorClass)`.
 */
function extend(Y) {
  /**
   * y-js connector that exchanges messages with peers through LioWebRTC data
   * channels. Every outgoing and incoming message is first posted to a Worker
   * ("js/queue.js") and only acted upon when the worker posts it back to
   * `queue.onmessage`.
   */
  class WebRTC extends Y.AbstractConnector {
    /**
     * @param {object} y - y-js instance, forwarded to the base connector.
     * @param {object} options - Connector options. This object is mutated:
     *   `role` is forced to "slave" and missing `handshake` / `heartbeat`
     *   settings are filled in with defaults. `url` and `room` are passed to
     *   LioWebRTC; `onUserEvent`, if given, is registered as a listener.
     * @throws {Error} If `options` is undefined.
     */
    constructor(y, options) {
      if (options === undefined) {
        throw new Error("Options must not be undefined!")
      }
      options.role = "slave"
      super(y, options)

      this.webrtcOptions = options

      // Handshake timing (ms): delay before the first attempt, then the retry period.
      const handshake = options.handshake || {}
      handshake.initial = handshake.initial || 100
      handshake.interval = handshake.interval || 500
      options.handshake = handshake

      // Heartbeat timing (ms): polling period, lower bound used for the
      // quality figure, and the silence after which a peer is dropped.
      const heartbeat = options.heartbeat || {}
      heartbeat.interval = heartbeat.interval || 500
      heartbeat.minimum = heartbeat.minimum || 1000
      heartbeat.timeout = heartbeat.timeout || 10000
      options.heartbeat = heartbeat

      this.queue = new Worker("js/queue.js")
      this.queue.onmessage = (event) => {
        const { method } = event.data
        if (method === "send") {
          this.deliverQueuedMessage(event.data)
        } else if (method === "broadcast") {
          this.webrtc.shout(event.data.channel, event.data.message)
        } else if (method === "received") {
          this.acceptQueuedMessage(event.data)
        }
      }

      if (options.onUserEvent) {
        this.onUserEvent(options.onUserEvent)
      }

      this.initialiseConnection()

      window.addEventListener("unload", () => {
        this.y.destroy()
      })
    }

    /**
     * Handles a "send" entry coming back from the queue worker by whispering
     * it to the addressed peer.
     *
     * @param {{uid: string, channel: string, message: *}} data
     */
    deliverQueuedMessage({ uid, channel, message }) {
      const peer = this.webrtc.getPeerById(uid)
      // y-js db transactions can send messages after a peer has disconnected
      if (!peer || (channel === "y-js" && !this.peers.has(uid))) {
        return
      }
      // Heartbeats carry no message and handshakes carry a plain string, so
      // the type may legitimately be absent.
      const messageType = message && message.type
      if (messageType === "sync step 1") {
        this.raiseUserEvent("waitingForSyncStep", { user: uid })
      } else if (messageType === "sync done") {
        this.raiseUserEvent("weSyncedWithPeer", { user: uid })
      }
      this.webrtc.whisper(peer, channel, message)
    }

    /**
     * Handles a "received" entry coming back from the queue worker.
     *
     * @param {{type: string, message: *, peer: {id: string}}} data
     */
    acceptQueuedMessage({ type, message, peer }) {
      if (type === "tw-ml") {
        // Handshakes can only be sent and received directly
        if (message === "tw") {
          // Response message in the handshake
          this.queue.postMessage({
            method: "send",
            uid: peer.id,
            channel: "tw-ml",
            message: "ml",
          })
        } else if (message === "ml") {
          // Handshake completed
          this.checkAndInsertPeer(peer.id)
        }
        return
      }

      // Any other traffic also proves that the channel to this peer works.
      this.checkAndInsertPeer(peer.id)
      if (type === "y-js") {
        if (message.type === "sync done") {
          this.raiseUserEvent("peerSyncedWithUs", { user: peer.id })
        }
        this.receiveMessage(peer.id, message)
      }
    }

    /**
     * Creates a fresh LioWebRTC instance, resets the peer table and wires up
     * all LioWebRTC event handlers.
     */
    initialiseConnection() {
      this.webrtc = new LioWebRTC({
        url: this.webrtcOptions.url,
        dataOnly: true,
        constraints: {
          minPeers: 4,
          maxPeers: 8,
        },
      })

      // uid -> health record of every peer that has been announced to y-js
      this.peers = new Map()

      this.webrtc.on("ready", () => {
        this.webrtc.joinRoom(this.webrtcOptions.room)
      })

      this.webrtc.on("joinedRoom", () => {
        this.checkAndEnsureUser()
      })

      // NOTE(review): assumes LioWebRTC emits "leftRoom"; confirm against its docs.
      this.webrtc.on("leftRoom", () => {
        console.log("TODO: LEFT ROOM")
      })

      this.webrtc.on("channelError", (a, b, c, d) =>
        console.log("TODO: CHANNEL ERROR", a, b, c, d),
      )

      this.webrtc.on("channelOpen", (dataChannel, peer) => {
        this.checkAndEnsureUser()

        // Start a handshake to ensure both sides are able to use the channel.
        // It repeats until the peer is either gone/replaced or has joined.
        const handshake = () => {
          const current = this.webrtc.getPeerById(peer.id)
          if (!current || current !== peer) {
            return
          }
          if (this.peers.has(peer.id)) {
            return
          }
          // Initial message in the handshake
          this.queue.postMessage({
            method: "send",
            uid: peer.id,
            channel: "tw-ml",
            message: "tw",
          })
          setTimeout(handshake, this.webrtcOptions.handshake.interval)
        }

        setTimeout(handshake, this.webrtcOptions.handshake.initial)
      })

      this.webrtc.on("receivedPeerData", (type, message, peer) => {
        this.checkAndEnsureUser()
        // Message could have been forwarded but yjs only needs to know about directly connected peers
        const directId = peer.forwardedBy ? peer.forwardedBy.id : peer.id
        this.queue.postMessage({
          method: "received",
          type,
          message,
          peer: { id: directId },
        })
      })

      this.webrtc.on("channelClose", (dataChannel, peer) => {
        this.checkAndEnsureUser()
        this.checkAndRemovePeer(peer.id)
      })
    }

    /** Ensure that y-js is up to date on the user's id. */
    checkAndEnsureUser() {
      const id = this.webrtc.getMyId()
      if (this.y.db.userId === id) {
        return
      }
      this.raiseUserEvent("userID", { user: id })
      this.setUserId(id)
    }

    /**
     * Ensure that y-js knows that the peer has joined, and start its heartbeat.
     *
     * @param {string} uid - Id of the peer.
     */
    checkAndInsertPeer(uid) {
      if (this.peers.has(uid)) {
        return
      }
      const peer = this.webrtc.getPeerById(uid)
      if (!peer) {
        // A queued message can arrive after its peer is gone; announcing it
        // would start a heartbeat without a peer object to poll.
        return
      }
      const health = {
        lastStatsResolved: true,
        lastReceivedBytes: 0,
        lastReceivedTimestamp: Date.now(),
      }
      health.cb = setInterval(
        this.heartbeat.bind(this, peer, health),
        this.webrtcOptions.heartbeat.interval,
      )
      this.peers.set(uid, health)
      this.userJoined(uid, "master")
    }

    /**
     * Periodic liveness check for one peer, driven by the interval stored in
     * `health.cb`. Ends the peer when its stats show no usable candidate pair,
     * when nothing was received for longer than `heartbeat.timeout`, or when
     * the previous stats request has not resolved yet.
     *
     * @param {object} peer - LioWebRTC peer the interval was created for.
     * @param {object} health - Mutable health record created by checkAndInsertPeer.
     */
    heartbeat(peer, health) {
      const current = this.webrtc.getPeerById(peer.id)
      if (!current || current !== peer || !this.peers.has(peer.id)) {
        // Peer is gone, was replaced, or has been removed: stop polling.
        clearInterval(health.cb)
        return
      }
      if (!health.lastStatsResolved) {
        return peer.end(true)
      }
      health.lastStatsResolved = false

      const { interval, minimum, timeout } = this.webrtcOptions.heartbeat
      peer.getStats(null).then((stats) => {
        health.lastStatsResolved = true
        let disconnect = true

        stats.forEach((report) => {
          const usable =
            report.type === "candidate-pair" &&
            report.bytesSent > 0 &&
            report.bytesReceived > 0 &&
            report.writable
          if (!usable) {
            return
          }

          const timeSinceLastReceived =
            Date.now() - health.lastReceivedTimestamp
          if (report.bytesReceived !== health.lastReceivedBytes) {
            health.lastReceivedBytes = report.bytesReceived
            health.lastReceivedTimestamp = Date.now()
          } else if (timeSinceLastReceived > timeout) {
            // Silent for too long: this pair does not keep the peer alive.
            return
          } else if (timeSinceLastReceived > interval) {
            // Quiet link: send a heartbeat (it carries no message payload).
            this.queue.postMessage({
              method: "send",
              uid: peer.id,
              channel: "heartbeat",
            })
          }

          // 0 while data arrived within `minimum` ms, rising linearly to 1 as
          // the silence approaches `timeout`.
          // NOTE(review): the name suggests the opposite scale; confirm with consumers.
          this.raiseUserEvent("userConnection", {
            user: peer.id,
            quality:
              1.0 -
              (timeout - Math.max(timeSinceLastReceived, minimum)) /
                (timeout - minimum),
          })
          disconnect = false
        })

        if (disconnect) {
          peer.end(true)
        }
      })
    }

    /**
     * Ensure that y-js knows that the peer has left.
     *
     * @param {string} uid - Id of the peer.
     */
    checkAndRemovePeer(uid) {
      if (!this.peers.has(uid)) {
        return
      }
      this.peers.delete(uid)
      this.userLeft(uid)
    }

    connectToPeer(/*uid*/) {
      // currently deprecated
    }

    /**
     * Terminates the queue worker, quits the LioWebRTC session and lets the
     * base connector disconnect.
     *
     * @returns {*} Whatever the base connector's `disconnect` returns.
     */
    disconnect() {
      this.queue.terminate()
      this.webrtc.quit()
      return super.disconnect()
    }

    /**
     * Sets up a new LioWebRTC session.
     * NOTE(review): the queue worker terminated by disconnect() is not
     * recreated here; verify whether reconnecting after disconnect is expected
     * to work.
     */
    reconnect() {
      this.initialiseConnection()
    }

    /**
     * Notifies every registered user-event listener.
     *
     * @param {string} action - Event name, exposed as `event.action`.
     * @param {object} [data] - Extra fields copied onto the event.
     */
    raiseUserEvent(action, data) {
      const event = Object.assign({ action }, data)
      // NOTE(review): assumes the base connector keeps the callbacks passed to
      // onUserEvent in `this.userEventListeners`; confirm against y-js.
      for (const listener of this.userEventListeners) {
        listener(event)
      }
    }

    /**
     * Queues a y-js message for a single peer.
     *
     * @param {string} uid - Id of the receiving peer.
     * @param {object} message - y-js message.
     */
    send(uid, message) {
      this.queue.postMessage({ method: "send", channel: "y-js", uid, message })
    }

    /**
     * Queues a y-js message for all peers.
     *
     * @param {object} message - y-js message.
     */
    broadcast(message) {
      this.queue.postMessage({ method: "broadcast", channel: "y-js", message })
    }

    /** @returns {boolean} Always false. */
    isDisconnected() {
      return false
    }
  }

  Y.extend("webrtc", WebRTC)
}
// Register the connector right away when a global `Y` binding exists at load time.
const hasGlobalY = typeof Y !== "undefined"
if (hasGlobalY) {
  extend(Y)
}