From 90d6d4886c7e19d33be23b9e2d4148ce67dfe828 Mon Sep 17 00:00:00 2001 From: Moritz Langenstein <ml5717@ic.ac.uk> Date: Tue, 24 Dec 2019 00:53:12 +0000 Subject: [PATCH] (ml5717) Added delta serialisation + deserialisation --- .gitmodules | 3 + Cargo.toml | 11 +- build.rs | 9 - schema/crdt.capnp | 12 - src/crdt.rs | 1122 ++++++++++++++++++++++++++++++--------------- src/lib.rs | 173 ++++--- vec-map | 1 + www/index.js | 41 +- 8 files changed, 874 insertions(+), 498 deletions(-) create mode 100644 .gitmodules delete mode 100644 build.rs delete mode 100644 schema/crdt.capnp create mode 160000 vec-map diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..5241fb2 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "vec-map"] + path = vec-map + url = git@gitlab.doc.ic.ac.uk:ml5717/vec-map.git diff --git a/Cargo.toml b/Cargo.toml index 90b6edb..c0c8f50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,10 +3,6 @@ name = "drawing-crdt" version = "0.1.0" authors = ["Moritz Langenstein <ml5717@ic.ac.uk>"] edition = "2018" -build = "build.rs" - -[build-dependencies] -capnpc = "0.11.1" [lib] crate-type = ["cdylib", "rlib"] @@ -18,10 +14,11 @@ default = ["console_error_panic_hook"] wasm-bindgen = "0.2" js-sys = "0.3.33" uuid = "0.8.1" -capnp = "0.11.1" serde = "1.0.104" serde_derive = "1.0.104" serde-wasm-bindgen = "0.1.3" +vec_map = { path = "vec-map", features = ["serde"] } +bincode = "1.2.1" # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires @@ -40,5 +37,5 @@ wee_alloc = { version = "0.4.2", optional = true } wasm-bindgen-test = "0.2" [profile.release] -# Tell `rustc` to optimize for small code size. -opt-level = "s" +opt-level = 3 +lto = true diff --git a/build.rs b/build.rs deleted file mode 100644 index b491c60..0000000 --- a/build.rs +++ /dev/null @@ -1,9 +0,0 @@ -extern crate capnpc; - -fn main() { - capnpc::CompilerCommand::new() - .src_prefix("schema") - .file("schema/crdt.capnp") - .run() - .expect("Compiling Cap'n Proto schema"); -} \ No newline at end of file diff --git a/schema/crdt.capnp b/schema/crdt.capnp deleted file mode 100644 index 5f2842b..0000000 --- a/schema/crdt.capnp +++ /dev/null @@ -1,12 +0,0 @@ -@0x8e28fa32ad5a1cb1; # unique file ID, generated by `capnp id` - -struct Point { - x @0 :Int32; - y @1 :Int32; - width @2 :Float32; - colour @3 :UInt32; -} - -struct Stroke { - points @0 :List(Point); -} diff --git a/src/crdt.rs b/src/crdt.rs index 636801c..2cc302d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1,462 +1,834 @@ -use std::ops::{Deref, DerefMut}; +use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor}; +use serde::ser::{Serialize, SerializeSeq, SerializeStruct, SerializeTuple, Serializer}; +use std::cmp; use std::collections::HashMap; -use serde::ser::{Serialize, Serializer, SerializeStruct}; +use std::fmt; +use std::ops::{Deref, DerefMut}; use uuid::Uuid; +use vec_map::VecMap; struct Dirty<T> { - value: T, - dirty: bool + value: T, + dirty: bool, } impl<T> Deref for Dirty<T> { - type Target = T; + type Target = T; - fn deref(&self) -> &Self::Target { - &self.value - } + fn deref(&self) -> &Self::Target { + &self.value + } } impl<T> DerefMut for Dirty<T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.value - } + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.value + } } impl<T> Dirty<T> { - pub fn new(value: T, dirty: bool) -> Dirty<T> { - Dirty { - value, - dirty, - } - } - - pub fn is_dirty(&self) -> bool { - self.dirty - } - - pub fn clean(&mut self) { - self.dirty = false - } - - pub fn dirty(&mut self) { - self.dirty = true - } + pub fn new(value: T, dirty: bool) -> Dirty<T> { + Dirty { value, dirty } + } + + pub fn is_dirty(&self) -> bool { + self.dirty + } + + pub fn clean(&mut self) { + self.dirty = false + } + + pub fn dirty(&mut self) { + self.dirty = true + } } #[derive(Clone)] pub struct Point { - x: i32, - y: i32, - weight: f32, - colour: u32, + x: i32, + y: i32, + weight: f32, + colour: u32, } impl Serialize for Point { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - // There are four fields inside the struct - let mut state = serializer.serialize_struct("Point", 4)?; - - state.serialize_field("x", &self.x)?; - state.serialize_field("y", &self.y)?; - state.serialize_field("weight", &self.weight)?; - state.serialize_field("colour", &format!("{:06X}", self.colour))?; - - state.end() - } -} + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + if serializer.is_human_readable() { + // There are four fields inside the struct + let mut state = serializer.serialize_struct("Point", 4)?; -impl Point { - fn try_new(x: i32, y: i32, weight: f32, colour: &str) -> Option<Point> { - match u32::from_str_radix(colour, 16) { - Ok(colour) => Some(Point { x, y, weight, colour }), - Err(_) => None, + state.serialize_field("x", &self.x)?; + state.serialize_field("y", &self.y)?; + state.serialize_field("weight", &self.weight)?; + state.serialize_field("colour", &format!("{:06X}", self.colour))?; + + state.end() + } else { + let mut tuple = serializer.serialize_tuple(4)?; + + tuple.serialize_element(&self.x)?; + tuple.serialize_element(&self.y)?; + tuple.serialize_element(&self.weight)?; + tuple.serialize_element(&self.colour)?; + + tuple.end() + } } - } } -#[derive(Copy, Clone, PartialEq, Serialize)] -struct Interval { - from: f32, - to: f32, +impl<'de> Deserialize<'de> for Point { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct PointVisitor; + + impl<'de> Visitor<'de> for PointVisitor { + type Value = Point; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct Point") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let x = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + let y = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + let weight = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(2, &self))?; + let colour = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(3, &self))?; + + Ok(Point::new(x, y, weight, colour)) + } + } + + deserializer.deserialize_tuple(4, PointVisitor) + } } -impl Interval { - fn new(from: f32, to: f32) -> Interval { - Interval{ - from, - to, +impl Point { + fn new(x: i32, y: i32, weight: f32, colour: u32) -> Point { + Point { + x, + y, + weight, + colour, + } + } + + fn try_new(x: i32, y: i32, weight: f32, colour: &str) -> Option<Point> { + match u32::from_str_radix(colour, 16) { + Ok(colour) => Some(Point { + x, + y, + weight, + colour, + }), + Err(_) => None, + } } - } } -#[derive(Serialize)] -pub struct IntervalUnion(Vec<Interval>); +pub trait IntervalBound { + fn max(a: Self, b: Self) -> Self; + fn up(self) -> Self; + fn down(self) -> Self; +} -impl IntervalUnion { - fn new() -> IntervalUnion { - IntervalUnion(Vec::new()) - } - - fn union(&mut self, other: &IntervalUnion) -> bool { - if other.0.is_empty() { - return false - }; - - let mut si = 0; - let mut oi = 0; +impl IntervalBound for f32 { + fn max(a: f32, b: f32) -> f32 { + f32::max(a, b) + } - let mut intervals: Vec<Interval> = Vec::new(); + fn up(self) -> f32 { + self + } - while si < self.0.len() && oi < other.0.len() { - let interval = if self.0[si].from < other.0[oi].from { - let interval = self.0[si]; - si += 1; - interval - } else { - let interval = other.0[oi]; - oi += 1; - interval - }; - - match intervals.last_mut() { - Some(top) => { - if interval.from <= top.to { - top.to = f32::max(top.to, interval.to) - } else { - intervals.push(interval) - } - }, - None => intervals.push(interval), - }; + fn down(self) -> f32 { + self + } +} + +impl IntervalBound for usize { + fn max(a: usize, b: usize) -> usize { + cmp::max(a, b) } - let (interval, tail) = if si < self.0.len() { - (self.0[si], &self.0[si+1..]) - } else { - (other.0[oi], &other.0[oi+1..]) - }; + fn up(self) -> usize { + self + 1 + } - match intervals.last_mut() { - Some(top) => { - if interval.from <= top.to { - top.to = f32::max(top.to, interval.to) + fn down(self) -> usize { + self - 1 + } +} + +#[derive(Copy, Clone, PartialEq)] +struct Interval<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, +{ + from: T, + to: T, +} + +impl<T> Interval<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, +{ + fn new(from: T, to: T) -> Interval<T> { + Interval { from, to } + } +} + +pub struct IntervalUnion<T>(Vec<Interval<T>>) +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound; + +impl<T> Serialize for IntervalUnion<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound + Serialize, +{ + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.0.len() * 2))?; + + for interval in &self.0 { + seq.serialize_element(&interval.from)?; + seq.serialize_element(&interval.to)?; + } + + seq.end() + } +} + +impl<'de, T> Deserialize<'de> for IntervalUnion<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound + Deserialize<'de>, +{ + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + use std::marker::PhantomData; + + struct IntervalUnionVisitor<T> + where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, + { + marker: PhantomData<fn() -> IntervalUnion<T>>, + }; + + impl<'de, T> Visitor<'de> for IntervalUnionVisitor<T> + where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound + Deserialize<'de>, + { + type Value = IntervalUnion<T>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct IntervalUnion") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let mut intervals = + Vec::with_capacity(cmp::max(seq.size_hint().unwrap_or(0) / 2, 4)); + + while let (Some(from), Some(to)) = (seq.next_element()?, seq.next_element()?) { + intervals.push(Interval::new(from, to)); + } + + Ok(IntervalUnion(intervals)) + } + } + + let visitor = IntervalUnionVisitor { + marker: PhantomData, + }; + + deserializer.deserialize_seq(visitor) + } +} + +impl<T> IntervalUnion<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, +{ + fn new() -> IntervalUnion<T> { + IntervalUnion(Vec::new()) + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + fn union(&mut self, other: &IntervalUnion<T>) -> bool { + if other.is_empty() { + return false; + }; + + let mut si = 0; + let mut oi = 0; + + let mut intervals: Vec<Interval<T>> = Vec::new(); + + while si < self.0.len() && oi < other.0.len() { + let interval = if self.0[si].from < other.0[oi].from { + let interval = self.0[si]; + si += 1; + interval + } else { + let interval = other.0[oi]; + oi += 1; + interval + }; + + match intervals.last_mut() { + Some(top) => { + if interval.from <= top.to { + top.to = T::max(top.to, interval.to) + } else { + intervals.push(interval) + } + } + None => intervals.push(interval), + }; + } + + let (interval, tail) = if si < self.0.len() { + (self.0[si], &self.0[si + 1..]) } else { - intervals.push(interval) + (other.0[oi], &other.0[oi + 1..]) + }; + + match intervals.last_mut() { + Some(top) => { + if interval.from <= top.to { + top.to = T::max(top.to, interval.to) + } else { + intervals.push(interval) + } + } + None => intervals.push(interval), + }; + + intervals.extend_from_slice(tail); + + let changed = intervals != self.0; + + self.0 = intervals; + + changed + } + + fn difference(&mut self, other: &IntervalUnion<T>) { + let mut si = 0; + let mut oi = 0; + + let mut intervals: Vec<Interval<T>> = Vec::new(); + + while si < self.0.len() && oi < other.0.len() { + if other.0[oi].to < self.0[si].from { + oi += 1; + continue; + } + + if other.0[oi].from <= self.0[si].from { + if other.0[oi].to >= self.0[si].to { + si += 1; + } else { + self.0[si].from = other.0[oi].to.up(); + oi += 1; + } + + continue; + } + + if other.0[oi].from <= self.0[si].to { + intervals.push(Interval::new(self.0[si].from, other.0[oi].from.down())); + self.0[si].from = other.0[oi].from; + } else { + intervals.push(self.0[si]); + si += 1; + } } - }, - None => intervals.push(interval), - }; - - intervals.extend_from_slice(tail); - - let changed = intervals != self.0; - - self.0 = intervals; - - changed - } + + if si < self.0.len() { + intervals.extend_from_slice(&self.0[si..]) + }; + + self.0 = intervals; + } } -impl From<Interval> for IntervalUnion { - fn from(interval: Interval) -> Self { +impl<T> From<Interval<T>> for IntervalUnion<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, +{ + fn from(interval: Interval<T>) -> Self { IntervalUnion(vec![interval]) } } struct Stroke { - points: Dirty<Vec<Option<Point>>>, - intervals: Dirty<IntervalUnion>, + points: Dirty<VecMap<Point>>, + intervals: Dirty<IntervalUnion<f32>>, } impl Stroke { - fn new() -> Stroke { - Stroke { - points: Dirty::new(Vec::new(), true), - intervals: Dirty::new(IntervalUnion::new(), true), + fn new() -> Stroke { + Stroke { + points: Dirty::new(VecMap::new(), true), + intervals: Dirty::new(IntervalUnion::new(), true), + } } - } } -#[derive(Serialize)] pub struct StrokeDelta { - points: Vec<(usize, Point)>, - intervals: IntervalUnion, + points: VecMap<Point>, + intervals: IntervalUnion<f32>, +} + +impl Serialize for StrokeDelta { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut tuple = serializer.serialize_tuple(2)?; + + tuple.serialize_element(&self.points)?; + tuple.serialize_element(&self.intervals)?; + + tuple.end() + } +} + +impl<'de> Deserialize<'de> for StrokeDelta { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct StrokeDeltaVisitor; + + impl<'de> Visitor<'de> for StrokeDeltaVisitor { + type Value = StrokeDelta; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct StrokeDelta") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let points = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + let intervals = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + + Ok(StrokeDelta { points, intervals }) + } + } + + deserializer.deserialize_tuple(2, StrokeDeltaVisitor) + } } impl StrokeDelta { - fn new() -> StrokeDelta { - StrokeDelta { - points: Vec::new(), - intervals: IntervalUnion::new(), + fn new() -> StrokeDelta { + StrokeDelta { + points: VecMap::new(), + intervals: IntervalUnion::new(), + } } - } } struct User { - strokes: Vec<Stroke>, + strokes: VecMap<Stroke>, } impl User { - fn new() -> User { - User { - strokes: Vec::new(), + fn new() -> User { + User { + strokes: VecMap::new(), + } } - } } #[derive(Hash, Eq, PartialEq)] pub struct StrokeID(u128, usize); impl Serialize for StrokeID { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - serializer.serialize_str(&(CRDT::uuid_to_string(self.0) + "-" + &self.1.to_string())) - } + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + if serializer.is_human_readable() { + serializer.serialize_str(&(CRDT::uuid_to_string(self.0) + "-" + &self.1.to_string())) + } else { + let mut tuple = serializer.serialize_tuple(2)?; + + tuple.serialize_element(&self.0)?; + tuple.serialize_element(&self.1)?; + + tuple.end() + } + } +} + +impl<'de> Deserialize<'de> for StrokeID { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct StrokeIDVisitor; + + impl<'de> Visitor<'de> for StrokeIDVisitor { + type Value = StrokeID; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct StrokeID") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let user = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + let stroke_idx = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + + Ok(StrokeID(user, stroke_idx)) + } + } + + deserializer.deserialize_tuple(2, StrokeIDVisitor) + } } impl StrokeID { - fn new(user: u128, stroke_idx: usize) -> StrokeID { - StrokeID(user, stroke_idx) - } + fn new(user: u128, stroke_idx: usize) -> StrokeID { + StrokeID(user, stroke_idx) + } +} + +pub struct StateVec { + strokes: HashMap<u128, IntervalUnion<usize>>, + //intervals: HashMap<>, } pub trait EventListener { - fn on_stroke(&self, stroke: String, points: &Vec<Option<Point>>); - fn on_interval(&self, stroke: String, intervals: &IntervalUnion); - - fn on_broadcast(&self, deltas: &HashMap<StrokeID, StrokeDelta>); + fn on_stroke(&self, stroke: String, points: &VecMap<Point>); + fn on_interval(&self, stroke: String, intervals: &IntervalUnion<f32>); + + fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>); } pub struct CRDT { - user: Option<u128>, - - crdt: Dirty<HashMap<u128, Dirty<User>>>, - deltas: HashMap<StrokeID, StrokeDelta>, - - event_listener: Box<dyn EventListener>, + user: Option<u128>, + + crdt: Dirty<HashMap<u128, Dirty<User>>>, + deltas: HashMap<StrokeID, StrokeDelta>, + + event_listener: Box<dyn EventListener>, } impl CRDT { - pub fn new(event_listener: Box<dyn EventListener>) -> CRDT { - CRDT { - user: None, - - crdt: Dirty::new(HashMap::new(), false), - deltas: HashMap::new(), - - event_listener, - } - } - - pub fn set_user(&mut self, user: &str) -> bool { - self.user = CRDT::string_to_uuid(user); - self.user.is_some() - } - - fn string_to_uuid(uuid: &str) -> Option<u128> { - match Uuid::parse_str(uuid) { - Ok(uuid) => Some(uuid.as_u128()), - Err(_) => None, - } - } - - fn uuid_to_string(uuid: u128) -> String { - Uuid::from_u128(uuid) - .to_hyphenated() - .encode_lower(&mut Uuid::encode_buffer()) - .to_string() - } - - pub fn get_user(&self) -> Option<String> { - self.user.map(CRDT::uuid_to_string) - } - - pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> { - let user = match self.user { - Some(user) => user, - None => return None, - }; - - let point = match Point::try_new(x, y, weight, colour) { - Some(point) => point, - None => return None, - }; - - let entry = self.crdt.entry(user).or_insert_with(|| Dirty::new(User::new(), true)); - let mut stroke = Stroke::new(); - stroke.points.push(Some(point.clone())); - entry.strokes.push(stroke); - - let stroke_idx = entry.strokes.len() - 1; - - // Implicit call to stroke.points.dirty() in Stroke::new() - entry.dirty(); - self.crdt.dirty(); - - let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new); - delta.points.push((0, point)); - - Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string()) - } - - fn get_user_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> { - let (author, stroke) = match stroke_id.rfind('-') { - Some(split) => stroke_id.split_at(split), - None => return None, - }; - - let author = match CRDT::string_to_uuid(author) { - Some(author) => author, - None => return None - }; - - let entry = match self.crdt.get_mut(&author) { - Some(entry) => entry, - None => return None, - }; - - match stroke[1..].parse::<usize>() { - Ok(stroke_idx) => Some((entry, author, stroke_idx)), - _ => None, - } - } - - fn get_stroke(&self, stroke_id: &str) -> Option<&Stroke> { - let (author, stroke) = match stroke_id.rfind('-') { - Some(split) => stroke_id.split_at(split), - None => return None, - }; - - let entry = match CRDT::string_to_uuid(author).as_ref().and_then(move |author| self.crdt.get(author)) { - Some(entry) => entry, - None => return None, - }; - - match stroke[1..].parse::<usize>() { - Ok(stroke) if stroke < entry.strokes.len() => Some(&entry.strokes[stroke]), - _ => None, - } - } - - pub fn add_point(&mut self, stroke_id: &str, x: i32, y: i32, weight: f32, colour: &str) -> bool { - let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { - Some(stroke) => stroke, - None => return false, - }; - - let stroke = match entry.strokes.get_mut(stroke_idx) { - Some(stroke) => stroke, - None => return false, - }; - - let point = match Point::try_new(x, y, weight, colour) { - Some(point) => point, - None => return false, - }; - - stroke.points.push(Some(point.clone())); - let point_idx = stroke.points.len() - 1; - - stroke.points.dirty(); - entry.dirty(); - self.crdt.dirty(); - - let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new); - delta.points.push((point_idx, point)); - - true - } - - pub fn erase_stroke(&mut self, stroke_id: &str, from: f32, to: f32) -> bool { - if from < 0.0 || to < from { - return false - }; - - let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { - Some(stroke) => stroke, - None => return false, - }; - - let stroke = match entry.strokes.get_mut(stroke_idx) { - Some(stroke) => stroke, - None => return false, - }; - - let unit_interval = Interval::new(from, to).into(); - - if stroke.intervals.union(&unit_interval) { - stroke.intervals.dirty(); - entry.dirty(); - self.crdt.dirty(); + pub fn new(event_listener: Box<dyn EventListener>) -> CRDT { + CRDT { + user: None, + + crdt: Dirty::new(HashMap::new(), false), + deltas: HashMap::new(), - let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new); - delta.intervals.union(&unit_interval); + event_listener, + } } - - true - } - - pub fn get_stroke_points(&self, stroke: &str) -> Option<&Vec<Option<Point>>> { - self.get_stroke(stroke).map(|stroke| stroke.points.deref()) - } - - pub fn get_stroke_intervals(&self, stroke: &str) -> Option<&IntervalUnion> { - self.get_stroke(stroke).map(|stroke| stroke.intervals.deref()) - } - - pub fn fetch_events(&mut self) { - if !self.crdt.is_dirty() { - return - }; - - self.crdt.clean(); - - let mut stroke_events = Vec::new(); - let mut interval_events = Vec::new(); - - self.crdt.iter_mut().for_each(|(user, entry)| { - if !entry.is_dirty() { - return - }; - - entry.clean(); - - let user = CRDT::uuid_to_string(*user) + "-"; - - entry.strokes.iter_mut().enumerate().for_each(|(i, stroke)| { - if stroke.points.is_dirty() { - stroke.points.clean(); - stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref())); + + pub fn set_user(&mut self, user: &str) -> bool { + self.user = CRDT::string_to_uuid(user); + self.user.is_some() + } + + fn string_to_uuid(uuid: &str) -> Option<u128> { + match Uuid::parse_str(uuid) { + Ok(uuid) => Some(uuid.as_u128()), + Err(_) => None, } - - if stroke.intervals.is_dirty() { - stroke.intervals.clean(); - interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref())); + } + + fn uuid_to_string(uuid: u128) -> String { + Uuid::from_u128(uuid) + .to_hyphenated() + .encode_lower(&mut Uuid::encode_buffer()) + .to_string() + } + + pub fn get_user(&self) -> Option<String> { + self.user.map(CRDT::uuid_to_string) + } + + pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> { + let user = match self.user { + Some(user) => user, + None => return None, + }; + + let point = match Point::try_new(x, y, weight, colour) { + Some(point) => point, + None => return None, + }; + + let entry = self + .crdt + .entry(user) + .or_insert_with(|| Dirty::new(User::new(), true)); + let mut stroke = Stroke::new(); + stroke.points.push(point.clone()); + entry.strokes.push(stroke); + + let stroke_idx = entry.strokes.len() - 1; + + // Implicit call to stroke.points.dirty() in Stroke::new() + entry.dirty(); + self.crdt.dirty(); + + let delta = self + .deltas + .entry(StrokeID::new(user, stroke_idx)) + .or_insert_with(StrokeDelta::new); + delta.points.insert(0, point); + + Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string()) + } + + fn get_user_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> { + let (author, stroke) = match stroke_id.rfind('-') { + Some(split) => stroke_id.split_at(split), + None => return None, + }; + + let author = match CRDT::string_to_uuid(author) { + Some(author) => author, + None => return None, + }; + + let entry = match self.crdt.get_mut(&author) { + Some(entry) => entry, + None => return None, + }; + + match stroke[1..].parse::<usize>() { + Ok(stroke_idx) => Some((entry, author, stroke_idx)), + _ => None, } - }); - }); - - for (user, points) in stroke_events { - self.event_listener.on_stroke(user, points); } - - for (user, intervals) in interval_events { - self.event_listener.on_interval(user, intervals); - } - } - - pub fn fetch_broadcasts(&mut self) { - if self.deltas.is_empty() { - return - }; - - self.event_listener.on_broadcast(&self.deltas); - - self.deltas.clear(); - } + + fn get_stroke(&self, stroke_id: &str) -> Option<&Stroke> { + let (author, stroke) = match stroke_id.rfind('-') { + Some(split) => stroke_id.split_at(split), + None => return None, + }; + + let entry = match CRDT::string_to_uuid(author) + .as_ref() + .and_then(move |author| self.crdt.get(author)) + { + Some(entry) => entry, + None => return None, + }; + + match stroke[1..].parse::<usize>() { + Ok(stroke) => entry.strokes.get(stroke), + _ => None, + } + } + + pub fn add_point( + &mut self, + stroke_id: &str, + x: i32, + y: i32, + weight: f32, + colour: &str, + ) -> bool { + let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { + Some(stroke) => stroke, + None => return false, + }; + + let stroke = match entry.strokes.get_mut(stroke_idx) { + Some(stroke) => stroke, + _ => return false, + }; + + let point = match Point::try_new(x, y, weight, colour) { + Some(point) => point, + None => return false, + }; + + let point_idx = stroke.points.push(point.clone()) - 1; + + stroke.points.dirty(); + entry.dirty(); + self.crdt.dirty(); + + let delta = self + .deltas + .entry(StrokeID::new(user, stroke_idx)) + .or_insert_with(StrokeDelta::new); + delta.points.insert(point_idx, point); + + true + } + + pub fn erase_stroke(&mut self, stroke_id: &str, from: f32, to: f32) -> bool { + if from < 0.0 || to < from { + return false; + }; + + let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { + Some(stroke) => stroke, + None => return false, + }; + + let stroke = match entry.strokes.get_mut(stroke_idx) { + Some(stroke) => stroke, + _ => return false, + }; + + let unit_interval = Interval::new(from, to).into(); + + if stroke.intervals.union(&unit_interval) { + stroke.intervals.dirty(); + entry.dirty(); + self.crdt.dirty(); + + let delta = self + .deltas + .entry(StrokeID::new(user, stroke_idx)) + .or_insert_with(StrokeDelta::new); + delta.intervals.union(&unit_interval); + } + + true + } + + pub fn get_stroke_points(&self, stroke: &str) -> Option<&VecMap<Point>> { + self.get_stroke(stroke).map(|stroke| stroke.points.deref()) + } + + pub fn get_stroke_intervals(&self, stroke: &str) -> Option<&IntervalUnion<f32>> { + self.get_stroke(stroke) + .map(|stroke| stroke.intervals.deref()) + } + + pub fn fetch_events(&mut self) { + if !self.crdt.is_dirty() { + return; + }; + + self.crdt.clean(); + + let mut stroke_events = Vec::new(); + let mut interval_events = Vec::new(); + + self.crdt.iter_mut().for_each(|(user, entry)| { + if !entry.is_dirty() { + return; + }; + + entry.clean(); + + let user = CRDT::uuid_to_string(*user) + "-"; + + entry.strokes.iter_mut().for_each(|(i, stroke)| { + if stroke.points.is_dirty() { + stroke.points.clean(); + stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref())); + } + + if stroke.intervals.is_dirty() { + stroke.intervals.clean(); + interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref())); + } + }); + }); + + for (user, points) in stroke_events { + self.event_listener.on_stroke(user, points); + } + + for (user, intervals) in interval_events { + self.event_listener.on_interval(user, intervals); + } + } + + pub fn fetch_deltas(&mut self) { + if self.deltas.is_empty() { + return; + }; + + self.event_listener.on_deltas(&self.deltas); + + self.deltas.clear(); + } + + pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) { + if deltas.is_empty() { + return; + }; + + self.crdt.dirty(); + + deltas + .into_iter() + .for_each(|(stroke_id, mut stroke_delta)| { + let entry = self + .crdt + .entry(stroke_id.0) + .or_insert_with(|| Dirty::new(User::new(), true)); + + entry.dirty(); + + let stroke = entry.strokes.entry(stroke_id.1).or_insert_with(Stroke::new); + + if !stroke_delta.points.is_empty() { + stroke.points.dirty(); + stroke.points.append(&mut stroke_delta.points); + } + + if stroke.intervals.union(&stroke_delta.intervals) { + stroke.intervals.dirty() + }; + }); + } + + pub fn get_state_vector(&self) {} + + // TODO: add endStroke() method that finishes the named stroke + // TODO: update addStroke() to use a new username (XOR variant + version bits in UUID) if last stroke still unfinished + // TODO: keep track of users with unfinished strokes to use "fake" usernames optimally } diff --git a/src/lib.rs b/src/lib.rs index e2b4f28..56f1f11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,10 @@ -#[macro_use] -extern crate serde_derive; - use wasm_bindgen::prelude::*; -use std::option::Option; use js_sys::Error; use serde_wasm_bindgen::to_value as to_js_value; use std::collections::HashMap; +use std::option::Option; +use vec_map::VecMap; #[cfg(feature = "wee_alloc")] #[global_allocator] @@ -15,43 +13,35 @@ static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT; mod utils; mod crdt; -use crdt::{CRDT, EventListener, Point, IntervalUnion, StrokeDelta, StrokeID}; +use crdt::{EventListener, IntervalUnion, Point, StrokeDelta, StrokeID, CRDT}; #[wasm_bindgen] extern "C" { - #[derive(Clone)] - pub type WasmEventListener; - - #[wasm_bindgen(structural, method)] - pub fn on_stroke(this: &WasmEventListener, stroke: String, points: JsValue); - - #[wasm_bindgen(structural, method)] - pub fn on_interval(this: &WasmEventListener, stroke: String, intervals: JsValue); - - #[wasm_bindgen(structural, method)] - pub fn on_broadcast(this: &WasmEventListener, deltas: JsValue); + #[derive(Clone)] + pub type WasmEventListener; + + #[wasm_bindgen(structural, method)] + pub fn on_stroke(this: &WasmEventListener, stroke: String, points: JsValue); + + #[wasm_bindgen(structural, method)] + pub fn on_interval(this: &WasmEventListener, stroke: String, intervals: JsValue); + + #[wasm_bindgen(structural, method)] + pub fn on_deltas(this: &WasmEventListener, deltas: Box<[u8]>); } impl EventListener for WasmEventListener { - fn on_stroke(&self, stroke: String, points: &Vec<Option<Point>>) { - #[allow(unused_must_use)] { // TODO: Error handling - to_js_value(points).map(|points| self.on_stroke(stroke, points)); + fn on_stroke(&self, stroke: String, points: &VecMap<Point>) { + self.on_stroke(stroke, to_js_value(points).unwrap()) + } + + fn on_interval(&self, stroke: String, intervals: &IntervalUnion<f32>) { + self.on_interval(stroke, to_js_value(intervals).unwrap()) } - } - - fn on_interval(&self, stroke: String, intervals: &IntervalUnion) { - #[allow(unused_must_use)] { // TODO: Error handling - to_js_value(intervals).map(|intervals| self.on_interval(stroke, intervals)); + + fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>) { + self.on_deltas(bincode::serialize(deltas).unwrap().into_boxed_slice()) } - } - - fn on_broadcast(&self, deltas: &HashMap<StrokeID, StrokeDelta>) { - /*#[allow(unused_must_use)] { // TODO: Error handling - to_js_value(deltas).map(|deltas| self.on_broadcast(deltas)); - }*/ - let deltas = to_js_value(deltas).unwrap(); - self.on_broadcast(deltas); - } } #[wasm_bindgen] @@ -59,58 +49,67 @@ pub struct WasmCRDT(CRDT); #[wasm_bindgen] impl WasmCRDT { - // WARNING: The CRDT actually obtains "ownership" of the event listener, but it can - // still be modified and destructed from inside JavaScript - // Pass in a unique object with unique callbacks into the constructor without - // other references to them to uphold this contract - #[wasm_bindgen(constructor)] - pub fn new(event_listener: &WasmEventListener) -> WasmCRDT { - WasmCRDT(CRDT::new(Box::new((*event_listener).clone()))) - } - - pub fn set_user(&mut self, user: &str) -> bool { - self.0.set_user(user) - } - - pub fn get_user(&self) -> Option<String> { - self.0.get_user() - } - - pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> { - self.0.add_stroke(x, y, weight, colour) - } - - pub fn add_point(&mut self, stroke: &str, x: i32, y: i32, weight: f32, colour: &str) -> bool { - self.0.add_point(stroke, x, y, weight, colour) - } - - pub fn erase_stroke(&mut self, stroke: &str, from: f32, to: f32) -> bool { - self.0.erase_stroke(stroke, from, to) - } - - pub fn get_stroke_points(&self, stroke: &str) -> Result<JsValue, JsValue> { - let points = match self.0.get_stroke_points(stroke) { - Some(points) => points, - None => return Err(Error::new("Unknown stroke ID").into()), - }; - - Ok(to_js_value(points)?) - } - - pub fn get_stroke_intervals(&self, stroke: &str) -> Result<JsValue, JsValue> { - let intervals = match self.0.get_stroke_intervals(stroke) { - Some(intervals) => intervals, - None => return Err(Error::new("Unknown stroke ID").into()), - }; - - Ok(to_js_value(intervals)?) - } - - pub fn fetch_events(&mut self) { - self.0.fetch_events() - } - - pub fn fetch_broadcasts(&mut self) { - self.0.fetch_broadcasts() - } + // WARNING: The CRDT actually obtains "ownership" of the event listener, but it can + // still be modified and destructed from inside JavaScript + // Pass in a unique object with unique callbacks into the constructor without + // other references to them to uphold this contract + #[wasm_bindgen(constructor)] + pub fn new(event_listener: &WasmEventListener) -> WasmCRDT { + WasmCRDT(CRDT::new(Box::new((*event_listener).clone()))) + } + + pub fn set_user(&mut self, user: &str) -> bool { + self.0.set_user(user) + } + + pub fn get_user(&self) -> Option<String> { + self.0.get_user() + } + + pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> { + self.0.add_stroke(x, y, weight, colour) + } + + pub fn add_point(&mut self, stroke: &str, x: i32, y: i32, weight: f32, colour: &str) -> bool { + self.0.add_point(stroke, x, y, weight, colour) + } + + pub fn erase_stroke(&mut self, stroke: &str, from: f32, to: f32) -> bool { + self.0.erase_stroke(stroke, from, to) + } + + pub fn get_stroke_points(&self, stroke: &str) -> Result<JsValue, JsValue> { + let points = match self.0.get_stroke_points(stroke) { + Some(points) => points, + None => return Err(Error::new("Unknown stroke ID").into()), + }; + + Ok(to_js_value(points)?) + } + + pub fn get_stroke_intervals(&self, stroke: &str) -> Result<JsValue, JsValue> { + let intervals = match self.0.get_stroke_intervals(stroke) { + Some(intervals) => intervals, + None => return Err(Error::new("Unknown stroke ID").into()), + }; + + Ok(to_js_value(intervals)?) + } + + pub fn fetch_events(&mut self) { + self.0.fetch_events() + } + + pub fn fetch_deltas(&mut self) { + self.0.fetch_deltas() + } + + pub fn apply_deltas(&mut self, deltas: Box<[u8]>) -> Result<(), JsValue> { + let deltas = match bincode::deserialize(&deltas) { + Ok(deltas) => deltas, + Err(error) => return Err(Error::new(&format!("{:?}", error)).into()), + }; + + Ok(self.0.apply_deltas(deltas)) + } } diff --git a/vec-map b/vec-map new file mode 160000 index 0000000..9fd05f2 --- /dev/null +++ b/vec-map @@ -0,0 +1 @@ +Subproject commit 9fd05f23024ef98ebaae809f31b30f9c2c644707 diff --git a/www/index.js b/www/index.js index 7d58dae..e1be930 100644 --- a/www/index.js +++ b/www/index.js @@ -1,5 +1,7 @@ import * as wasm from "drawing-crdt" +const broadcasts = [] + const crdt = new wasm.WasmCRDT({ on_stroke: (stroke, points) => { console.log("on_stroke:", stroke, points) @@ -7,8 +9,9 @@ const crdt = new wasm.WasmCRDT({ on_interval: (stroke, intervals) => { console.log("on_interval", stroke, intervals) }, - on_broadcast: (deltas) => { - console.log("on_broadcast:", deltas) + on_deltas: (deltas) => { + console.log("on_deltas:", deltas) + broadcasts.push(deltas) }, }) @@ -22,9 +25,9 @@ console.log("pre fetch events") crdt.fetch_events(); console.log("post fetch events") -console.log("pre fetch broadcasts") -crdt.fetch_broadcasts(); -console.log("post fetch broadcasts") +console.log("pre fetch deltas") +crdt.fetch_deltas(); +console.log("post fetch deltas") crdt.erase_stroke(stroke_id, 0.0, 0.25) crdt.erase_stroke(stroke_id, 0.5, 2.25) @@ -34,6 +37,28 @@ console.log("pre fetch events") crdt.fetch_events(); console.log("post fetch events") -console.log("pre fetch broadcasts") -crdt.fetch_broadcasts(); -console.log("post fetch broadcasts") +console.log("pre fetch deltas") +crdt.fetch_deltas(); +console.log("post fetch deltas") + +const crdt2 = new wasm.WasmCRDT({ + on_stroke: (stroke, points) => { + console.log("on_stroke2:", stroke, points) + }, + on_interval: (stroke, intervals) => { + console.log("on_interval2", stroke, intervals) + }, + on_deltas: (deltas) => { + console.log("on_deltas:2", deltas) + }, +}) + +console.log("pre apply 2nd deltas + fetch events") +crdt2.apply_deltas(broadcasts[1]) +crdt2.fetch_events() +console.log("post apply 2nd deltas + fetch events") + +console.log("pre apply 1st deltas + fetch events") +crdt2.apply_deltas(broadcasts[0]) +crdt2.fetch_events() +console.log("post apply 1st deltas + fetch events") \ No newline at end of file -- GitLab