diff --git a/src/crdt.rs b/src/crdt.rs index 30fae914851471f35f55293e90ea730221b097b8..d4ed9f224fa58b710146affeb58f8dd1e5b5c305 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1,12 +1,15 @@ use fixedbitset::FixedBitSet; use ordered_float::NotNan; -use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor}; -use serde::ser::{Serialize, SerializeSeq, SerializeStruct, SerializeTuple, Serializer}; +use serde::de::{self, Deserialize, DeserializeSeed, Deserializer, MapAccess, SeqAccess, Visitor}; +use serde::ser::{ + Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, Serializer, +}; use std::cmp; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::convert::TryInto; use std::fmt; +use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::ops::{Deref, DerefMut}; use twox_hash::XxHash64 as XXHasher; @@ -163,7 +166,7 @@ impl Point { } } -pub trait IntervalBound: Copy + Clone + Default + Ord + Hash { +pub trait IntervalBound: Debug + Copy + Clone + Default + Ord + Hash { fn up(self) -> Self; fn down(self) -> Self; fn sub(self, nom: u8, denom: u8) -> Self; @@ -336,7 +339,7 @@ where match intervals.last_mut() { Some(top) => { - if interval.from <= top.to { + if interval.from <= top.to.up() { top.to = cmp::max(top.to, interval.to) } else { intervals.push(interval) @@ -354,7 +357,7 @@ where match intervals.last_mut() { Some(top) => { - if interval.from <= top.to { + if interval.from <= top.to.up() { top.to = cmp::max(top.to, interval.to) } else { intervals.push(interval) @@ -692,7 +695,7 @@ where intervals.push(Interval::new(T::default(), self.range)); } else if footprint != IntervalUnionState::<T>::EMPTY_INTERVALS { for from in 0u8..64u8 { - if (self.footprint & marker) != IntervalUnionState::<T>::EMPTY_INTERVALS { + if (footprint & marker) != IntervalUnionState::<T>::EMPTY_INTERVALS { let interval = Interval::new(self.range.sub(from, 64u8), self.range.sub(from + 1u8, 64u8)); @@ -793,13 +796,154 @@ where } } +#[derive(Debug)] +pub struct DeltaVec(HashMap<StrokeID, StrokeDelta>); + +impl Serialize for DeltaVec { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + struct DeltaVecSerWrapper<'a>(Vec<((usize, u8, usize), &'a StrokeDelta)>); + + impl<'a> Serialize for DeltaVecSerWrapper<'a> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + + for (k, v) in &self.0 { + map.serialize_entry(k, v)?; + } + + map.end() + } + } + + let mut buffer: Vec<((usize, u8, usize), &StrokeDelta)> = Vec::with_capacity(self.0.len()); + + let mut lut_tmp: HashMap<u128, usize> = HashMap::new(); + let mut lut_out: Vec<u128> = Vec::new(); + + for (k, v) in &self.0 { + let (user, active) = CRDT::canonicalise_uuid(k.0); + + let id = match lut_tmp.entry(user) { + Occupied(entry) => *entry.get(), + Vacant(entry) => { + let id = lut_out.len(); + + entry.insert(id); + lut_out.push(user); + + id + } + }; + + buffer.push(((id, active, k.1), v)); + } + + let mut tuple = serializer.serialize_tuple(2)?; + + tuple.serialize_element(&lut_out)?; + tuple.serialize_element(&DeltaVecSerWrapper(buffer))?; + + tuple.end() + } +} + +impl<'de> Deserialize<'de> for DeltaVec { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct DeltaVecDeWrapper<'a>(&'a Vec<u128>); + + impl<'de, 'a> DeserializeSeed<'de> for DeltaVecDeWrapper<'a> { + type Value = HashMap<StrokeID, StrokeDelta>; + + fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: Deserializer<'de>, + { + struct DeltaVecDeWrapperVisitor<'a>(&'a Vec<u128>); + + impl<'de, 'a> Visitor<'de> for DeltaVecDeWrapperVisitor<'a> { + type Value = HashMap<StrokeID, StrokeDelta>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "struct DeltaVec") + } + + fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error> + where + M: MapAccess<'de>, + { + let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0)); + + while let Some(((id, active, idx), value)) = + access.next_entry()?: Option<((usize, u8, usize), StrokeDelta)> + { + let user = match self.0.get(id) { + Some(user) => user, + None => { + return Err(de::Error::custom( + "invalid canonincal user lookup id", + )) + } + }; + + map.insert( + StrokeID::new(CRDT::decanonicalise_uuid(*user, active), idx), + value, + ); + } + + Ok(map) + } + } + + deserializer.deserialize_map(DeltaVecDeWrapperVisitor(self.0)) + } + } + + struct DeltaVecVisitor; + + impl<'de> Visitor<'de> for DeltaVecVisitor { + type Value = DeltaVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct DeltaVec") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let lut_in: Vec<u128> = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + + let deltas = seq + .next_element_seed(DeltaVecDeWrapper(&lut_in))? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + + Ok(DeltaVec(deltas)) + } + } + + deserializer.deserialize_tuple(2, DeltaVecVisitor) + } +} + #[derive(Debug)] pub struct StateVec { strokes: HashMap<u128, IntervalUnion<usize>>, intervals: HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>, } -impl Serialize for StateVec { +/*impl Serialize for StateVec { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer, @@ -809,11 +953,106 @@ impl Serialize for StateVec { tuple.serialize_element(&self.strokes)?; tuple.serialize_element(&self.intervals)?; + tuple.end() + } +}*/ + +impl Serialize for StateVec { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + struct StateVecStrokesSerWrapper<'a>(Vec<((usize, u8), &'a IntervalUnion<usize>)>); + + impl<'a> Serialize for StateVecStrokesSerWrapper<'a> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + + for (k, v) in &self.0 { + map.serialize_entry(k, v)?; + } + + map.end() + } + } + + struct StateVecIntervalsSerWrapper<'a>( + Vec<((usize, u8, usize), &'a IntervalUnionState<NotNan<f32>>)>, + ); + + impl<'a> Serialize for StateVecIntervalsSerWrapper<'a> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.0.len()))?; + + for (k, v) in &self.0 { + map.serialize_entry(k, v)?; + } + + map.end() + } + } + + let mut strokes_buffer: Vec<((usize, u8), &IntervalUnion<usize>)> = + Vec::with_capacity(self.strokes.len()); + let mut intervals_buffer: Vec<((usize, u8, usize), &IntervalUnionState<NotNan<f32>>)> = + Vec::with_capacity(self.intervals.len()); + + let mut lut_tmp: HashMap<u128, usize> = HashMap::new(); + let mut lut_out: Vec<u128> = Vec::new(); + + for (k, v) in &self.strokes { + let (user, active) = CRDT::canonicalise_uuid(*k); + + let id = match lut_tmp.entry(user) { + Occupied(entry) => *entry.get(), + Vacant(entry) => { + let id = lut_out.len(); + + entry.insert(id); + lut_out.push(user); + + id + } + }; + + strokes_buffer.push(((id, active), v)); + } + + for (k, v) in &self.intervals { + let (user, active) = CRDT::canonicalise_uuid(k.0); + + let id = match lut_tmp.entry(user) { + Occupied(entry) => *entry.get(), + Vacant(entry) => { + let id = lut_out.len(); + + entry.insert(id); + lut_out.push(user); + + id + } + }; + + intervals_buffer.push(((id, active, k.1), v)); + } + + let mut tuple = serializer.serialize_tuple(3)?; + + tuple.serialize_element(&lut_out)?; + tuple.serialize_element(&StateVecStrokesSerWrapper(strokes_buffer))?; + tuple.serialize_element(&StateVecIntervalsSerWrapper(intervals_buffer))?; + tuple.end() } } -impl<'de> Deserialize<'de> for StateVec { +/*impl<'de> Deserialize<'de> for StateVec { fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de>, @@ -844,6 +1083,141 @@ impl<'de> Deserialize<'de> for StateVec { deserializer.deserialize_tuple(2, StateVecVisitor) } +}*/ + +impl<'de> Deserialize<'de> for StateVec { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct StateVecStrokesDeWrapper<'a>(&'a Vec<u128>); + + impl<'de, 'a> DeserializeSeed<'de> for StateVecStrokesDeWrapper<'a> { + type Value = HashMap<u128, IntervalUnion<usize>>; + + fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: Deserializer<'de>, + { + struct StateVecStrokesDeWrapperVisitor<'a>(&'a Vec<u128>); + + impl<'de, 'a> Visitor<'de> for StateVecStrokesDeWrapperVisitor<'a> { + type Value = HashMap<u128, IntervalUnion<usize>>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "struct StateVec") + } + + fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error> + where + M: MapAccess<'de>, + { + let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0)); + + while let Some(((id, active), value)) = + access.next_entry()?: Option<((usize, u8), IntervalUnion<usize>)> + { + let user = match self.0.get(id) { + Some(user) => user, + None => { + return Err(de::Error::custom( + "invalid canonincal user lookup id", + )) + } + }; + + map.insert(CRDT::decanonicalise_uuid(*user, active), value); + } + + Ok(map) + } + } + + deserializer.deserialize_map(StateVecStrokesDeWrapperVisitor(self.0)) + } + } + + struct StateVecIntervalsDeWrapper<'a>(&'a Vec<u128>); + + impl<'de, 'a> DeserializeSeed<'de> for StateVecIntervalsDeWrapper<'a> { + type Value = HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>; + + fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: Deserializer<'de>, + { + struct StateVecIntervalsDeWrapperVisitor<'a>(&'a Vec<u128>); + + impl<'de, 'a> Visitor<'de> for StateVecIntervalsDeWrapperVisitor<'a> { + type Value = HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "struct StateVec") + } + + fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error> + where + M: MapAccess<'de>, + { + let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0)); + + while let Some(((id, active, idx), value)) = access.next_entry()?: + Option<((usize, u8, usize), IntervalUnionState<NotNan<f32>>)> + { + let user = match self.0.get(id) { + Some(user) => user, + None => { + return Err(de::Error::custom( + "invalid canonincal user lookup id", + )) + } + }; + + map.insert( + StrokeID::new(CRDT::decanonicalise_uuid(*user, active), idx), + value, + ); + } + + Ok(map) + } + } + + deserializer.deserialize_map(StateVecIntervalsDeWrapperVisitor(self.0)) + } + } + + struct StateVecVisitor; + + impl<'de> Visitor<'de> for StateVecVisitor { + type Value = StateVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct StateVec") + } + + fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error> + where + V: SeqAccess<'de>, + { + let lut_in: Vec<u128> = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + + let strokes = seq + .next_element_seed(StateVecStrokesDeWrapper(&lut_in))? + .ok_or_else(|| de::Error::invalid_length(1, &self))?; + + let intervals = seq + .next_element_seed(StateVecIntervalsDeWrapper(&lut_in))? + .ok_or_else(|| de::Error::invalid_length(2, &self))?; + + Ok(StateVec { strokes, intervals }) + } + } + + deserializer.deserialize_tuple(3, StateVecVisitor) + } } impl StateVec { @@ -859,8 +1233,8 @@ pub trait EventListener { fn on_stroke(&self, stroke: String, points: &VecMap<Point>); fn on_interval(&self, stroke: String, intervals: &IntervalUnion<NotNan<f32>>); - fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>); - fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>); + fn on_deltas(&self, deltas: DeltaVec); + fn on_deltas_from_state(&self, user: String, deltas: DeltaVec); } pub struct CRDT { @@ -1262,10 +1636,14 @@ impl CRDT { let mut deltas = HashMap::new(); std::mem::swap(&mut self.deltas, &mut deltas); - self.event_listener.on_deltas(deltas); + web_sys::console::log_1(&format!("temporal deltas: {:?}", deltas).into()); + + self.event_listener.on_deltas(DeltaVec(deltas)); } - pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) { + pub fn apply_deltas(&mut self, deltas: DeltaVec) { + let deltas = deltas.0; + if deltas.is_empty() { return; }; @@ -1370,8 +1748,8 @@ impl CRDT { let mut deltas = HashMap::new(); - web_sys::console::log_1(&format!("local {:?}", self.state).into()); - web_sys::console::log_1(&format!("remote {:?}", remote_state).into()); + web_sys::console::log_1(&format!("local state: {:?}", self.state).into()); + web_sys::console::log_1(&format!("remote state: {:?}", remote_state).into()); for (user, strokes_state) in &self.state.strokes { let strokes = &self.crdt.get(user).unwrap().strokes; @@ -1469,10 +1847,9 @@ impl CRDT { } } - web_sys::console::log_1(&format!("deltas {:?}", deltas).into()); + web_sys::console::log_1(&format!("deltas from state: {:?}", deltas).into()); - self.event_listener.on_deltas_from_state(user, deltas) + self.event_listener + .on_deltas_from_state(user, DeltaVec(deltas)) } - - // TODO: implement serialisation/deserialisation for StateVec and DeltaVec (new type still needed) such that user UUIDs (incl. in stroke IDs) are encoded as (usize, u8) where the former is the index into a "VecSet" of canonical UUIDs and the latter is the modifier to the canonical ID } diff --git a/src/lib.rs b/src/lib.rs index 235e23d892fc53abcf741b6b796f15f2ef02903e..89c5d143203b19b6798af8ad19be07e7e7cec6a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ +#![feature(type_ascription)] + use wasm_bindgen::prelude::*; use js_sys::Error; use ordered_float::NotNan; use serde_wasm_bindgen::to_value as to_js_value; -use std::collections::HashMap; use std::option::Option; use vec_map::VecMap; @@ -14,7 +15,7 @@ static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT; mod utils; mod crdt; -use crdt::{EventListener, IntervalUnion, Point, StrokeDelta, StrokeID, CRDT}; +use crdt::{DeltaVec, EventListener, IntervalUnion, Point, CRDT}; mod packing; @@ -45,11 +46,11 @@ impl EventListener for WasmEventListener { self.on_interval(stroke, to_js_value(intervals).unwrap()) } - fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>) { + fn on_deltas(&self, deltas: DeltaVec) { self.on_deltas(packing::pack(&bincode::serialize(&deltas).unwrap()).into_boxed_slice()) } - fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>) { + fn on_deltas_from_state(&self, user: String, deltas: DeltaVec) { self.on_deltas_from_state( user, packing::pack(&bincode::serialize(&deltas).unwrap()).into_boxed_slice(),