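// NOTE(review): the words "Newer" / "Older" appeared here; they look like page-navigation residue from a web copy of this file, not module code.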
"use strict"
import MessagePack from "what-the-pack"
import pako from "pako"
import uuidv4 from "uuid/v4"
import FastBitSet from "fastbitset"
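// NOTE(review): the text "Moritz Langenstein committed" appeared here; it looks like commit-history residue from a web copy of this file, not module code.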
// Size in bytes passed to MessagePack.initialize below: 32 MiB.
const MESSAGE_BUFFER_SIZE = 32 * 1024 * 1024
// Maximum number of payload bytes carried by a single slice of a message: 1 KiB.
const MESSAGE_SLICE_SIZE = 1024
// MessagePack encoder/decoder pair shared by the send and receive paths.
const { encode, decode } = MessagePack.initialize(MESSAGE_BUFFER_SIZE)
// FIFO of slice senders, one per outgoing message. The head entry is the
// message currently being transmitted; it is removed once its last slice is out.
const queue = []
// Partially reassembled incoming messages, keyed first by the sending peer's
// uid and then by the message uuid. Maps are used rather than plain objects
// because both keys arrive from outside this worker.
const pendingMessages = new Map()

/**
 * Serialises an outgoing message and transmits it as a series of slices.
 *
 * The message is MessagePack-encoded, deflated with pako unless
 * `request.compressed` is explicitly falsy, and cut into slices of at most
 * MESSAGE_SLICE_SIZE bytes. Each slice is wrapped in an encoded packet
 * `{ uuid, slice, message, length, compressed }`, written to
 * `request.message`, and the request object is posted back to the owner of
 * this worker. Whole messages are sent strictly one after another via `queue`.
 *
 * @param {object} request - `event.data` of a "send" or "broadcast" event.
 */
function enqueueOutgoing(request) {
  // Compression is on by default; only an explicit non-null value overrides it.
  const compressed = request.compressed != null ? request.compressed : true
  const uuid = uuidv4()

  let payload = encode(request.message)
  if (compressed) {
    payload = pako.deflate(payload)
  }

  const sender = (slice) => {
    const offset = slice * MESSAGE_SLICE_SIZE

    request.message = encode({
      uuid,
      // The receiver uses the slice index to place the bytes and to
      // recognise duplicates, so it must travel with every packet.
      slice,
      message: payload.subarray(offset, offset + MESSAGE_SLICE_SIZE),
      length: payload.length,
      compressed,
    })
    self.postMessage(request)

    if (offset + MESSAGE_SLICE_SIZE < payload.length) {
      // More slices remain: pace them 5 ms apart.
      setTimeout(() => sender(slice + 1), 5)
    } else {
      // This message is done; start the next queued one, if any.
      queue.shift()
      if (queue.length > 0) queue[0](0)
    }
  }

  queue.push(sender)
  // Start immediately only when nothing else is in flight.
  if (queue.length === 1) queue[0](0)
}

/**
 * Decodes one incoming packet and, once every slice of its message has been
 * collected, posts the fully decoded message back to the owner of this worker.
 *
 * A message whose total `length` fits in one slice is delivered directly.
 * Larger messages are accumulated in `pendingMessages` until the number of
 * bytes received equals the announced length.
 *
 * NOTE(review): a duplicate slice that arrives after its message has already
 * been delivered starts a fresh, never-completed entry; incomplete entries
 * are not expired here.
 *
 * @param {object} request - `event.data` of a "received" event; `request.uid`
 *   identifies the sender and `request.message` holds the encoded packet.
 */
function handleIncoming(request) {
  const packet = decode(MessagePack.Buffer.from(request.message))
  let message = packet.message

  if (packet.length > MESSAGE_SLICE_SIZE) {
    let messages = pendingMessages.get(request.uid)
    if (!messages) {
      messages = new Map()
      pendingMessages.set(request.uid, messages)
    }

    let slices = messages.get(packet.uuid)
    if (!slices) {
      slices = {
        message: new Uint8Array(packet.length),
        received: new FastBitSet(),
        length: 0,
      }
      messages.set(packet.uuid, slices)
    }

    // Packets may arrive out-of-order and multiple times; only the first
    // copy of each slice is counted and stored.
    if (slices.received.checkedAdd(packet.slice) === 1) {
      slices.length += packet.message.length
      slices.message.set(packet.message, packet.slice * MESSAGE_SLICE_SIZE)
    }

    if (slices.length < slices.message.length) {
      // Still waiting for further slices of this message.
      return
    }

    message = slices.message
    messages.delete(packet.uuid)
    if (messages.size === 0) {
      pendingMessages.delete(request.uid)
    }
  }

  if (packet.compressed) {
    message = pako.inflate(Uint8Array.from(message))
  }

  request.message = decode(MessagePack.Buffer.from(message))
  self.postMessage(request)
}

/**
 * Worker entry point. Dispatches on `event.data.method`:
 * "send" / "broadcast" serialise and slice an outgoing message, "received"
 * reassembles and decodes an incoming packet. Events without data and
 * unknown methods are ignored.
 */
self.onmessage = (event) => {
  if (!event || !event.data) {
    return
  }

  const { method } = event.data
  if (method === "send" || method === "broadcast") {
    enqueueOutgoing(event.data)
  } else if (method === "received") {
    handleIncoming(event.data)
  }
}