Skip to content
Snippets Groups Projects
Commit daac741b authored by Moritz Langenstein's avatar Moritz Langenstein
Browse files

(ml5717) Added compressed parameter to p2pmesh send/broadcast methods

parent 3dcabb84
No related branches found
No related tags found
1 merge request!71Rust WASM CRDT implementation
Pipeline #108926 canceled
...@@ -57,6 +57,7 @@ export default class P2PMesh { ...@@ -57,6 +57,7 @@ export default class P2PMesh {
uid, uid,
channel: "tw-ml", channel: "tw-ml",
message: "ml", message: "ml",
compressed: false,
}) })
} else if (message == "ml") { } else if (message == "ml") {
// Handshake completed // Handshake completed
...@@ -111,6 +112,7 @@ export default class P2PMesh { ...@@ -111,6 +112,7 @@ export default class P2PMesh {
uid, uid,
channel: "tw-ml", channel: "tw-ml",
message: "tw", message: "tw",
compressed: false,
}) })
setTimeout(handshake.bind(this, peer), this.options.handshake.interval) setTimeout(handshake.bind(this, peer), this.options.handshake.interval)
...@@ -267,11 +269,22 @@ export default class P2PMesh { ...@@ -267,11 +269,22 @@ export default class P2PMesh {
this.crdt = null this.crdt = null
} }
send(uid, message) { send(uid, message, compressed = true) {
this.queue.postMessage({ method: "send", uid, channel: "crdt", message }) this.queue.postMessage({
method: "send",
uid,
channel: "crdt",
message,
compressed,
})
} }
broadcast(message) { broadcast(message, compressed = true) {
this.queue.postMessage({ method: "broadcast", channel: "crdt", message }) this.queue.postMessage({
method: "broadcast",
channel: "crdt",
message,
compressed,
})
} }
} }
...@@ -19,11 +19,8 @@ onmessage = (event) => { ...@@ -19,11 +19,8 @@ onmessage = (event) => {
if (event.data.method == "send" || event.data.method == "broadcast") { if (event.data.method == "send" || event.data.method == "broadcast") {
let message = event.data.message let message = event.data.message
const compressed = !( const compressed =
message == undefined || event.data.compressed != null ? event.data.compressed : true
message instanceof String ||
typeof message == "string"
)
const uuid = uuidv4() const uuid = uuidv4()
......
...@@ -45,14 +45,18 @@ export default class WasmCRDTWrapper { ...@@ -45,14 +45,18 @@ export default class WasmCRDTWrapper {
}), }),
) )
}, },
on_deltas: (deltas) => this.mesh.broadcast(deltas), on_deltas: (deltas) => this.mesh.broadcast(deltas, true),
on_deltas_from_state: (uid, deltas) => { on_deltas_from_state: (uid, deltas) => {
this.mesh.send(uid, { this.mesh.send(
type: "sync step 2", uid,
message: deltas, {
}) type: "sync step 2",
message: deltas,
},
true,
)
this.mesh.send(uid, { type: "sync done" }) this.mesh.send(uid, { type: "sync done" }, false)
this.room.dispatchEvent( this.room.dispatchEvent(
new CustomEvent("weSyncedWithPeer", { detail: uid }), new CustomEvent("weSyncedWithPeer", { detail: uid }),
...@@ -184,10 +188,10 @@ export default class WasmCRDTWrapper { ...@@ -184,10 +188,10 @@ export default class WasmCRDTWrapper {
if (this.users.check) { if (this.users.check) {
this.users.synced.forEach((user) => this.users.synced.forEach((user) =>
this.mesh.send(user, { type: "sync check" }), this.mesh.send(user, { type: "sync check" }, false),
) )
this.users.waiting.forEach((user) => this.users.waiting.forEach((user) =>
this.mesh.send(user, { type: "sync check" }), this.mesh.send(user, { type: "sync check" }, false),
) )
} }
...@@ -224,10 +228,14 @@ export default class WasmCRDTWrapper { ...@@ -224,10 +228,14 @@ export default class WasmCRDTWrapper {
this.users.syncing = this.users.waiting.shift() this.users.syncing = this.users.waiting.shift()
this.users.check = false this.users.check = false
this.mesh.send(this.users.syncing, { this.mesh.send(
type: "sync step 1", this.users.syncing,
message: this.crdt.get_state_vector(), {
}) type: "sync step 1",
message: this.crdt.get_state_vector(),
},
false,
)
this.room.dispatchEvent( this.room.dispatchEvent(
new CustomEvent("waitingForSyncStep", { detail: this.users.syncing }), new CustomEvent("waitingForSyncStep", { detail: this.users.syncing }),
......
...@@ -225,21 +225,29 @@ export default class YjsCRDTWrapper extends Y.AbstractConnector { ...@@ -225,21 +225,29 @@ export default class YjsCRDTWrapper extends Y.AbstractConnector {
} }
send(uid, message) { send(uid, message) {
let compressed = true
if (message.type === "sync step 1") { if (message.type === "sync step 1") {
compressed = false
this.room.dispatchEvent( this.room.dispatchEvent(
new CustomEvent("waitingForSyncStep", { detail: uid }), new CustomEvent("waitingForSyncStep", { detail: uid }),
) )
} else if (message.type === "sync done") { } else if (message.type === "sync done") {
compressed = false
this.room.dispatchEvent( this.room.dispatchEvent(
new CustomEvent("weSyncedWithPeer", { detail: uid }), new CustomEvent("weSyncedWithPeer", { detail: uid }),
) )
} else if (message.type === "sync check") {
compressed = false
} }
this.mesh.send(uid, message) this.mesh.send(uid, message, compressed)
} }
broadcast(message) { broadcast(message) {
this.mesh.broadcast(message) this.mesh.broadcast(message, true)
} }
isDisconnected() { isDisconnected() {
...@@ -252,106 +260,94 @@ Y.extend("y-crdt", YjsCRDTWrapper) ...@@ -252,106 +260,94 @@ Y.extend("y-crdt", YjsCRDTWrapper)
export const syncStep1 = { export const syncStep1 = {
uuid: "6e20b20d-e1d8-405d-8a61-d56cb1c47a24", uuid: "6e20b20d-e1d8-405d-8a61-d56cb1c47a24",
message: Uint8Array.of( message: Uint8Array.of(
120, 133,
156, 164,
107, 116,
93, 121,
82,
82,
89,
144,
186,
186,
184,
50,
47,
89,
161,
184,
36,
181,
64,
193,
112, 112,
69, 101,
113, 171,
73, 115,
98, 121,
73, 110,
106, 99,
32,
115,
116,
101,
112, 112,
106, 32,
73, 49,
195, 168,
202,
148,
212,
156,
84,
8,
115, 115,
125, 116,
65, 97,
81, 116,
126, 101,
73, 83,
126, 101,
116,
128,
169,
100,
101,
108,
101,
116,
101,
83,
101,
116,
128,
175,
112,
114,
111,
116,
111,
99,
111,
108,
86,
101,
114, 114,
126, 115,
78, 105,
88, 111,
106, 110,
81, 11,
113, 164,
102, 97,
126, 117,
30, 116,
247, 104,
146, 192,
196,
210,
146,
140,
3,
0,
80,
113,
26,
230,
), ),
slice: 0, slice: 0,
length: 1, length: 1,
compressed: true, compressed: false,
} }
export const syncDone = { export const syncDone = {
message: Uint8Array.of( message: Uint8Array.of(
120, 129,
156, 164,
107, 116,
92, 121,
82, 112,
82, 101,
89, 169,
144, 115,
186, 121,
178, 110,
184, 99,
50, 32,
47, 100,
89, 111,
33, 110,
37, 101,
63,
47,
21,
0,
64,
79,
7,
20,
), ),
slice: 0, slice: 0,
length: 1, length: 1,
compressed: true, compressed: false,
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment