diff --git a/__tests__/benchmark.test.js b/__tests__/benchmark.test.js index 129368f56c4e6cb629d52d3dc51acaa342dd1aec..afdfa9a6453196547d884df1433976802727b483 100644 --- a/__tests__/benchmark.test.js +++ b/__tests__/benchmark.test.js @@ -22,11 +22,11 @@ import { createMessageReceivedEvent, handshake, syncStep1, - //syncDone, + syncDone, dotDraw, dotErase, - //pathDraw, - //pathErase, + pathDraw, + pathErase, } from "./benchmark.data.js" // Adapted from https://github.com/jprichardson/buffer-json (MIT license) @@ -155,6 +155,7 @@ function runBidirectionalBenchmark( BENCHMARK, FILENAME, ITERATIONSLIST, + BLOCKSIZE, addData, eraseData, @@ -283,7 +284,14 @@ function runBidirectionalBenchmark( prevTime = process.hrtime() - addOnInitFrontend(room, addPackets, pathIDs, addData) + addOnInitFrontend( + room, + addPackets, + pathIDs, + addData, + ITERATIONS, + BLOCKSIZE, + ) }), ) .then(() => { @@ -329,7 +337,14 @@ function runBidirectionalBenchmark( prevTime = process.hrtime() - eraseOnInitFrontend(room, erasePackets, pathIDs, eraseData) + eraseOnInitFrontend( + room, + erasePackets, + pathIDs, + eraseData, + ITERATIONS, + BLOCKSIZE, + ) }), ) .then(() => { @@ -621,6 +636,246 @@ function runBidirectionalBenchmark( ) } +function addOnInitFrontendSequential( + room, + addPackets, + pathIDs, + addData, + ITERATIONS, // eslint-disable-line no-unused-vars + BLOCKSIZE, // eslint-disable-line no-unused-vars +) { + addPackets.push([]) + + const drawPathID = room.addPath(addData[0]) + + pathIDs.push(drawPathID) + for (let i = 1; i < addData.length; i++) { + room.extendPath(drawPathID, addData[i]) + } +} + +function addOnBroadcastGroupSequential( + room, + addPackets, + pathIDs, + addData, + ITERATIONS, + broadcasts, + resolve, +) { + if (broadcasts < ITERATIONS) { + addPackets.push([]) + + const drawPathID = room.addPath(addData[0]) + pathIDs.push(drawPathID) + + for (let i = 1; i < addData.length; i++) { + room.extendPath(drawPathID, addData[i]) + } + } else { + resolve() + } +} + +function eraseOnInitFrontendSequential( + room, + erasePackets, + pathIDs, + eraseData, + ITERATIONS, // eslint-disable-line no-unused-vars + BLOCKSIZE, // eslint-disable-line no-unused-vars +) { + erasePackets.push([]) + + const erasePathID = pathIDs[0] + + for (let i = 0; i < eraseData.length; i++) { + room.extendErasureIntervals(erasePathID, eraseData[i][0], eraseData[i][1]) + } +} + +function eraseOnBroadcastGroupSequential( + room, + erasePackets, + pathIDs, + eraseData, + ITERATIONS, + broadcasts, + resolve, +) { + if (broadcasts < ITERATIONS) { + erasePackets.push([]) + + const erasePathID = pathIDs[broadcasts] + + for (let i = 0; i < eraseData.length; i++) { + room.extendErasureIntervals(erasePathID, eraseData[i][0], eraseData[i][1]) + } + } else { + resolve() + } +} + +function syncOnSendGroup(syncPackets, resolve) { + resolve() +} + +function addOnInitBackend(addPackets) { + for (const packet of addPackets[0]) { + getEventListener( + "update", + "messageReceived", + )(createMessageReceivedEvent(packet)) + } +} + +function addOnEventGroupSequential( + addPackets, + ITERATIONS, + broadcasts, + resolve, +) { + if (broadcasts >= ITERATIONS) { + return resolve() + } + + for (const packet of addPackets[broadcasts]) { + getEventListener( + "update", + "messageReceived", + )(createMessageReceivedEvent(packet)) + } +} + +function eraseOnInitBackend(erasePackets) { + for (const packet of erasePackets[0]) { + getEventListener( + "update", + "messageReceived", + )(createMessageReceivedEvent(packet)) + } +} + +function eraseOnEventGroupSequential( + erasePackets, + ITERATIONS, + broadcasts, + resolve, +) { + if (broadcasts >= ITERATIONS) { + return resolve() + } + + for (const packet of erasePackets[broadcasts]) { + getEventListener( + "update", + "messageReceived", + )(createMessageReceivedEvent(packet)) + } +} + +function snycOnEventGroup(resolve) { + resolve() +} + +function addOnInitFrontendParallel( + room, + addPackets, + pathIDs, + addData, + ITERATIONS, + BLOCKSIZE, +) { + addPackets.push([]) + + // Necessary to allow yjs to execute transactions (majority of processing time) + function addPath(sj) { + if (sj >= ITERATIONS) return + + for (let j = sj; j < Math.min(sj + BLOCKSIZE, ITERATIONS); j++) { + const drawPathID = room.addPath(addData[0]) + + pathIDs.push(drawPathID) + for (let i = 1; i < addData.length; i++) { + room.extendPath(drawPathID, addData[i]) + } + } + + setTimeout(addPath, 0, sj + BLOCKSIZE) + } + + addPath(0) +} + +function addOnBroadcastGroupParallel( + room, + addPackets, + pathIDs, + addData, + ITERATIONS, + broadcasts, + resolve, +) { + resolve() +} + +function eraseOnInitFrontendParallel( + room, + erasePackets, + pathIDs, + eraseData, + ITERATIONS, + BLOCKSIZE, +) { + erasePackets.push([]) + + // Necessary to allow yjs to execute transactions (majority of processing time) + function erasePath(sj) { + if (sj >= ITERATIONS) return + + for (let j = sj; j < Math.min(sj + BLOCKSIZE, ITERATIONS); j++) { + const erasePathID = pathIDs[j] + + for (let i = 0; i < eraseData.length; i++) { + room.extendErasureIntervals( + erasePathID, + eraseData[i][0], + eraseData[i][1], + ) + } + } + + setTimeout(erasePath, 0, sj + BLOCKSIZE) + } + + erasePath(0) +} + +function eraseOnBroadcastGroupParallel( + room, + erasePackets, + pathIDs, + eraseData, + ITERATIONS, + broadcasts, + resolve, +) { + resolve() +} + +function addOnEventGroupParallel(addPackets, ITERATIONS, broadcasts, resolve) { + resolve() +} + +function eraseOnEventGroupParallel( + erasePackets, + ITERATIONS, + broadcasts, + resolve, +) { + resolve() +} + describe("drawing app mesh", () => { beforeEach(() => { getUserID.mockClear() @@ -637,7 +892,7 @@ describe("drawing app mesh", () => { }) const ITERATIONSLIST = [10, 25, 50, 75, 100, 250, 500] - const BLOCKSIZE = 10 // eslint-disable-line no-unused-vars + const BLOCKSIZE = 10 jest.setTimeout(1000 * 60 * 60) @@ -646,163 +901,134 @@ describe("drawing app mesh", () => { "dot draw and erase [sequential]" /* BENCHMARK */, "plots/dot-seq-benchmark.tsv", ITERATIONSLIST /* ITERATIONSLIST */, + BLOCKSIZE /* BLOCKSIZE */, dotDraw /* addData */, dotErase /* eraseData */, - function addOnInitFrontend(room, addPackets, pathIDs, addData) { - addPackets.push([]) - - const drawPathID = room.addPath(addData[0]) - - pathIDs.push(drawPathID) - for (let i = 1; i < addData.length; i++) { - room.extendPath(drawPathID, addData[i]) - } - } /* addOnInitFrontend */, + addOnInitFrontendSequential /* addOnInitFrontend */, 100 /* addBroadcastGroupTimeout */, - function addOnBroadcastGroup( - room, - addPackets, - pathIDs, - addData, - ITERATIONS, - broadcasts, - resolve, - ) { - if (broadcasts < ITERATIONS) { - addPackets.push([]) - - const drawPathID = room.addPath(addData[0]) - pathIDs.push(drawPathID) - - for (let i = 1; i < addData.length; i++) { - room.extendPath(drawPathID, addData[i]) - } - } else { - resolve() - } - } /* addOnBroadcastGroup */, + addOnBroadcastGroupSequential /* addOnBroadcastGroup */, ".dot-seq-add.json" /* addPacketsFilename */, - function eraseOnInitFrontend(room, erasePackets, pathIDs, eraseData) { - erasePackets.push([]) - - const erasePathID = pathIDs[0] - - for (let i = 0; i < eraseData.length; i++) { - room.extendErasureIntervals( - erasePathID, - eraseData[i][0], - eraseData[i][1], - ) - } - } /* eraseOnInitFrontend */, + eraseOnInitFrontendSequential /* eraseOnInitFrontend */, 100 /* eraseBroadcastGroupTimeout */, - function eraseOnBroadcastGroup( - room, - erasePackets, - pathIDs, - eraseData, - ITERATIONS, - broadcasts, - resolve, - ) { - if (broadcasts < ITERATIONS) { - erasePackets.push([]) - - const erasePathID = pathIDs[broadcasts] - - for (let i = 0; i < eraseData.length; i++) { - room.extendErasureIntervals( - erasePathID, - eraseData[i][0], - eraseData[i][1], - ) - } - } else { - resolve() - } - } /* eraseOnBroadcastGroup */, + eraseOnBroadcastGroupSequential /* eraseOnBroadcastGroup */, ".dot-seq-erase.json" /* erasePacketsFilename */, 1000 /* syncSendGroupTimeout */, - function syncOnSendGroup(syncPackets, resolve) { - resolve() - } /* syncOnSendGroup */, + syncOnSendGroup /* syncOnSendGroup */, ".dot-seq-sync.json" /* syncPacketsFilename */, - function addOnInitBackend(addPackets) { - for (const packet of addPackets[0]) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - } /* addOnInitBackend */, + addOnInitBackend /* addOnInitBackend */, + 100 /* addEventGroupTimeout */, + addOnEventGroupSequential /* addOnEventGroup */, + eraseOnInitBackend /* eraseOnInitBackend */, + 100 /* eraseEventGroupTimeout */, + eraseOnEventGroupSequential /* eraseOnEventGroupTimeout */, + 1000 /* syncEventGroupTimeout */, + snycOnEventGroup /* syncOnEventGroup */, + ) + }) + + it("benchmarks a dot draw and erase update in parallel", () => { + return runBidirectionalBenchmark( + "dot draw and erase [parallel]" /* BENCHMARK */, + "plots/dot-par-benchmark.tsv", + ITERATIONSLIST /* ITERATIONSLIST */, + BLOCKSIZE /* BLOCKSIZE */, + dotDraw /* addData */, + dotErase /* eraseData */, + addOnInitFrontendParallel /* addOnInitFrontend */, + 1000 /* addBroadcastGroupTimeout */, + addOnBroadcastGroupParallel /* addOnBroadcastGroup */, + ".dot-par-add.json" /* addPacketsFilename */, + eraseOnInitFrontendParallel /* eraseOnInitFrontend */, + 1000 /* eraseBroadcastGroupTimeout */, + eraseOnBroadcastGroupParallel /* eraseOnBroadcastGroup */, + ".dot-par-erase.json" /* erasePacketsFilename */, + 1000 /* syncSendGroupTimeout */, + syncOnSendGroup /* syncOnSendGroup */, + ".dot-par-sync.json" /* syncPacketsFilename */, + addOnInitBackend /* addOnInitBackend */, + 1000 /* addEventGroupTimeout */, + addOnEventGroupParallel /* addOnEventGroup */, + eraseOnInitBackend /* eraseOnInitBackend */, + 1000 /* eraseEventGroupTimeout */, + eraseOnEventGroupParallel /* eraseOnEventGroupTimeout */, + 1000 /* syncEventGroupTimeout */, + snycOnEventGroup /* syncOnEventGroup */, + ) + }) + + it("benchmarks a path draw and erase update sequentially", () => { + return runBidirectionalBenchmark( + "path draw and erase [sequential]" /* BENCHMARK */, + "plots/path-seq-benchmark.tsv", + ITERATIONSLIST /* ITERATIONSLIST */, + BLOCKSIZE /* BLOCKSIZE */, + pathDraw /* addData */, + pathErase /* eraseData */, + addOnInitFrontendSequential /* addOnInitFrontend */, + 100 /* addBroadcastGroupTimeout */, + addOnBroadcastGroupSequential /* addOnBroadcastGroup */, + ".path-seq-add.json" /* addPacketsFilename */, + eraseOnInitFrontendSequential /* eraseOnInitFrontend */, + 100 /* eraseBroadcastGroupTimeout */, + eraseOnBroadcastGroupSequential /* eraseOnBroadcastGroup */, + ".path-seq-erase.json" /* erasePacketsFilename */, + 1000 /* syncSendGroupTimeout */, + syncOnSendGroup /* syncOnSendGroup */, + ".path-seq-sync.json" /* syncPacketsFilename */, + addOnInitBackend /* addOnInitBackend */, 100 /* addEventGroupTimeout */, - function addOnEventGroup(addPackets, ITERATIONS, broadcasts, resolve) { - if (broadcasts >= ITERATIONS) { - return resolve() - } - - for (const packet of addPackets[broadcasts]) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - } /* addOnEventGroupTimeout */, - function eraseOnInitBackend(erasePackets) { - for (const packet of erasePackets[0]) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - } /* eraseOnInitBackend */, + addOnEventGroupSequential /* addOnEventGroup */, + eraseOnInitBackend /* eraseOnInitBackend */, 100 /* eraseEventGroupTimeout */, - function eraseOnEventGroup( - erasePackets, - ITERATIONS, - broadcasts, - resolve, - ) { - if (broadcasts >= ITERATIONS) { - return resolve() - } - - for (const packet of erasePackets[broadcasts]) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - } /* eraseOnEventGroupTimeout */, + eraseOnEventGroupSequential /* eraseOnEventGroupTimeout */, 1000 /* syncEventGroupTimeout */, - function snycOnEventGroup(resolve) { - resolve() - } /* syncOnEventGroup */, + snycOnEventGroup /* syncOnEventGroup */, ) }) - /*it("benchmarks a single draw and erase update sequentially", () => { - const dotIDs = [] - let prevTime + it("benchmarks a path draw and erase update in parallel", () => { + return runBidirectionalBenchmark( + "path draw and erase [parallel]" /* BENCHMARK */, + "plots/path-par-benchmark.tsv", + ITERATIONSLIST /* ITERATIONSLIST */, + BLOCKSIZE /* BLOCKSIZE */, + pathDraw /* addData */, + pathErase /* eraseData */, + addOnInitFrontendParallel /* addOnInitFrontend */, + 1000 /* addBroadcastGroupTimeout */, + addOnBroadcastGroupParallel /* addOnBroadcastGroup */, + ".path-par-add.json" /* addPacketsFilename */, + eraseOnInitFrontendParallel /* eraseOnInitFrontend */, + 1000 /* eraseBroadcastGroupTimeout */, + eraseOnBroadcastGroupParallel /* eraseOnBroadcastGroup */, + ".path-par-erase.json" /* erasePacketsFilename */, + 1000 /* syncSendGroupTimeout */, + syncOnSendGroup /* syncOnSendGroup */, + ".path-par-sync.json" /* syncPacketsFilename */, + addOnInitBackend /* addOnInitBackend */, + 1000 /* addEventGroupTimeout */, + addOnEventGroupParallel /* addOnEventGroup */, + eraseOnInitBackend /* eraseOnInitBackend */, + 1000 /* eraseEventGroupTimeout */, + eraseOnEventGroupParallel /* eraseOnEventGroupTimeout */, + 1000 /* syncEventGroupTimeout */, + snycOnEventGroup /* syncOnEventGroup */, + ) + }) - let broadcasts = 0 + // TODO: apply refactoring if possible + it("communicates a single draw and erase update", () => { + let dotID - let addLocTime = 0 let addPackets = [] - let addSize = 0 - let eraseLocTime = 0 let erasePackets = [] - let eraseSize = 0 - let syncLocTime let syncPackets = [] - let syncSize = 0 + let syncDonePacket = -1 - let addRemTime = 0 - let addEvents = 0 - let eraseRemTime = 0 - let eraseEvents = 0 - let syncRemTime = 0 - let syncEvents = 0 + const updatePaths = {} + const updateIntervals = {} + const syncPaths = {} + const syncIntervals = {} let timeout @@ -822,70 +1048,78 @@ describe("drawing app mesh", () => { .then( () => new Promise((resolve) => { + // Draw the dot broadcastListener.callback = (channel, message) => { - const currTime = process.hrtime() - - broadcasts += 1 - - if (broadcasts <= ITERATIONS) { - addLocTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - addPackets.push(message) - addSize += message.message.length - } else { - eraseLocTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - erasePackets.push(message) - eraseSize += message.message.length - } + expect(channel).toEqual("y-js") + expect(message.message instanceof Uint8Array).toBe(true) - prevTime = process.hrtime() - - if (broadcasts < ITERATIONS) { - dotIDs.push(room.addPath(dotDraw[0])) - } else if (broadcasts < ITERATIONS * 2) { - room.extendErasureIntervals( - dotIDs[broadcasts - ITERATIONS], - dotErase[0][0], - dotErase[0][1], - ) - } else { - resolve() - } - } + addPackets.push(message) - prevTime = process.hrtime() + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + } - dotIDs.push(room.addPath(dotDraw[0])) + dotID = room.addPath(dotDraw[0]) }), ) .then(() => { broadcastListener.callback = null - dumpBSON("dot-seq-add.json", addPackets) + dumpBSON(".dot-seq-add.json", addPackets) addPackets = null + }) + .then( + () => + new Promise((resolve) => { + // Erase the dot + broadcastListener.callback = (channel, message) => { + expect(channel).toEqual("y-js") + expect(message.message instanceof Uint8Array).toBe(true) - dumpBSON("dot-seq-erase.json", erasePackets) + erasePackets.push(message) + + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + } + + room.extendErasureIntervals(dotID, dotErase[0][0], dotErase[0][1]) + }), + ) + .then(() => { + broadcastListener.callback = null + + dumpBSON(".dot-seq-erase.json", erasePackets) erasePackets = null }) .then( () => new Promise((resolve) => { + // Request a sync step 2 sendListener.callback = (uid, channel, message) => { - const currTime = process.hrtime() - syncLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) + expect(uid).toEqual("moritz") + expect(channel).toEqual("y-js") + expect(message.message instanceof Uint8Array).toBe(true) syncPackets.push(message) - syncSize += message.message.length + + if ( + message.message.length == syncDone.message.length && + JSON.stringify( + Object.assign({}, message, { uuid: undefined }), + ) == JSON.stringify(syncDone) + ) { + expect(syncDonePacket).toEqual(-1) + + syncDonePacket = syncPackets.length + } clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } + timeout = setTimeout(() => { + expect(syncDonePacket).toEqual(syncPackets.length) - prevTime = process.hrtime() + resolve() + }, 1000) + } getEventListener( "room", @@ -896,7 +1130,7 @@ describe("drawing app mesh", () => { .then(() => { sendListener.callback = null - dumpBSON("dot-seq-sync.json", syncPackets) + dumpBSON(".dot-seq-sync.json", syncPackets) syncPackets = null room.disconnect() @@ -912,8 +1146,7 @@ describe("drawing app mesh", () => { "messageReceived", )(createMessageReceivedEvent(handshake, "tw-ml")) - addPackets = loadBSON("dot-seq-add.json") - erasePackets = loadBSON("dot-seq-erase.json") + addPackets = loadBSON(".dot-seq-add.json") return resolve() }), @@ -921,70 +1154,82 @@ describe("drawing app mesh", () => { .then( () => new Promise((resolve) => { - broadcasts = 0 - - let currTime - - const timeoutCallback = () => { - broadcasts += 1 - - // TODO: can we assume that we only use one message here? - if (broadcasts <= ITERATIONS) { - addRemTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } else { - eraseRemTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } + // Replay the draw updates + updateRoom.addEventListener( + "addOrUpdatePath", + ({ detail: { id, points } }) => { + updatePaths[id] = points - let packet + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + }, + ) - if (broadcasts < ITERATIONS) { - packet = addPackets[broadcasts] - } else if (broadcasts < ITERATIONS * 2) { - packet = erasePackets[broadcasts - ITERATIONS] - } else { - return resolve() - } + updateRoom.addEventListener( + "removedIntervalsChange", + ({ detail: { id, intervals } }) => { + updateIntervals[id] = intervals - prevTime = process.hrtime() + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + }, + ) + for (const addPacket of addPackets) { getEventListener( "update", "messageReceived", - )(createMessageReceivedEvent(packet)) + )(createMessageReceivedEvent(addPacket)) } + }), + ) + .then( + () => + new Promise((resolve) => { + // Check the draw updates + expect(updatePaths).toStrictEqual({ [dotID]: dotDraw }) + expect(updateIntervals).toStrictEqual({ [dotID]: {} }) - updateRoom.addEventListener("addOrUpdatePath", () => { - currTime = process.hrtime() - - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(timeoutCallback, 100) - }) + resolve() + }), + ) + .then(() => { + addPackets = null - updateRoom.addEventListener("removedIntervalsChange", () => { - currTime = process.hrtime() + erasePackets = loadBSON(".dot-seq-erase.json") + }) + .then( + () => + new Promise((resolve) => { + // Replay the erase updates + updateRoom.addEventListener( + "removedIntervalsChange", + ({ detail: { id, intervals } }) => { + updateIntervals[id] = intervals - if (broadcasts < ITERATIONS) { - addEvents += 1 - } else if (broadcasts < ITERATIONS * 2) { - eraseEvents += 1 - } + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + }, + ) - clearTimeout(timeout) - timeout = setTimeout(timeoutCallback, 100) + for (const erasePacket of erasePackets) { + getEventListener( + "update", + "messageReceived", + )(createMessageReceivedEvent(erasePacket)) + } + }), + ) + .then( + () => + new Promise((resolve) => { + // Check the erase updates + expect(updatePaths).toStrictEqual({ [dotID]: dotDraw }) + expect(updateIntervals).toStrictEqual({ + [dotID]: { [dotErase[0][0]]: dotErase[0][1] }, }) - prevTime = process.hrtime() - - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(addPackets[0])) + resolve() }), ) .then(() => { @@ -1004,7 +1249,7 @@ describe("drawing app mesh", () => { "messageReceived", )(createMessageReceivedEvent(handshake, "tw-ml")) - syncPackets = loadBSON("dot-seq-sync.json") + syncPackets = loadBSON(".dot-seq-sync.json") return resolve() }), @@ -1012,954 +1257,26 @@ describe("drawing app mesh", () => { .then( () => new Promise((resolve) => { - syncRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() + // Replay the synchronisation + syncRoom.addEventListener( + "addOrUpdatePath", + ({ detail: { id, points } }) => { + syncPaths[id] = points - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - syncRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - prevTime = process.hrtime() - - for (const syncPacket of syncPackets) { - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(syncPacket)) - } - }), - ) - .then(() => { - syncPackets = null - - syncRoom.disconnect() - syncRoom = null - }) - .then(() => { - addPackets = loadBSON("dot-seq-add.json").length - erasePackets = loadBSON("dot-seq-erase.json").length - syncPackets = loadBSON("dot-seq-sync.json").length - - printBenchmark("single draw and erase [sequential]", ITERATIONS, { - addPath: { - timeLoc: addLocTime, - packets: addPackets, - size: addSize, - timeRem: addRemTime, - events: addEvents, - }, - extendErasureIntervals: { - timeLoc: eraseLocTime, - packets: erasePackets, - size: eraseSize, - timeRem: eraseRemTime, - events: eraseEvents, - }, - synchronisation: { - timeLoc: syncLocTime, - packets: syncPackets, - size: syncSize, - timeRem: syncRemTime, - events: syncEvents, - }, - }) - }) - }) - - it("benchmarks a single draw and erase update in parallel", () => { - const dotIDs = [] - let prevTime - - let addLocTime - let addPackets = [] - let addSize = 0 - let eraseLocTime - let erasePackets = [] - let eraseSize = 0 - let syncLocTime - let syncPackets = [] - let syncSize = 0 - - let addRemTime = 0 - let addEvents = 0 - let eraseRemTime = 0 - let eraseEvents = 0 - let syncRemTime = 0 - let syncEvents = 0 - - let timeout - - let room = null - let updateRoom = null - let syncRoom = null - - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve) => { - room = await connect("room", MockConnection) - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - return resolve() - }) - .then( - () => - new Promise((resolve) => { - broadcastListener.callback = (channel, message) => { - const currTime = process.hrtime() - - addLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addPackets.push(message) - addSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - prevTime = process.hrtime() - - for (let i = 0; i < ITERATIONS; i++) { - dotIDs.push(room.addPath(dotDraw[0])) - } - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-add.json", addPackets) - addPackets = null - }) - .then( - () => - new Promise((resolve) => { - broadcastListener.callback = (channel, message) => { - const currTime = process.hrtime() - - eraseLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - erasePackets.push(message) - eraseSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => { - resolve() - }, 1000) - } - - prevTime = process.hrtime() - - for (let i = 0; i < ITERATIONS; i++) { - room.extendErasureIntervals( - dotIDs[i], - dotErase[0][0], - dotErase[0][1], - ) - } - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-erase.json", erasePackets) - erasePackets = null - }) - .then( - () => - new Promise((resolve) => { - sendListener.callback = (uid, channel, message) => { - const currTime = process.hrtime() - syncLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - - syncPackets.push(message) - syncSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - prevTime = process.hrtime() - - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(syncStep1)) - }), - ) - .then(() => { - sendListener.callback = null - - dumpBSON("dot-seq-sync.json", syncPackets) - syncPackets = null - - room.disconnect() - room = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - updateRoom = await connect("update", MockConnection) - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - addPackets = loadBSON("dot-seq-add.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - updateRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - addRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - updateRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - addRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - for (const addPacket of addPackets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(addPacket)) - } - }), - ) - .then(() => { - addPackets = null - erasePackets = loadBSON("dot-seq-erase.json") - }) - .then( - () => - new Promise((resolve) => { - updateRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - eraseRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - eraseEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - updateRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - eraseRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - eraseEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - for (const erasePacket of erasePackets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(erasePacket)) - } - }), - ) - .then(() => { - addPackets = null - erasePackets = null - - updateRoom.disconnect() - updateRoom = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - syncRoom = await connect("sync", MockConnection) - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - syncPackets = loadBSON("dot-seq-sync.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - syncRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - syncRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - for (const syncPacket of syncPackets) { - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(syncPacket)) - } - }), - ) - .then(() => { - syncPackets = null - - syncRoom.disconnect() - syncRoom = null - }) - .then(() => { - addPackets = loadBSON("dot-seq-add.json").length - erasePackets = loadBSON("dot-seq-erase.json").length - syncPackets = loadBSON("dot-seq-sync.json").length - - printBenchmark("single draw and erase [parallel]", ITERATIONS, { - addPath: { - timeLoc: addLocTime, - packets: addPackets, - size: addSize, - timeRem: addRemTime, - events: addEvents, - }, - extendErasureIntervals: { - timeLoc: eraseLocTime, - packets: erasePackets, - size: eraseSize, - timeRem: eraseRemTime, - events: eraseEvents, - }, - synchronisation: { - timeLoc: syncLocTime, - packets: syncPackets, - size: syncSize, - timeRem: syncRemTime, - events: syncEvents, - }, - }) - }) - }) - - it("communicates a single draw and erase update", () => { - let dotID - - let addPackets = [] - let erasePackets = [] - let syncPackets = [] - let syncDonePacket = -1 - - const updatePaths = {} - const updateIntervals = {} - const syncPaths = {} - const syncIntervals = {} - - let timeout - - let room = null - let updateRoom = null - let syncRoom = null - - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve) => { - room = await connect("room", MockConnection) - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - return resolve() - }) - .then( - () => - new Promise((resolve) => { - // Draw the dot - broadcastListener.callback = (channel, message) => { - expect(channel).toEqual("y-js") - expect(message.message instanceof Uint8Array).toBe(true) - - addPackets.push(message) - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - dotID = room.addPath(dotDraw[0]) - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-add.json", addPackets) - addPackets = null - }) - .then( - () => - new Promise((resolve) => { - // Erase the dot - broadcastListener.callback = (channel, message) => { - expect(channel).toEqual("y-js") - expect(message.message instanceof Uint8Array).toBe(true) - - erasePackets.push(message) - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - room.extendErasureIntervals(dotID, dotErase[0][0], dotErase[0][1]) - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-erase.json", erasePackets) - erasePackets = null - }) - .then( - () => - new Promise((resolve) => { - // Request a sync step 2 - sendListener.callback = (uid, channel, message) => { - expect(uid).toEqual("moritz") - expect(channel).toEqual("y-js") - expect(message.message instanceof Uint8Array).toBe(true) - - syncPackets.push(message) - - if ( - message.message.length == syncDone.message.length && - JSON.stringify( - Object.assign({}, message, { uuid: undefined }), - ) == JSON.stringify(syncDone) - ) { - expect(syncDonePacket).toEqual(-1) - - syncDonePacket = syncPackets.length - } - - clearTimeout(timeout) - timeout = setTimeout(() => { - expect(syncDonePacket).toEqual(syncPackets.length) - - resolve() - }, 1000) - } - - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(syncStep1)) - }), - ) - .then(() => { - sendListener.callback = null - - dumpBSON("dot-seq-sync.json", syncPackets) - syncPackets = null - - room.disconnect() - room = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - updateRoom = await connect("update", MockConnection) - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - addPackets = loadBSON("dot-seq-add.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - // Replay the draw updates - updateRoom.addEventListener( - "addOrUpdatePath", - ({ detail: { id, points } }) => { - updatePaths[id] = points - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }, - ) - - updateRoom.addEventListener( - "removedIntervalsChange", - ({ detail: { id, intervals } }) => { - updateIntervals[id] = intervals - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }, - ) - - for (const addPacket of addPackets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(addPacket)) - } - }), - ) - .then( - () => - new Promise((resolve) => { - // Check the draw updates - expect(updatePaths).toStrictEqual({ [dotID]: dotDraw }) - expect(updateIntervals).toStrictEqual({ [dotID]: {} }) - - resolve() - }), - ) - .then(() => { - addPackets = null - - erasePackets = loadBSON("dot-seq-erase.json") - }) - .then( - () => - new Promise((resolve) => { - // Replay the erase updates - updateRoom.addEventListener( - "removedIntervalsChange", - ({ detail: { id, intervals } }) => { - updateIntervals[id] = intervals - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }, - ) - - for (const erasePacket of erasePackets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(erasePacket)) - } - }), - ) - .then( - () => - new Promise((resolve) => { - // Check the erase updates - expect(updatePaths).toStrictEqual({ [dotID]: dotDraw }) - expect(updateIntervals).toStrictEqual({ - [dotID]: { [dotErase[0][0]]: dotErase[0][1] }, - }) - - resolve() - }), - ) - .then(() => { - addPackets = null - erasePackets = null - - updateRoom.disconnect() - updateRoom = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - syncRoom = await connect("sync", MockConnection) - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - syncPackets = loadBSON("dot-seq-sync.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - // Replay the synchronisation - syncRoom.addEventListener( - "addOrUpdatePath", - ({ detail: { id, points } }) => { - syncPaths[id] = points - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }, - ) - - syncRoom.addEventListener( - "removedIntervalsChange", - ({ detail: { id, intervals } }) => { - syncIntervals[id] = intervals - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }, - ) - - for (const syncPacket of syncPackets) { - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(syncPacket)) - } - }), - ) - .then( - () => - new Promise((resolve) => { - // Check the synchronisation - expect(syncPaths).toStrictEqual({ [dotID]: dotDraw }) - expect(syncIntervals).toStrictEqual({ - [dotID]: { [dotErase[0][0]]: dotErase[0][1] }, - }) - - resolve() - }), - ) - .then(() => { - syncPackets = null - - syncRoom.disconnect() - syncRoom = null - }) - }) - - it("benchmarks a path draw and erase update sequentially", () => { - const pathIDs = [] - let prevTime - let currTime - - let broadcasts = 0 - - let addLocTime = 0 - let addPackets = [] - let addSize = 0 - let eraseLocTime = 0 - let erasePackets = [] - let eraseSize = 0 - let syncLocTime - let syncPackets = [] - let syncSize = 0 - - let addRemTime = 0 - let addEvents = 0 - let eraseRemTime = 0 - let eraseEvents = 0 - let syncRemTime = 0 - let syncEvents = 0 - - let timeout - - let room = null - let updateRoom = null - let syncRoom = null - - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve) => { - room = await connect("room", MockConnection) - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - return resolve() - }) - .then( - () => - new Promise((resolve) => { - broadcastListener.callback = (channel, message) => { - currTime = process.hrtime() - - if (broadcasts < ITERATIONS) { - addPackets[addPackets.length - 1].push(message) - addSize += message.message.length - } else { - erasePackets[erasePackets.length - 1].push(message) - eraseSize += message.message.length - } - - clearTimeout(timeout) - timeout = setTimeout(() => { - broadcasts += 1 - - if (broadcasts <= ITERATIONS) { - addLocTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } else { - eraseLocTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } - - prevTime = process.hrtime() - - if (broadcasts < ITERATIONS) { - addPackets.push([]) - - const tmpPathID = room.addPath(pathDraw[0]) - pathIDs.push(tmpPathID) - for (let i = 1; i < pathDraw.length; i++) { - room.extendPath(tmpPathID, pathDraw[i]) - } - } else if (broadcasts < ITERATIONS * 2) { - erasePackets.push([]) - - const tmpPathID = pathIDs[broadcasts - ITERATIONS] - - for (let i = 0; i < pathErase.length; i++) { - room.extendErasureIntervals( - tmpPathID, - pathErase[i][0], - pathErase[i][1], - ) - } - } else { - resolve() - } - }, 100) - } - - addPackets.push([]) - - prevTime = process.hrtime() - - const tmpPathID = room.addPath(pathDraw[0]) - pathIDs.push(tmpPathID) - for (let i = 1; i < pathDraw.length; i++) { - room.extendPath(tmpPathID, pathDraw[i]) - } - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-add.json", addPackets) - addPackets = null - - dumpBSON("dot-seq-erase.json", erasePackets) - erasePackets = null - }) - .then( - () => - new Promise((resolve) => { - sendListener.callback = (uid, channel, message) => { - const currTime = process.hrtime() - syncLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - - syncPackets.push(message) - syncSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - prevTime = process.hrtime() - - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(syncStep1)) - }), - ) - .then(() => { - sendListener.callback = null - - dumpBSON("dot-seq-sync.json", syncPackets) - syncPackets = null - - room.disconnect() - room = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - updateRoom = await connect("update", MockConnection) - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - addPackets = loadBSON("dot-seq-add.json") - erasePackets = loadBSON("dot-seq-erase.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - broadcasts = 0 - - let currTime - - const timeoutCallback = () => { - broadcasts += 1 - - if (broadcasts <= ITERATIONS) { - addRemTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } else { - eraseRemTime += - (currTime[0] - prevTime[0]) * 1e9 + - (currTime[1] - prevTime[1]) - } - - let packets - - if (broadcasts < ITERATIONS) { - packets = addPackets[broadcasts] - } else if (broadcasts < ITERATIONS * 2) { - packets = erasePackets[broadcasts - ITERATIONS] - } else { - return resolve() - } - - prevTime = process.hrtime() - - for (const packet of packets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - } - - updateRoom.addEventListener("addOrUpdatePath", () => { - currTime = process.hrtime() - - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(timeoutCallback, 100) - }) - - updateRoom.addEventListener("removedIntervalsChange", () => { - currTime = process.hrtime() - - if (broadcasts < ITERATIONS) { - addEvents += 1 - } else if (broadcasts < ITERATIONS * 2) { - eraseEvents += 1 - } - - clearTimeout(timeout) - timeout = setTimeout(timeoutCallback, 100) - }) - - prevTime = process.hrtime() - - for (const packet of addPackets[0]) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(packet)) - } - }), - ) - .then(() => { - addPackets = null - erasePackets = null - - updateRoom.disconnect() - updateRoom = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - syncRoom = await connect("sync", MockConnection) - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - syncPackets = loadBSON("dot-seq-sync.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - syncRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - syncRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + }, + ) - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) + syncRoom.addEventListener( + "removedIntervalsChange", + ({ detail: { id, intervals } }) => { + syncIntervals[id] = intervals - prevTime = process.hrtime() + clearTimeout(timeout) + timeout = setTimeout(() => resolve(), 1000) + }, + ) for (const syncPacket of syncPackets) { getEventListener( @@ -1969,347 +1286,16 @@ describe("drawing app mesh", () => { } }), ) - .then(() => { - syncPackets = null - - syncRoom.disconnect() - syncRoom = null - }) - .then(() => { - addPackets = loadBSON("dot-seq-add.json").reduce( - (sum, packets) => sum + packets.length, - 0, - ) - erasePackets = loadBSON("dot-seq-erase.json").reduce( - (sum, packets) => sum + packets.length, - 0, - ) - syncPackets = loadBSON("dot-seq-sync.json").length - - printBenchmark("path draw and erase [sequential]", ITERATIONS, { - addPath: { - timeLoc: addLocTime, - packets: addPackets, - size: addSize, - timeRem: addRemTime, - events: addEvents, - }, - extendErasureIntervals: { - timeLoc: eraseLocTime, - packets: erasePackets, - size: eraseSize, - timeRem: eraseRemTime, - events: eraseEvents, - }, - synchronisation: { - timeLoc: syncLocTime, - packets: syncPackets, - size: syncSize, - timeRem: syncRemTime, - events: syncEvents, - }, - }) - }) - }) - - it("benchmarks a path draw and erase update in parallel", () => { - const pathIDs = [] - let prevTime - - let addLocTime - let addPackets = [] - let addSize = 0 - let eraseLocTime - let erasePackets = [] - let eraseSize = 0 - let syncLocTime - let syncPackets = [] - let syncSize = 0 - - let addRemTime = 0 - let addEvents = 0 - let eraseRemTime = 0 - let eraseEvents = 0 - let syncRemTime = 0 - let syncEvents = 0 - - let timeout - - let room = null - let updateRoom = null - let syncRoom = null - - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve) => { - room = await connect("room", MockConnection) - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - return resolve() - }) - .then( - () => - new Promise((resolve) => { - broadcastListener.callback = (channel, message) => { - const currTime = process.hrtime() - - addLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addPackets.push(message) - addSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - prevTime = process.hrtime() - - for (let j = 0; j < ITERATIONS; j++) { - const tmpPathID = room.addPath(pathDraw[0]) - pathIDs.push(tmpPathID) - for (let i = 1; i < pathDraw.length; i++) { - room.extendPath(tmpPathID, pathDraw[i]) - } - } - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-add.json", addPackets) - addPackets = null - }) - .then( - () => - new Promise((resolve) => { - broadcastListener.callback = (channel, message) => { - const currTime = process.hrtime() - - eraseLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - erasePackets.push(message) - eraseSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => { - resolve() - }, 10000) - } - - // Necessary to allow yjs to execute transactions (majority of processing time) - function erasePath(sj) { - if (sj >= ITERATIONS) return - - for (let j = sj; j < Math.min(sj + BLOCKSIZE, ITERATIONS); j++) { - const tmpPathID = pathIDs[j] - - for (let i = 0; i < pathErase.length; i++) { - room.extendErasureIntervals( - tmpPathID, - pathErase[i][0], - pathErase[i][1], - ) - } - } - - setTimeout(erasePath, 0, sj + BLOCKSIZE) - } - - prevTime = process.hrtime() - erasePath(0) - }), - ) - .then(() => { - broadcastListener.callback = null - - dumpBSON("dot-seq-erase.json", erasePackets) - erasePackets = null - }) - .then( - () => - new Promise((resolve) => { - sendListener.callback = (uid, channel, message) => { - const currTime = process.hrtime() - syncLocTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - - syncPackets.push(message) - syncSize += message.message.length - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - } - - prevTime = process.hrtime() - - getEventListener( - "room", - "messageReceived", - )(createMessageReceivedEvent(syncStep1)) - }), - ) - .then(() => { - sendListener.callback = null - - dumpBSON("dot-seq-sync.json", syncPackets) - syncPackets = null - - room.disconnect() - room = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - updateRoom = await connect("update", MockConnection) - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - addPackets = loadBSON("dot-seq-add.json") - - return resolve() - }), - ) - .then( - () => - new Promise((resolve) => { - updateRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - addRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - updateRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - addRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - addEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - for (const addPacket of addPackets) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(addPacket)) - } - }), - ) - .then(() => { - addPackets = null - erasePackets = loadBSON("dot-seq-erase.json") - }) - .then( - () => - new Promise((resolve) => { - updateRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - eraseRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - eraseEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - updateRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - eraseRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - eraseEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - // Necessary to allow yjs to execute transactions (majority of processing time) - function erasePath(sj) { - if (sj >= erasePackets.length) return - - for ( - let j = sj; - j < Math.min(sj + BLOCKSIZE * 10, erasePackets.length); - j++ - ) { - getEventListener( - "update", - "messageReceived", - )(createMessageReceivedEvent(erasePackets[j])) - } - - setTimeout(erasePath, 0, sj + BLOCKSIZE * 10) - } - - prevTime = process.hrtime() - erasePath(0) - }), - ) - .then(() => { - addPackets = null - erasePackets = null - - updateRoom.disconnect() - updateRoom = null - }) - .then( - () => - // eslint-disable-next-line no-async-promise-executor - new Promise(async (resolve) => { - syncRoom = await connect("sync", MockConnection) - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(handshake, "tw-ml")) - - syncPackets = loadBSON("dot-seq-sync.json") - - return resolve() - }), - ) .then( () => new Promise((resolve) => { - syncRoom.addEventListener("addOrUpdatePath", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) - }) - - syncRoom.addEventListener("removedIntervalsChange", () => { - const currTime = process.hrtime() - - syncRemTime = - (currTime[0] - prevTime[0]) * 1e9 + (currTime[1] - prevTime[1]) - syncEvents += 1 - - clearTimeout(timeout) - timeout = setTimeout(() => resolve(), 1000) + // Check the synchronisation + expect(syncPaths).toStrictEqual({ [dotID]: dotDraw }) + expect(syncIntervals).toStrictEqual({ + [dotID]: { [dotErase[0][0]]: dotErase[0][1] }, }) - for (const syncPacket of syncPackets) { - getEventListener( - "sync", - "messageReceived", - )(createMessageReceivedEvent(syncPacket)) - } + resolve() }), ) .then(() => { @@ -2318,37 +1304,9 @@ describe("drawing app mesh", () => { syncRoom.disconnect() syncRoom = null }) - .then(() => { - addPackets = loadBSON("dot-seq-add.json").length - erasePackets = loadBSON("dot-seq-erase.json").length - syncPackets = loadBSON("dot-seq-sync.json").length - - printBenchmark("path draw and erase [parallel]", ITERATIONS, { - addPath: { - timeLoc: addLocTime, - packets: addPackets, - size: addSize, - timeRem: addRemTime, - events: addEvents, - }, - extendErasureIntervals: { - timeLoc: eraseLocTime, - packets: erasePackets, - size: eraseSize, - timeRem: eraseRemTime, - events: eraseEvents, - }, - synchronisation: { - timeLoc: syncLocTime, - packets: syncPackets, - size: syncSize, - timeRem: syncRemTime, - events: syncEvents, - }, - }) - }) }) + // TODO: apply refactoring if possible it("communicates a path draw and erase update", () => { let pathID @@ -2401,7 +1359,7 @@ describe("drawing app mesh", () => { .then(() => { broadcastListener.callback = null - dumpBSON("dot-seq-add.json", addPackets) + dumpBSON(".dot-seq-add.json", addPackets) addPackets = null }) .then( @@ -2430,7 +1388,7 @@ describe("drawing app mesh", () => { .then(() => { broadcastListener.callback = null - dumpBSON("dot-seq-erase.json", erasePackets) + dumpBSON(".dot-seq-erase.json", erasePackets) erasePackets = null }) .then( @@ -2471,7 +1429,7 @@ describe("drawing app mesh", () => { .then(() => { sendListener.callback = null - dumpBSON("dot-seq-sync.json", syncPackets) + dumpBSON(".dot-seq-sync.json", syncPackets) syncPackets = null room.disconnect() @@ -2487,7 +1445,7 @@ describe("drawing app mesh", () => { "messageReceived", )(createMessageReceivedEvent(handshake, "tw-ml")) - addPackets = loadBSON("dot-seq-add.json") + addPackets = loadBSON(".dot-seq-add.json") return resolve() }), @@ -2536,7 +1494,7 @@ describe("drawing app mesh", () => { ) .then(() => { addPackets = null - erasePackets = loadBSON("dot-seq-erase.json") + erasePackets = loadBSON(".dot-seq-erase.json") }) .then( () => @@ -2592,7 +1550,7 @@ describe("drawing app mesh", () => { "messageReceived", )(createMessageReceivedEvent(handshake, "tw-ml")) - syncPackets = loadBSON("dot-seq-sync.json") + syncPackets = loadBSON(".dot-seq-sync.json") return resolve() }), @@ -2650,5 +1608,5 @@ describe("drawing app mesh", () => { syncRoom.disconnect() syncRoom = null }) - })*/ + }) })