From 79a932feb17ad3c56c57c5c5b64dcc86a40ae9b3 Mon Sep 17 00:00:00 2001 From: Moritz Langenstein <ml5717@ic.ac.uk> Date: Tue, 24 Dec 2019 17:04:38 +0000 Subject: [PATCH] (ml5717) Added state vector for strokes --- Cargo.toml | 1 + src/crdt.rs | 364 ++++++++++++++++++++++++++++++++++++++++++--------- src/lib.rs | 41 +++++- vec-map | 2 +- www/index.js | 19 ++- 5 files changed, 357 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c0c8f50..a8191ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ serde_derive = "1.0.104" serde-wasm-bindgen = "0.1.3" vec_map = { path = "vec-map", features = ["serde"] } bincode = "1.2.1" +# web-sys = { version = "0.3.33", features = ["console"] } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires diff --git a/src/crdt.rs b/src/crdt.rs index 2cc302d..b69838e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -7,6 +7,7 @@ use std::ops::{Deref, DerefMut}; use uuid::Uuid; use vec_map::VecMap; +#[derive(Debug)] struct Dirty<T> { value: T, dirty: bool, @@ -44,7 +45,7 @@ impl<T> Dirty<T> { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Point { x: i32, y: i32, @@ -152,11 +153,11 @@ impl IntervalBound for f32 { fn max(a: f32, b: f32) -> f32 { f32::max(a, b) } - + fn up(self) -> f32 { self } - + fn down(self) -> f32 { self } @@ -166,18 +167,18 @@ impl IntervalBound for usize { fn max(a: usize, b: usize) -> usize { cmp::max(a, b) } - + fn up(self) -> usize { self + 1 } - + fn down(self) -> usize { self - 1 } } -#[derive(Copy, Clone, PartialEq)] -struct Interval<T> +#[derive(Copy, Clone, PartialEq, Debug)] +pub struct Interval<T> where T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, { @@ -194,6 +195,7 @@ where } } +#[derive(Clone, Debug)] pub struct IntervalUnion<T>(Vec<Interval<T>>) where T: Copy + Clone + PartialEq + PartialOrd + IntervalBound; @@ -341,9 +343,9 @@ where fn difference(&mut self, other: &IntervalUnion<T>) { let mut si = 0; let mut oi = 0; - + let mut intervals: Vec<Interval<T>> = Vec::new(); - + while si < self.0.len() && oi < other.0.len() { if other.0[oi].to < self.0[si].from { oi += 1; @@ -369,15 +371,28 @@ where si += 1; } } - + if si < self.0.len() { intervals.extend_from_slice(&self.0[si..]) }; - + self.0 = intervals; } } +impl<T> IntoIterator for IntervalUnion<T> +where + T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, +{ + type Item = Interval<T>; + type IntoIter = std::vec::IntoIter<Self::Item>; + + #[inline] + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + impl<T> From<Interval<T>> for IntervalUnion<T> where T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, @@ -387,22 +402,26 @@ where } } +#[derive(Debug)] struct Stroke { + index: usize, points: Dirty<VecMap<Point>>, intervals: Dirty<IntervalUnion<f32>>, } impl Stroke { - fn new() -> Stroke { + fn new(index: usize) -> Stroke { Stroke { + index, points: Dirty::new(VecMap::new(), true), intervals: Dirty::new(IntervalUnion::new(), true), } } } +#[derive(Debug)] pub struct StrokeDelta { - points: VecMap<Point>, + points: HashMap<usize, Point>, intervals: IntervalUnion<f32>, } @@ -456,25 +475,32 @@ impl<'de> Deserialize<'de> for StrokeDelta { impl StrokeDelta { fn new() -> StrokeDelta { StrokeDelta { - points: VecMap::new(), + points: HashMap::new(), intervals: IntervalUnion::new(), } } } +#[derive(Debug)] struct User { - strokes: VecMap<Stroke>, + strokes: Vec<Stroke>, } impl User { fn new() -> User { User { - strokes: VecMap::new(), + strokes: Vec::new(), } } + + fn get_stroke_idx(&self, index: usize) -> Option<usize> { + self.strokes + .binary_search_by_key(&index, |stroke| stroke.index) + .ok() + } } -#[derive(Hash, Eq, PartialEq)] +#[derive(Hash, Eq, PartialEq, Debug)] pub struct StrokeID(u128, usize); impl Serialize for StrokeID { @@ -534,16 +560,69 @@ impl StrokeID { } } +#[derive(Debug)] pub struct StateVec { strokes: HashMap<u128, IntervalUnion<usize>>, //intervals: HashMap<>, } +impl Serialize for StateVec { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut tuple = serializer.serialize_tuple(1)?; + + tuple.serialize_element(&self.strokes)?; + + tuple.end() + } +} + +impl<'de> Deserialize<'de> for StateVec { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct StateVecVisitor; + + impl<'de> Visitor<'de> for StateVecVisitor { + type Value = StateVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct StateVec") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let strokes = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + + Ok(StateVec { strokes }) + } + } + + deserializer.deserialize_tuple(1, StateVecVisitor) + } +} + +impl StateVec { + fn new() -> StateVec { + StateVec { + strokes: HashMap::new(), + } + } +} + pub trait EventListener { fn on_stroke(&self, stroke: String, points: &VecMap<Point>); fn on_interval(&self, stroke: String, intervals: &IntervalUnion<f32>); - fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>); + fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>); + fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>); } pub struct CRDT { @@ -605,11 +684,16 @@ impl CRDT { .crdt .entry(user) .or_insert_with(|| Dirty::new(User::new(), true)); - let mut stroke = Stroke::new(); + + let stroke_idx = match entry.strokes.last() { + Some(stroke) => stroke.index + stroke.points.vec_len(), + None => 0, + }; + + let mut stroke = Stroke::new(stroke_idx); stroke.points.push(point.clone()); - entry.strokes.push(stroke); - let stroke_idx = entry.strokes.len() - 1; + entry.strokes.push(stroke); // Implicit call to stroke.points.dirty() in Stroke::new() entry.dirty(); @@ -624,7 +708,7 @@ impl CRDT { Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string()) } - fn get_user_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> { + fn split_stroke_id(&self, stroke_id: &str) -> Option<(&Dirty<User>, u128, usize)> { let (author, stroke) = match stroke_id.rfind('-') { Some(split) => stroke_id.split_at(split), None => return None, @@ -635,7 +719,7 @@ impl CRDT { None => return None, }; - let entry = match self.crdt.get_mut(&author) { + let entry = match self.crdt.get(&author) { Some(entry) => entry, None => return None, }; @@ -646,22 +730,24 @@ impl CRDT { } } - fn get_stroke(&self, stroke_id: &str) -> Option<&Stroke> { + fn split_stroke_id_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> { let (author, stroke) = match stroke_id.rfind('-') { Some(split) => stroke_id.split_at(split), None => return None, }; - let entry = match CRDT::string_to_uuid(author) - .as_ref() - .and_then(move |author| self.crdt.get(author)) - { + let author = match CRDT::string_to_uuid(author) { + Some(author) => author, + None => return None, + }; + + let entry = match self.crdt.get_mut(&author) { Some(entry) => entry, None => return None, }; match stroke[1..].parse::<usize>() { - Ok(stroke) => entry.strokes.get(stroke), + Ok(stroke_idx) => Some((entry, author, stroke_idx)), _ => None, } } @@ -674,12 +760,17 @@ impl CRDT { weight: f32, colour: &str, ) -> bool { - let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { + let (entry, user, stroke_idx) = match self.split_stroke_id_mut(stroke_id) { Some(stroke) => stroke, None => return false, }; - let stroke = match entry.strokes.get_mut(stroke_idx) { + let idx = match entry.get_stroke_idx(stroke_idx) { + Some(idx) if idx == (entry.strokes.len() - 1) => idx, + _ => return false, + }; + + let stroke = match entry.strokes.get_mut(idx) { Some(stroke) => stroke, _ => return false, }; @@ -689,7 +780,8 @@ impl CRDT { None => return false, }; - let point_idx = stroke.points.push(point.clone()) - 1; + stroke.points.push(point.clone()); + let point_idx = stroke.points.vec_len() - 1; stroke.points.dirty(); entry.dirty(); @@ -709,12 +801,17 @@ impl CRDT { return false; }; - let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { + let (entry, user, stroke_idx) = match self.split_stroke_id_mut(stroke_id) { Some(stroke) => stroke, None => return false, }; - let stroke = match entry.strokes.get_mut(stroke_idx) { + let idx = match entry.get_stroke_idx(stroke_idx) { + Some(idx) => idx, + _ => return false, + }; + + let stroke = match entry.strokes.get_mut(idx) { Some(stroke) => stroke, _ => return false, }; @@ -736,12 +833,34 @@ impl CRDT { true } - pub fn get_stroke_points(&self, stroke: &str) -> Option<&VecMap<Point>> { - self.get_stroke(stroke).map(|stroke| stroke.points.deref()) + pub fn get_stroke_points(&self, stroke_id: &str) -> Option<&VecMap<Point>> { + let (entry, _user, stroke_idx) = match self.split_stroke_id(stroke_id) { + Some(stroke) => stroke, + None => return None, + }; + + let idx = match entry.get_stroke_idx(stroke_idx) { + Some(idx) => idx, + None => return None, + }; + + entry.strokes.get(idx).map(|stroke| stroke.points.deref()) } - pub fn get_stroke_intervals(&self, stroke: &str) -> Option<&IntervalUnion<f32>> { - self.get_stroke(stroke) + pub fn get_stroke_intervals(&self, stroke_id: &str) -> Option<&IntervalUnion<f32>> { + let (entry, _user, stroke_idx) = match self.split_stroke_id(stroke_id) { + Some(stroke) => stroke, + None => return None, + }; + + let idx = match entry.get_stroke_idx(stroke_idx) { + Some(idx) => idx, + _ => return None, + }; + + entry + .strokes + .get(idx) .map(|stroke| stroke.intervals.deref()) } @@ -755,7 +874,7 @@ impl CRDT { let mut stroke_events = Vec::new(); let mut interval_events = Vec::new(); - self.crdt.iter_mut().for_each(|(user, entry)| { + for (user, entry) in self.crdt.deref_mut() { if !entry.is_dirty() { return; }; @@ -764,7 +883,7 @@ impl CRDT { let user = CRDT::uuid_to_string(*user) + "-"; - entry.strokes.iter_mut().for_each(|(i, stroke)| { + for (i, stroke) in entry.strokes.iter_mut().enumerate() { if stroke.points.is_dirty() { stroke.points.clean(); stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref())); @@ -774,8 +893,8 @@ impl CRDT { stroke.intervals.clean(); interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref())); } - }); - }); + } + } for (user, points) in stroke_events { self.event_listener.on_stroke(user, points); @@ -791,9 +910,12 @@ impl CRDT { return; }; - self.event_listener.on_deltas(&self.deltas); + let mut deltas = HashMap::new(); + std::mem::swap(&mut self.deltas, &mut deltas); + + //web_sys::console::log_1(&format!("deltas {:?}", deltas).into()); - self.deltas.clear(); + self.event_listener.on_deltas(deltas); } pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) { @@ -803,32 +925,152 @@ impl CRDT { self.crdt.dirty(); - deltas - .into_iter() - .for_each(|(stroke_id, mut stroke_delta)| { - let entry = self - .crdt - .entry(stroke_id.0) - .or_insert_with(|| Dirty::new(User::new(), true)); + for (stroke_id, stroke_delta) in deltas { + let entry = self + .crdt + .entry(stroke_id.0) + .or_insert_with(|| Dirty::new(User::new(), true)); + + entry.dirty(); + + let stroke = match entry + .strokes + .binary_search_by_key(&stroke_id.1, |stroke| stroke.index) + { + Ok(idx) => &mut entry.strokes[idx], + Err(idx) => { + entry.strokes.insert(idx, Stroke::new(stroke_id.1)); + &mut entry.strokes[idx] + } + }; + + if !stroke_delta.points.is_empty() { + stroke.points.dirty(); - entry.dirty(); + stroke.points.extend(stroke_delta.points.into_iter()); + } - let stroke = entry.strokes.entry(stroke_id.1).or_insert_with(Stroke::new); + if stroke.intervals.union(&stroke_delta.intervals) { + stroke.intervals.dirty() + }; + } + } - if !stroke_delta.points.is_empty() { - stroke.points.dirty(); - stroke.points.append(&mut stroke_delta.points); + pub fn get_state_vector(&self) -> StateVec { + let mut state = StateVec::new(); + state.strokes.reserve(self.crdt.len()); + + for (user, entry) in self.crdt.deref() { + let mut point_intervals: Vec<Interval<usize>> = Vec::new(); + + for stroke in &entry.strokes { + for point_idx in stroke.points.keys() { + match point_intervals.last_mut() { + Some(interval) if interval.to.up() == (stroke.index + point_idx) => { + interval.to = stroke.index + point_idx + } + _ => point_intervals.push(Interval::new( + stroke.index + point_idx, + stroke.index + point_idx, + )), + }; } + } - if stroke.intervals.union(&stroke_delta.intervals) { - stroke.intervals.dirty() - }; - }); + state.strokes.insert(*user, IntervalUnion(point_intervals)); + } + + state } - pub fn get_state_vector(&self) {} + pub fn fetch_deltas_from_state_vector(&self, user: String, remote_state: &StateVec) { + let mut deltas = HashMap::new(); + + //web_sys::console::log_1(&format!("remote state: {:?}", remote_state).into()); + //web_sys::console::log_1(&format!("local state: {:?}", self.get_state_vector()).into()); + + for (user, strokes_state) in &self.get_state_vector().strokes { + let strokes = &self.crdt.get(user).unwrap().strokes; + + if let Some(remote_strokes_state) = remote_state.strokes.get(user) { + //web_sys::console::log_1(&format!("remote strokes state: {:?}", remote_strokes_state).into()); + //web_sys::console::log_1(&format!("local strokes state: {:?}", strokes_state).into()); + + let mut difference = (*strokes_state).clone(); + difference.difference(remote_strokes_state); + + //web_sys::console::log_1(&format!("strokes state difference: {:?}", difference).into()); + + if difference.is_empty() { + continue; + }; + + for interval in difference { + loop { + let stroke_idx = match strokes + .binary_search_by_key(&interval.from, |stroke| stroke.index) + { + Ok(stroke_idx) => stroke_idx, + Err(after_idx) => after_idx - 1, + }; + + let stroke = strokes.get(stroke_idx).unwrap(); + let points_len = stroke.points.vec_len(); + + //web_sys::console::log_1(&format!("stroke_idx {:?}", stroke_idx).into()); + //web_sys::console::log_1(&format!("stroke {:?}", stroke).into()); + //web_sys::console::log_1(&format!("points_len {:?}", points_len).into()); + + let mut stroke_delta = StrokeDelta::new(); + stroke_delta + .points + .reserve(cmp::min(points_len, interval.to - interval.from + 1)); + + //web_sys::console::log_1(&format!("point_idx range {:?}", (interval.from - stroke.index)..=cmp::min(interval.to - stroke.index, points_len - 1)).into()); + + for point_idx in (interval.from - stroke.index) + ..=cmp::min(interval.to - stroke.index, points_len - 1) + { + stroke_delta + .points + .insert(point_idx, stroke.points.get(point_idx).unwrap().clone()); + } + + deltas.insert(StrokeID::new(*user, stroke.index), stroke_delta); + + if interval.to < (stroke.index + points_len) { + break; + }; + } + } + } else { + for stroke in strokes { + let len = stroke.points.len(); + + if len == 0 { + continue; + }; + + let mut stroke_delta = StrokeDelta::new(); + stroke_delta.points.reserve(len); + + for (point_idx, point) in stroke.points.deref() { + stroke_delta.points.insert(point_idx, point.clone()); + } + + deltas.insert(StrokeID::new(*user, stroke.index), stroke_delta); + } + } + } + + //web_sys::console::log_1(&format!("deltas {:?}", deltas).into()); + + self.event_listener.on_deltas_from_state(user, deltas) + } // TODO: add endStroke() method that finishes the named stroke // TODO: update addStroke() to use a new username (XOR variant + version bits in UUID) if last stroke still unfinished // TODO: keep track of users with unfinished strokes to use "fake" usernames optimally + + // TODO: intervals in StateVec } diff --git a/src/lib.rs b/src/lib.rs index 56f1f11..ccc9682 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,9 @@ extern "C" { #[wasm_bindgen(structural, method)] pub fn on_deltas(this: &WasmEventListener, deltas: Box<[u8]>); + + #[wasm_bindgen(structural, method)] + pub fn on_deltas_from_state(this: &WasmEventListener, user: String, deltas: Box<[u8]>); } impl EventListener for WasmEventListener { @@ -39,8 +42,15 @@ impl EventListener for WasmEventListener { self.on_interval(stroke, to_js_value(intervals).unwrap()) } - fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>) { - self.on_deltas(bincode::serialize(deltas).unwrap().into_boxed_slice()) + fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>) { + self.on_deltas(bincode::serialize(&deltas).unwrap().into_boxed_slice()) + } + + fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>) { + self.on_deltas_from_state( + user, + bincode::serialize(&deltas).unwrap().into_boxed_slice(), + ) } } @@ -78,8 +88,8 @@ impl WasmCRDT { self.0.erase_stroke(stroke, from, to) } - pub fn get_stroke_points(&self, stroke: &str) -> Result<JsValue, JsValue> { - let points = match self.0.get_stroke_points(stroke) { + pub fn get_stroke_points(&self, stroke_id: &str) -> Result<JsValue, JsValue> { + let points = match self.0.get_stroke_points(stroke_id) { Some(points) => points, None => return Err(Error::new("Unknown stroke ID").into()), }; @@ -87,8 +97,8 @@ impl WasmCRDT { Ok(to_js_value(points)?) } - pub fn get_stroke_intervals(&self, stroke: &str) -> Result<JsValue, JsValue> { - let intervals = match self.0.get_stroke_intervals(stroke) { + pub fn get_stroke_intervals(&self, stroke_id: &str) -> Result<JsValue, JsValue> { + let intervals = match self.0.get_stroke_intervals(stroke_id) { Some(intervals) => intervals, None => return Err(Error::new("Unknown stroke ID").into()), }; @@ -112,4 +122,23 @@ impl WasmCRDT { Ok(self.0.apply_deltas(deltas)) } + + pub fn get_state_vector(&self) -> Box<[u8]> { + bincode::serialize(&self.0.get_state_vector()) + .unwrap() + .into_boxed_slice() + } + + pub fn fetch_deltas_from_state_vector( + &self, + user: String, + remote_state: Box<[u8]>, + ) -> Result<(), JsValue> { + let remote_state = match bincode::deserialize(&remote_state) { + Ok(remote_state) => remote_state, + Err(error) => return Err(Error::new(&format!("{:?}", error)).into()), + }; + + Ok(self.0.fetch_deltas_from_state_vector(user, &remote_state)) + } } diff --git a/vec-map b/vec-map index 9fd05f2..80fab04 160000 --- a/vec-map +++ b/vec-map @@ -1 +1 @@ -Subproject commit 9fd05f23024ef98ebaae809f31b30f9c2c644707 +Subproject commit 80fab04b18e020ccb645947dc026915b4503886d diff --git a/www/index.js b/www/index.js index e1be930..61ae568 100644 --- a/www/index.js +++ b/www/index.js @@ -13,6 +13,9 @@ const crdt = new wasm.WasmCRDT({ console.log("on_deltas:", deltas) broadcasts.push(deltas) }, + on_deltas_from_state: (user, deltas) => { + console.log("on_deltas_from_state:", user, deltas) + }, }) crdt.set_user("36577c51-a80b-47d6-b3c3-cfb11f705b87") @@ -51,6 +54,9 @@ const crdt2 = new wasm.WasmCRDT({ on_deltas: (deltas) => { console.log("on_deltas:2", deltas) }, + on_deltas_from_state: (user, deltas) => { + console.log("on_deltas_from_state:", user, deltas) + }, }) console.log("pre apply 2nd deltas + fetch events") @@ -58,7 +64,16 @@ crdt2.apply_deltas(broadcasts[1]) crdt2.fetch_events() console.log("post apply 2nd deltas + fetch events") -console.log("pre apply 1st deltas + fetch events") +console.log("CRDT StateVec", crdt.get_state_vector()) + +const state2 = crdt2.get_state_vector() +console.log("CRDT2 StateVec", state2) + +console.log("pre fetch deltas from 1 for 2") +crdt.fetch_deltas_from_state_vector("moritz", state2) +console.log("post fetch deltas from 1 for 2") + +/*console.log("pre apply 1st deltas + fetch events") crdt2.apply_deltas(broadcasts[0]) crdt2.fetch_events() -console.log("post apply 1st deltas + fetch events") \ No newline at end of file +console.log("post apply 1st deltas + fetch events")*/ \ No newline at end of file -- GitLab