Skip to content
Snippets Groups Projects
Commit a4090829 authored by Moritz Langenstein's avatar Moritz Langenstein
Browse files

(ml5717) User UUID extraction for StateVec and DelaVec serde

parent 89babd49
No related branches found
No related tags found
No related merge requests found
use fixedbitset::FixedBitSet;
use ordered_float::NotNan;
use serde::de::{self, Deserialize, Deserializer, SeqAccess, Visitor};
use serde::ser::{Serialize, SerializeSeq, SerializeStruct, SerializeTuple, Serializer};
use serde::de::{self, Deserialize, DeserializeSeed, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::ser::{
Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, Serializer,
};
use std::cmp;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::ops::{Deref, DerefMut};
use twox_hash::XxHash64 as XXHasher;
......@@ -163,7 +166,7 @@ impl Point {
}
}
pub trait IntervalBound: Copy + Clone + Default + Ord + Hash {
pub trait IntervalBound: Debug + Copy + Clone + Default + Ord + Hash {
fn up(self) -> Self;
fn down(self) -> Self;
fn sub(self, nom: u8, denom: u8) -> Self;
......@@ -336,7 +339,7 @@ where
match intervals.last_mut() {
Some(top) => {
if interval.from <= top.to {
if interval.from <= top.to.up() {
top.to = cmp::max(top.to, interval.to)
} else {
intervals.push(interval)
......@@ -354,7 +357,7 @@ where
match intervals.last_mut() {
Some(top) => {
if interval.from <= top.to {
if interval.from <= top.to.up() {
top.to = cmp::max(top.to, interval.to)
} else {
intervals.push(interval)
......@@ -692,7 +695,7 @@ where
intervals.push(Interval::new(T::default(), self.range));
} else if footprint != IntervalUnionState::<T>::EMPTY_INTERVALS {
for from in 0u8..64u8 {
if (self.footprint & marker) != IntervalUnionState::<T>::EMPTY_INTERVALS {
if (footprint & marker) != IntervalUnionState::<T>::EMPTY_INTERVALS {
let interval =
Interval::new(self.range.sub(from, 64u8), self.range.sub(from + 1u8, 64u8));
......@@ -793,13 +796,154 @@ where
}
}
#[derive(Debug)]
pub struct DeltaVec(HashMap<StrokeID, StrokeDelta>);
impl Serialize for DeltaVec {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
struct DeltaVecSerWrapper<'a>(Vec<((usize, u8, usize), &'a StrokeDelta)>);
impl<'a> Serialize for DeltaVecSerWrapper<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 {
map.serialize_entry(k, v)?;
}
map.end()
}
}
let mut buffer: Vec<((usize, u8, usize), &StrokeDelta)> = Vec::with_capacity(self.0.len());
let mut lut_tmp: HashMap<u128, usize> = HashMap::new();
let mut lut_out: Vec<u128> = Vec::new();
for (k, v) in &self.0 {
let (user, active) = CRDT::canonicalise_uuid(k.0);
let id = match lut_tmp.entry(user) {
Occupied(entry) => *entry.get(),
Vacant(entry) => {
let id = lut_out.len();
entry.insert(id);
lut_out.push(user);
id
}
};
buffer.push(((id, active, k.1), v));
}
let mut tuple = serializer.serialize_tuple(2)?;
tuple.serialize_element(&lut_out)?;
tuple.serialize_element(&DeltaVecSerWrapper(buffer))?;
tuple.end()
}
}
impl<'de> Deserialize<'de> for DeltaVec {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct DeltaVecDeWrapper<'a>(&'a Vec<u128>);
impl<'de, 'a> DeserializeSeed<'de> for DeltaVecDeWrapper<'a> {
type Value = HashMap<StrokeID, StrokeDelta>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
struct DeltaVecDeWrapperVisitor<'a>(&'a Vec<u128>);
impl<'de, 'a> Visitor<'de> for DeltaVecDeWrapperVisitor<'a> {
type Value = HashMap<StrokeID, StrokeDelta>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "struct DeltaVec")
}
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0));
while let Some(((id, active, idx), value)) =
access.next_entry()?: Option<((usize, u8, usize), StrokeDelta)>
{
let user = match self.0.get(id) {
Some(user) => user,
None => {
return Err(de::Error::custom(
"invalid canonincal user lookup id",
))
}
};
map.insert(
StrokeID::new(CRDT::decanonicalise_uuid(*user, active), idx),
value,
);
}
Ok(map)
}
}
deserializer.deserialize_map(DeltaVecDeWrapperVisitor(self.0))
}
}
struct DeltaVecVisitor;
impl<'de> Visitor<'de> for DeltaVecVisitor {
type Value = DeltaVec;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct DeltaVec")
}
fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
where
V: SeqAccess<'de>,
{
let lut_in: Vec<u128> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
let deltas = seq
.next_element_seed(DeltaVecDeWrapper(&lut_in))?
.ok_or_else(|| de::Error::invalid_length(1, &self))?;
Ok(DeltaVec(deltas))
}
}
deserializer.deserialize_tuple(2, DeltaVecVisitor)
}
}
#[derive(Debug)]
pub struct StateVec {
strokes: HashMap<u128, IntervalUnion<usize>>,
intervals: HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>,
}
impl Serialize for StateVec {
/*impl Serialize for StateVec {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
......@@ -809,11 +953,106 @@ impl Serialize for StateVec {
tuple.serialize_element(&self.strokes)?;
tuple.serialize_element(&self.intervals)?;
tuple.end()
}
}*/
impl Serialize for StateVec {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
struct StateVecStrokesSerWrapper<'a>(Vec<((usize, u8), &'a IntervalUnion<usize>)>);
impl<'a> Serialize for StateVecStrokesSerWrapper<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 {
map.serialize_entry(k, v)?;
}
map.end()
}
}
struct StateVecIntervalsSerWrapper<'a>(
Vec<((usize, u8, usize), &'a IntervalUnionState<NotNan<f32>>)>,
);
impl<'a> Serialize for StateVecIntervalsSerWrapper<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 {
map.serialize_entry(k, v)?;
}
map.end()
}
}
let mut strokes_buffer: Vec<((usize, u8), &IntervalUnion<usize>)> =
Vec::with_capacity(self.strokes.len());
let mut intervals_buffer: Vec<((usize, u8, usize), &IntervalUnionState<NotNan<f32>>)> =
Vec::with_capacity(self.intervals.len());
let mut lut_tmp: HashMap<u128, usize> = HashMap::new();
let mut lut_out: Vec<u128> = Vec::new();
for (k, v) in &self.strokes {
let (user, active) = CRDT::canonicalise_uuid(*k);
let id = match lut_tmp.entry(user) {
Occupied(entry) => *entry.get(),
Vacant(entry) => {
let id = lut_out.len();
entry.insert(id);
lut_out.push(user);
id
}
};
strokes_buffer.push(((id, active), v));
}
for (k, v) in &self.intervals {
let (user, active) = CRDT::canonicalise_uuid(k.0);
let id = match lut_tmp.entry(user) {
Occupied(entry) => *entry.get(),
Vacant(entry) => {
let id = lut_out.len();
entry.insert(id);
lut_out.push(user);
id
}
};
intervals_buffer.push(((id, active, k.1), v));
}
let mut tuple = serializer.serialize_tuple(3)?;
tuple.serialize_element(&lut_out)?;
tuple.serialize_element(&StateVecStrokesSerWrapper(strokes_buffer))?;
tuple.serialize_element(&StateVecIntervalsSerWrapper(intervals_buffer))?;
tuple.end()
}
}
impl<'de> Deserialize<'de> for StateVec {
/*impl<'de> Deserialize<'de> for StateVec {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
......@@ -844,6 +1083,141 @@ impl<'de> Deserialize<'de> for StateVec {
deserializer.deserialize_tuple(2, StateVecVisitor)
}
}*/
impl<'de> Deserialize<'de> for StateVec {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct StateVecStrokesDeWrapper<'a>(&'a Vec<u128>);
impl<'de, 'a> DeserializeSeed<'de> for StateVecStrokesDeWrapper<'a> {
type Value = HashMap<u128, IntervalUnion<usize>>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
struct StateVecStrokesDeWrapperVisitor<'a>(&'a Vec<u128>);
impl<'de, 'a> Visitor<'de> for StateVecStrokesDeWrapperVisitor<'a> {
type Value = HashMap<u128, IntervalUnion<usize>>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "struct StateVec")
}
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0));
while let Some(((id, active), value)) =
access.next_entry()?: Option<((usize, u8), IntervalUnion<usize>)>
{
let user = match self.0.get(id) {
Some(user) => user,
None => {
return Err(de::Error::custom(
"invalid canonincal user lookup id",
))
}
};
map.insert(CRDT::decanonicalise_uuid(*user, active), value);
}
Ok(map)
}
}
deserializer.deserialize_map(StateVecStrokesDeWrapperVisitor(self.0))
}
}
struct StateVecIntervalsDeWrapper<'a>(&'a Vec<u128>);
impl<'de, 'a> DeserializeSeed<'de> for StateVecIntervalsDeWrapper<'a> {
type Value = HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>;
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: Deserializer<'de>,
{
struct StateVecIntervalsDeWrapperVisitor<'a>(&'a Vec<u128>);
impl<'de, 'a> Visitor<'de> for StateVecIntervalsDeWrapperVisitor<'a> {
type Value = HashMap<StrokeID, IntervalUnionState<NotNan<f32>>>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "struct StateVec")
}
fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = HashMap::with_capacity(access.size_hint().unwrap_or(0));
while let Some(((id, active, idx), value)) = access.next_entry()?:
Option<((usize, u8, usize), IntervalUnionState<NotNan<f32>>)>
{
let user = match self.0.get(id) {
Some(user) => user,
None => {
return Err(de::Error::custom(
"invalid canonincal user lookup id",
))
}
};
map.insert(
StrokeID::new(CRDT::decanonicalise_uuid(*user, active), idx),
value,
);
}
Ok(map)
}
}
deserializer.deserialize_map(StateVecIntervalsDeWrapperVisitor(self.0))
}
}
struct StateVecVisitor;
impl<'de> Visitor<'de> for StateVecVisitor {
type Value = StateVec;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct StateVec")
}
fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
where
V: SeqAccess<'de>,
{
let lut_in: Vec<u128> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
let strokes = seq
.next_element_seed(StateVecStrokesDeWrapper(&lut_in))?
.ok_or_else(|| de::Error::invalid_length(1, &self))?;
let intervals = seq
.next_element_seed(StateVecIntervalsDeWrapper(&lut_in))?
.ok_or_else(|| de::Error::invalid_length(2, &self))?;
Ok(StateVec { strokes, intervals })
}
}
deserializer.deserialize_tuple(3, StateVecVisitor)
}
}
impl StateVec {
......@@ -859,8 +1233,8 @@ pub trait EventListener {
fn on_stroke(&self, stroke: String, points: &VecMap<Point>);
fn on_interval(&self, stroke: String, intervals: &IntervalUnion<NotNan<f32>>);
fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>);
fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>);
fn on_deltas(&self, deltas: DeltaVec);
fn on_deltas_from_state(&self, user: String, deltas: DeltaVec);
}
pub struct CRDT {
......@@ -1262,10 +1636,14 @@ impl CRDT {
let mut deltas = HashMap::new();
std::mem::swap(&mut self.deltas, &mut deltas);
self.event_listener.on_deltas(deltas);
web_sys::console::log_1(&format!("temporal deltas: {:?}", deltas).into());
self.event_listener.on_deltas(DeltaVec(deltas));
}
pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) {
pub fn apply_deltas(&mut self, deltas: DeltaVec) {
let deltas = deltas.0;
if deltas.is_empty() {
return;
};
......@@ -1370,8 +1748,8 @@ impl CRDT {
let mut deltas = HashMap::new();
web_sys::console::log_1(&format!("local {:?}", self.state).into());
web_sys::console::log_1(&format!("remote {:?}", remote_state).into());
web_sys::console::log_1(&format!("local state: {:?}", self.state).into());
web_sys::console::log_1(&format!("remote state: {:?}", remote_state).into());
for (user, strokes_state) in &self.state.strokes {
let strokes = &self.crdt.get(user).unwrap().strokes;
......@@ -1469,10 +1847,9 @@ impl CRDT {
}
}
web_sys::console::log_1(&format!("deltas {:?}", deltas).into());
web_sys::console::log_1(&format!("deltas from state: {:?}", deltas).into());
self.event_listener.on_deltas_from_state(user, deltas)
self.event_listener
.on_deltas_from_state(user, DeltaVec(deltas))
}
// TODO: implement serialisation/deserialisation for StateVec and DeltaVec (new type still needed) such that user UUIDs (incl. in stroke IDs) are encoded as (usize, u8) where the former is the index into a "VecSet" of canonical UUIDs and the latter is the modifier to the canonical ID
}
#![feature(type_ascription)]
use wasm_bindgen::prelude::*;
use js_sys::Error;
use ordered_float::NotNan;
use serde_wasm_bindgen::to_value as to_js_value;
use std::collections::HashMap;
use std::option::Option;
use vec_map::VecMap;
......@@ -14,7 +15,7 @@ static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
mod utils;
mod crdt;
use crdt::{EventListener, IntervalUnion, Point, StrokeDelta, StrokeID, CRDT};
use crdt::{DeltaVec, EventListener, IntervalUnion, Point, CRDT};
mod packing;
......@@ -45,11 +46,11 @@ impl EventListener for WasmEventListener {
self.on_interval(stroke, to_js_value(intervals).unwrap())
}
fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>) {
fn on_deltas(&self, deltas: DeltaVec) {
self.on_deltas(packing::pack(&bincode::serialize(&deltas).unwrap()).into_boxed_slice())
}
fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>) {
fn on_deltas_from_state(&self, user: String, deltas: DeltaVec) {
self.on_deltas_from_state(
user,
packing::pack(&bincode::serialize(&deltas).unwrap()).into_boxed_slice(),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment