-
Moritz Langenstein authoredMoritz Langenstein authored
benchmark.test.js 37.41 KiB
import chalk from "chalk"
import { connect } from "../src/room.js"
import MockConnection, {
getUserID,
getPeerHandle,
getPeerFootprint,
send,
sendListener,
broadcast,
broadcastListener,
terminatePeer,
destructor,
addEventListener,
getEventListener,
} from "../src/connection/MockConnection.js"
import {
createMessageReceivedEvent,
handshake,
syncStep1,
syncDone,
dotDraw,
dotErase,
pathDraw,
pathErase,
} from "./benchmark.data.js"
function printBenchmark(title, iterations, results) {
process.stdout.write(`\n ${title} (${iterations} iterations):\n`)
for (const title in results) {
const { timeLoc, packets, size, timeRem, events } = results[title]
const synchronisation = title == "synchronisation"
process.stdout.write(
chalk` {yellow ⧗} {dim ${title}:} {yellow.inverse ${(
timeLoc /
(1e6 * (synchronisation ? 1 : iterations))
).toFixed(3)}ms ${
synchronisation ? "total" : "/ it"
}} => {dim ${packets} packet(s)} => {magenta.inverse ${size}B} => {yellow.inverse ${(
timeRem /
(1e6 * (synchronisation ? 1 : iterations))
).toFixed(3)}ms ${
synchronisation ? "total" : "/ it"
}} => {dim ${events} event(s)}\n`,
)
}
process.stdout.write(`\n`)
}
let room = null
let updateRoom = null
let syncRoom = null
describe("drawing app mesh", () => {
beforeEach(async () => {
getUserID.mockClear()
getPeerHandle.mockClear()
getPeerFootprint.mockClear()
send.mockClear()
broadcast.mockClear()
terminatePeer.mockClear()
destructor.mockClear()
addEventListener.mockClear()
MockConnection.mockClear()
room = await connect("room", MockConnection)
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(handshake, "tw-ml"))
updateRoom = await connect("update", MockConnection)
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(handshake, "tw-ml"))
syncRoom = await connect("sync", MockConnection)
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(handshake, "tw-ml"))
})
afterEach(() => {
room.disconnect()
room = null
updateRoom.disconnect()
updateRoom = null
syncRoom.disconnect()
syncRoom = null
})
it("benchmarks a single draw and erase update sequentially", () => {
const ITERATIONS = 1000
jest.setTimeout(ITERATIONS * (10 + 300))
const dotIDs = []
let prevTime
let broadcasts = 0
let addLocTime = 0
const addPackets = []
let addSize = 0
let eraseLocTime = 0
const erasePackets = []
let eraseSize = 0
let syncLocTime
const syncPackets = []
let syncSize = 0
let addRemTime = 0
let addEvents = 0
let eraseRemTime = 0
let eraseEvents = 0
let syncRemTime = 0
let syncEvents = 0
let timeout
return new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
const currTime = process.hrtime()
broadcasts += 1
if (broadcasts <= ITERATIONS) {
addLocTime +=
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
addPackets.push(message)
addSize += message.message.length
} else {
eraseLocTime +=
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
erasePackets.push(message)
eraseSize += message.message.length
}
prevTime = process.hrtime()
if (broadcasts < ITERATIONS) {
dotIDs.push(room.addPath(dotDraw[0]))
} else if (broadcasts < ITERATIONS * 2) {
room.extendErasureIntervals(
dotIDs[broadcasts - ITERATIONS],
dotErase[0][0],
dotErase[0][1],
)
} else {
resolve()
}
}
prevTime = process.hrtime()
dotIDs.push(room.addPath(dotDraw[0]))
})
.then(
() =>
new Promise((resolve) => {
sendListener.callback = (uid, channel, message) => {
const currTime = process.hrtime()
syncLocTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncPackets.push(message)
syncSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
prevTime = process.hrtime()
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
.then(
() =>
new Promise((resolve) => {
broadcasts = 0
let currTime
const timeoutCallback = () => {
broadcasts += 1
// TODO: can we assume that we only use one message here?
if (broadcasts < ITERATIONS) {
addRemTime +=
(currTime[0] - prevTime[0]) * 1e9 +
(currTime[1] - prevTime[1])
} else {
eraseRemTime +=
(currTime[0] - prevTime[0]) * 1e9 +
(currTime[1] - prevTime[1])
}
let packet
if (broadcasts <= ITERATIONS) {
packet = addPackets[broadcasts]
} else if (broadcasts < ITERATIONS * 2) {
packet = erasePackets[broadcasts - ITERATIONS]
} else {
resolve()
}
prevTime = process.hrtime()
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(packet))
}
updateRoom.addEventListener("addOrUpdatePath", () => {
currTime = process.hrtime()
addEvents += 1
clearTimeout(timeout)
timeout = setTimeout(timeoutCallback, 100)
})
updateRoom.addEventListener("removedIntervalsChange", () => {
currTime = process.hrtime()
if (broadcasts < ITERATIONS) {
addEvents += 1
} else if (broadcasts < ITERATIONS * 2) {
eraseEvents += 1
}
clearTimeout(timeout)
timeout = setTimeout(timeoutCallback, 100)
})
prevTime = process.hrtime()
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(addPackets[0]))
}),
)
.then(
() =>
new Promise((resolve) => {
syncRoom.addEventListener("addOrUpdatePath", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
syncRoom.addEventListener("removedIntervalsChange", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
prevTime = process.hrtime()
for (const syncPacket of syncPackets) {
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(syncPacket))
}
}),
)
.then(() => {
printBenchmark("single draw and erase [sequential]", ITERATIONS, {
addPath: {
timeLoc: addLocTime,
packets: addPackets.length,
size: addSize,
timeRem: addRemTime,
events: addEvents,
},
extendErasureIntervals: {
timeLoc: eraseLocTime,
packets: erasePackets.length,
size: eraseSize,
timeRem: eraseRemTime,
events: eraseEvents,
},
synchronisation: {
timeLoc: syncLocTime,
packets: syncPackets.length,
size: syncSize,
timeRem: syncRemTime,
events: syncEvents,
},
})
})
})
it("benchmarks a single draw and erase update in parallel", () => {
const ITERATIONS = 1000
jest.setTimeout(ITERATIONS * 20)
const dotIDs = []
let prevTime
let addLocTime
const addPackets = []
let addSize = 0
let eraseLocTime
const erasePackets = []
let eraseSize = 0
let syncLocTime
const syncPackets = []
let syncSize = 0
let addRemTime = 0
let addEvents = 0
let eraseRemTime = 0
let eraseEvents = 0
let syncRemTime = 0
let syncEvents = 0
let timeout
return new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
const currTime = process.hrtime()
addLocTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
addPackets.push(message)
addSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
prevTime = process.hrtime()
for (let i = 0; i < ITERATIONS; i++) {
dotIDs.push(room.addPath(dotDraw[0]))
}
})
.then(
() =>
new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
const currTime = process.hrtime()
eraseLocTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
erasePackets.push(message)
eraseSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => {
resolve()
}, 1000)
}
prevTime = process.hrtime()
for (let i = 0; i < ITERATIONS; i++) {
room.extendErasureIntervals(
dotIDs[i],
dotErase[0][0],
dotErase[0][1],
)
}
}),
)
.then(
() =>
new Promise((resolve) => {
sendListener.callback = (uid, channel, message) => {
const currTime = process.hrtime()
syncLocTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncPackets.push(message)
syncSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
prevTime = process.hrtime()
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
.then(
() =>
new Promise((resolve) => {
updateRoom.addEventListener("addOrUpdatePath", () => {
const currTime = process.hrtime()
addRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
addEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
updateRoom.addEventListener("removedIntervalsChange", () => {
const currTime = process.hrtime()
addRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
addEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
for (const addPacket of addPackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(addPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
updateRoom.addEventListener("addOrUpdatePath", () => {
const currTime = process.hrtime()
eraseRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
eraseEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
updateRoom.addEventListener("removedIntervalsChange", () => {
const currTime = process.hrtime()
eraseRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
eraseEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
for (const erasePacket of erasePackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(erasePacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
syncRoom.addEventListener("addOrUpdatePath", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
syncRoom.addEventListener("removedIntervalsChange", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
for (const syncPacket of syncPackets) {
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(syncPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
printBenchmark("single draw and erase [parallel]", ITERATIONS, {
addPath: {
timeLoc: addLocTime,
packets: addPackets.length,
size: addSize,
timeRem: addRemTime,
events: addEvents,
},
extendErasureIntervals: {
timeLoc: eraseLocTime,
packets: erasePackets.length,
size: eraseSize,
timeRem: eraseRemTime,
events: eraseEvents,
},
synchronisation: {
timeLoc: syncLocTime,
packets: syncPackets.length,
size: syncSize,
timeRem: syncRemTime,
events: syncEvents,
},
})
resolve()
}),
)
})
it("communicates a single draw and erase update", () => {
jest.setTimeout(30000)
let dotID
const addPackets = []
const erasePackets = []
const syncPackets = []
let syncDonePacket = -1
const updatePaths = {}
const updateIntervals = {}
const syncPaths = {}
const syncIntervals = {}
let timeout
return new Promise((resolve) => {
// Draw the dot
broadcastListener.callback = (channel, message) => {
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
addPackets.push(message)
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
dotID = room.addPath(dotDraw[0])
})
.then(
() =>
new Promise((resolve) => {
// Erase the dot
broadcastListener.callback = (channel, message) => {
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
erasePackets.push(message)
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
room.extendErasureIntervals(dotID, dotErase[0][0], dotErase[0][1])
}),
)
.then(
() =>
new Promise((resolve) => {
// Request a sync step 2
sendListener.callback = (uid, channel, message) => {
expect(uid).toEqual("moritz")
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
syncPackets.push(message)
if (
message.message.length == syncDone.message.length &&
JSON.stringify(
Object.assign({}, message, { uuid: undefined }),
) == JSON.stringify(syncDone)
) {
expect(syncDonePacket).toEqual(-1)
syncDonePacket = syncPackets.length
}
clearTimeout(timeout)
timeout = setTimeout(() => {
expect(syncDonePacket).toEqual(syncPackets.length)
resolve()
}, 1000)
}
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the draw updates
updateRoom.addEventListener(
"addOrUpdatePath",
({ detail: { id, points } }) => {
updatePaths[id] = points
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
updateRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
updateIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const addPacket of addPackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(addPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the draw updates
expect(updatePaths).toStrictEqual({ [dotID]: dotDraw })
expect(updateIntervals).toStrictEqual({ [dotID]: {} })
resolve()
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the erase updates
updateRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
updateIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const erasePacket of erasePackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(erasePacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the erase updates
expect(updatePaths).toStrictEqual({ [dotID]: dotDraw })
expect(updateIntervals).toStrictEqual({
[dotID]: { [dotErase[0][0]]: dotErase[0][1] },
})
resolve()
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the synchronisation
syncRoom.addEventListener(
"addOrUpdatePath",
({ detail: { id, points } }) => {
syncPaths[id] = points
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
syncRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
syncIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const syncPacket of syncPackets) {
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(syncPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the synchronisation
expect(syncPaths).toStrictEqual({ [dotID]: dotDraw })
expect(syncIntervals).toStrictEqual({
[dotID]: { [dotErase[0][0]]: dotErase[0][1] },
})
resolve()
}),
)
})
it("benchmarks a path draw and erase update sequentially", () => {
const ITERATIONS = 1000
jest.setTimeout(ITERATIONS * 1500)
const pathIDs = []
let prevTime
let currTime
let broadcasts = 0
let addLocTime = 0
const addPackets = []
let addSize = 0
let eraseLocTime = 0
const erasePackets = []
let eraseSize = 0
let syncLocTime
const syncPackets = []
let syncSize = 0
let addRemTime = 0
let addEvents = 0
let eraseRemTime = 0
let eraseEvents = 0
let syncRemTime = 0
let syncEvents = 0
let timeout
return new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
currTime = process.hrtime()
if (broadcasts < ITERATIONS) {
addPackets[addPackets.length - 1].push(message)
addSize += message.message.length
} else {
erasePackets[erasePackets.length - 1].push(message)
eraseSize += message.message.length
}
clearTimeout(timeout)
timeout = setTimeout(() => {
broadcasts += 1
if (broadcasts <= ITERATIONS) {
addLocTime +=
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
} else {
eraseLocTime +=
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
}
prevTime = process.hrtime()
if (broadcasts < ITERATIONS) {
addPackets.push([])
const tmpPathID = room.addPath(pathDraw[0])
pathIDs.push(tmpPathID)
for (let i = 1; i < pathDraw.length; i++) {
room.extendPath(tmpPathID, pathDraw[i])
}
} else if (broadcasts < ITERATIONS * 2) {
erasePackets.push([])
const tmpPathID = pathIDs[broadcasts - ITERATIONS]
for (let i = 0; i < pathErase.length; i++) {
room.extendErasureIntervals(
tmpPathID,
pathErase[i][0],
pathErase[i][1],
)
}
} else {
resolve()
}
}, 100)
}
addPackets.push([])
prevTime = process.hrtime()
const tmpPathID = room.addPath(pathDraw[0])
pathIDs.push(tmpPathID)
for (let i = 1; i < pathDraw.length; i++) {
room.extendPath(tmpPathID, pathDraw[i])
}
})
.then(
() =>
new Promise((resolve) => {
sendListener.callback = (uid, channel, message) => {
const currTime = process.hrtime()
syncLocTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncPackets.push(message)
syncSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
prevTime = process.hrtime()
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
.then(
() =>
new Promise((resolve) => {
broadcasts = 0
let currTime
const timeoutCallback = () => {
broadcasts += 1
if (broadcasts < ITERATIONS) {
addRemTime +=
(currTime[0] - prevTime[0]) * 1e9 +
(currTime[1] - prevTime[1])
} else {
eraseRemTime +=
(currTime[0] - prevTime[0]) * 1e9 +
(currTime[1] - prevTime[1])
}
let packets
if (broadcasts < ITERATIONS) {
packets = addPackets[broadcasts]
} else if (broadcasts < ITERATIONS * 2) {
packets = erasePackets[broadcasts - ITERATIONS]
} else {
return resolve()
}
prevTime = process.hrtime()
for (const packet of packets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(packet))
}
}
updateRoom.addEventListener("addOrUpdatePath", () => {
currTime = process.hrtime()
addEvents += 1
clearTimeout(timeout)
timeout = setTimeout(timeoutCallback, 100)
})
updateRoom.addEventListener("removedIntervalsChange", () => {
currTime = process.hrtime()
if (broadcasts < ITERATIONS) {
addEvents += 1
} else if (broadcasts < ITERATIONS * 2) {
eraseEvents += 1
}
clearTimeout(timeout)
timeout = setTimeout(timeoutCallback, 100)
})
prevTime = process.hrtime()
for (const packet of addPackets[0]) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(packet))
}
}),
)
.then(
() =>
new Promise((resolve) => {
syncRoom.addEventListener("addOrUpdatePath", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
syncRoom.addEventListener("removedIntervalsChange", () => {
const currTime = process.hrtime()
syncRemTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncEvents += 1
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
})
prevTime = process.hrtime()
for (const syncPacket of syncPackets) {
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(syncPacket))
}
}),
)
.then(() => {
printBenchmark("single draw and erase [sequential]", ITERATIONS, {
addPath: {
timeLoc: addLocTime,
packets: addPackets.reduce(
(sum, packets) => sum + packets.length,
0,
),
size: addSize,
timeRem: addRemTime,
events: addEvents,
},
extendErasureIntervals: {
timeLoc: eraseLocTime,
packets: erasePackets.reduce(
(sum, packets) => sum + packets.length,
0,
),
size: eraseSize,
timeRem: eraseRemTime,
events: eraseEvents,
},
synchronisation: {
timeLoc: syncLocTime,
packets: syncPackets.length,
size: syncSize,
timeRem: syncRemTime,
events: syncEvents,
},
})
})
})
it("benchmarks a path draw and erase update in parallel", () => {
const ITERATIONS = 1000
const BLOCKSIZE = 10
jest.setTimeout(ITERATIONS * 200)
const pathIDs = []
let prevTime
let addTime
let addPackets = 0
let addSize = 0
let eraseTime
let erasePackets = 0
let eraseSize = 0
let syncTime
let syncPackets = 0
let syncSize = 0
let timeout
return new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
const currTime = process.hrtime()
addTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
addPackets += 1
addSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
prevTime = process.hrtime()
for (let j = 0; j < ITERATIONS; j++) {
const tmpPathID = room.addPath(pathDraw[0])
pathIDs.push(tmpPathID)
for (let i = 1; i < pathDraw.length; i++) {
room.extendPath(tmpPathID, pathDraw[i])
}
}
})
.then(
() =>
new Promise((resolve) => {
broadcastListener.callback = (channel, message) => {
const currTime = process.hrtime()
eraseTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
erasePackets += 1
eraseSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => {
resolve()
}, 10000)
}
// Necessary to allow yjs to execute transactions (majority of processing time)
function erasePath(sj) {
if (sj >= ITERATIONS) return
for (let j = sj; j < Math.min(sj + BLOCKSIZE, ITERATIONS); j++) {
const tmpPathID = pathIDs[j]
for (let i = 0; i < pathErase.length; i++) {
room.extendErasureIntervals(
tmpPathID,
pathErase[i][0],
pathErase[i][1],
)
}
}
setTimeout(erasePath, 0, sj + BLOCKSIZE)
}
prevTime = process.hrtime()
erasePath(0)
}),
)
.then(
() =>
new Promise((resolve) => {
sendListener.callback = (uid, channel, message) => {
const currTime = process.hrtime()
syncTime =
(currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1])
syncPackets += 1
syncSize += message.message.length
clearTimeout(timeout)
timeout = setTimeout(() => {
printBenchmark("path draw and erase [parallel]", ITERATIONS, {
addPath: {
timeLoc: addTime,
packets: addPackets,
size: addSize,
timeRem: -1,
events: -1,
},
extendErasureIntervals: {
timeLoc: eraseTime,
packets: erasePackets,
size: eraseSize,
timeRem: -1,
events: -1,
},
synchronisation: {
timeLoc: syncTime,
packets: syncPackets,
size: syncSize,
timeRem: -1,
events: -1,
},
})
resolve()
}, 1000)
}
prevTime = process.hrtime()
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
})
it("communicates a path draw and erase update", () => {
jest.setTimeout(15000)
let pathID
const addPackets = []
const erasePackets = []
const syncPackets = []
let syncDonePacket = -1
const updatePaths = {}
const updateIntervals = {}
const syncPaths = {}
const syncIntervals = {}
let timeout
return new Promise((resolve) => {
// Draw the path
broadcastListener.callback = (channel, message) => {
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
addPackets.push(message)
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
pathID = room.addPath(pathDraw[0])
for (let i = 1; i < pathDraw.length; i++) {
room.getPathPoints(pathID)
room.extendPath(pathID, pathDraw[i])
}
})
.then(
() =>
new Promise((resolve) => {
// Erase the path
broadcastListener.callback = (channel, message) => {
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
erasePackets.push(message)
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
}
for (let i = 0; i < pathErase.length; i++) {
room.extendErasureIntervals(
pathID,
pathErase[i][0],
pathErase[i][1],
)
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Request a sync step 2
sendListener.callback = (uid, channel, message) => {
expect(uid).toEqual("moritz")
expect(channel).toEqual("y-js")
expect(message.message instanceof Uint8Array).toBe(true)
syncPackets.push(message)
if (
message.message.length == syncDone.message.length &&
JSON.stringify(Object.assign(message, { uuid: undefined })) ==
JSON.stringify(syncDone)
) {
expect(syncDonePacket).toEqual(-1)
syncDonePacket = syncPackets.length
}
clearTimeout(timeout)
timeout = setTimeout(() => {
expect(syncDonePacket).toEqual(syncPackets.length)
resolve()
}, 1000)
}
getEventListener(
"room",
"messageReceived",
)(createMessageReceivedEvent(syncStep1))
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the draw updates
updateRoom.addEventListener(
"addOrUpdatePath",
({ detail: { id, points } }) => {
updatePaths[id] = points
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
updateRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
updateIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const addPacket of addPackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(addPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the draw updates
expect(updatePaths).toStrictEqual({ [pathID]: pathDraw })
expect(updateIntervals).toStrictEqual({ [pathID]: {} })
resolve()
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the erase updates
updateRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
updateIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const erasePacket of erasePackets) {
getEventListener(
"update",
"messageReceived",
)(createMessageReceivedEvent(erasePacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the erase updates
expect(updatePaths).toStrictEqual({ [pathID]: pathDraw })
expect(updateIntervals).toStrictEqual({
[pathID]: Object.assign(
{},
new Array(pathDraw.length).fill([[0, 1]]),
),
})
resolve()
}),
)
.then(
() =>
new Promise((resolve) => {
// Replay the synchronisation
syncRoom.addEventListener(
"addOrUpdatePath",
({ detail: { id, points } }) => {
syncPaths[id] = points
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
syncRoom.addEventListener(
"removedIntervalsChange",
({ detail: { id, intervals } }) => {
syncIntervals[id] = intervals
clearTimeout(timeout)
timeout = setTimeout(() => resolve(), 1000)
},
)
for (const syncPacket of syncPackets) {
getEventListener(
"sync",
"messageReceived",
)(createMessageReceivedEvent(syncPacket))
}
}),
)
.then(
() =>
new Promise((resolve) => {
// Check the synchronisation
expect(syncPaths).toStrictEqual({ [pathID]: pathDraw })
expect(syncIntervals).toStrictEqual({
[pathID]: Object.assign(
{},
new Array(pathDraw.length).fill([[0, 1]]),
),
})
resolve()
}),
)
})
})