Skip to content
Snippets Groups Projects
Commit 79a932fe authored by Moritz Langenstein's avatar Moritz Langenstein
Browse files

(ml5717) Added state vector for strokes

parent 90d6d488
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ serde_derive = "1.0.104" ...@@ -19,6 +19,7 @@ serde_derive = "1.0.104"
serde-wasm-bindgen = "0.1.3" serde-wasm-bindgen = "0.1.3"
vec_map = { path = "vec-map", features = ["serde"] } vec_map = { path = "vec-map", features = ["serde"] }
bincode = "1.2.1" bincode = "1.2.1"
# web-sys = { version = "0.3.33", features = ["console"] }
# The `console_error_panic_hook` crate provides better debugging of panics by # The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires # logging them with `console.error`. This is great for development, but requires
......
...@@ -7,6 +7,7 @@ use std::ops::{Deref, DerefMut}; ...@@ -7,6 +7,7 @@ use std::ops::{Deref, DerefMut};
use uuid::Uuid; use uuid::Uuid;
use vec_map::VecMap; use vec_map::VecMap;
#[derive(Debug)]
struct Dirty<T> { struct Dirty<T> {
value: T, value: T,
dirty: bool, dirty: bool,
...@@ -44,7 +45,7 @@ impl<T> Dirty<T> { ...@@ -44,7 +45,7 @@ impl<T> Dirty<T> {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct Point { pub struct Point {
x: i32, x: i32,
y: i32, y: i32,
...@@ -152,11 +153,11 @@ impl IntervalBound for f32 { ...@@ -152,11 +153,11 @@ impl IntervalBound for f32 {
fn max(a: f32, b: f32) -> f32 { fn max(a: f32, b: f32) -> f32 {
f32::max(a, b) f32::max(a, b)
} }
fn up(self) -> f32 { fn up(self) -> f32 {
self self
} }
fn down(self) -> f32 { fn down(self) -> f32 {
self self
} }
...@@ -166,18 +167,18 @@ impl IntervalBound for usize { ...@@ -166,18 +167,18 @@ impl IntervalBound for usize {
fn max(a: usize, b: usize) -> usize { fn max(a: usize, b: usize) -> usize {
cmp::max(a, b) cmp::max(a, b)
} }
fn up(self) -> usize { fn up(self) -> usize {
self + 1 self + 1
} }
fn down(self) -> usize { fn down(self) -> usize {
self - 1 self - 1
} }
} }
#[derive(Copy, Clone, PartialEq)] #[derive(Copy, Clone, PartialEq, Debug)]
struct Interval<T> pub struct Interval<T>
where where
T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, T: Copy + Clone + PartialEq + PartialOrd + IntervalBound,
{ {
...@@ -194,6 +195,7 @@ where ...@@ -194,6 +195,7 @@ where
} }
} }
#[derive(Clone, Debug)]
pub struct IntervalUnion<T>(Vec<Interval<T>>) pub struct IntervalUnion<T>(Vec<Interval<T>>)
where where
T: Copy + Clone + PartialEq + PartialOrd + IntervalBound; T: Copy + Clone + PartialEq + PartialOrd + IntervalBound;
...@@ -341,9 +343,9 @@ where ...@@ -341,9 +343,9 @@ where
fn difference(&mut self, other: &IntervalUnion<T>) { fn difference(&mut self, other: &IntervalUnion<T>) {
let mut si = 0; let mut si = 0;
let mut oi = 0; let mut oi = 0;
let mut intervals: Vec<Interval<T>> = Vec::new(); let mut intervals: Vec<Interval<T>> = Vec::new();
while si < self.0.len() && oi < other.0.len() { while si < self.0.len() && oi < other.0.len() {
if other.0[oi].to < self.0[si].from { if other.0[oi].to < self.0[si].from {
oi += 1; oi += 1;
...@@ -369,15 +371,28 @@ where ...@@ -369,15 +371,28 @@ where
si += 1; si += 1;
} }
} }
if si < self.0.len() { if si < self.0.len() {
intervals.extend_from_slice(&self.0[si..]) intervals.extend_from_slice(&self.0[si..])
}; };
self.0 = intervals; self.0 = intervals;
} }
} }
impl<T> IntoIterator for IntervalUnion<T>
where
T: Copy + Clone + PartialEq + PartialOrd + IntervalBound,
{
type Item = Interval<T>;
type IntoIter = std::vec::IntoIter<Self::Item>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl<T> From<Interval<T>> for IntervalUnion<T> impl<T> From<Interval<T>> for IntervalUnion<T>
where where
T: Copy + Clone + PartialEq + PartialOrd + IntervalBound, T: Copy + Clone + PartialEq + PartialOrd + IntervalBound,
...@@ -387,22 +402,26 @@ where ...@@ -387,22 +402,26 @@ where
} }
} }
#[derive(Debug)]
struct Stroke { struct Stroke {
index: usize,
points: Dirty<VecMap<Point>>, points: Dirty<VecMap<Point>>,
intervals: Dirty<IntervalUnion<f32>>, intervals: Dirty<IntervalUnion<f32>>,
} }
impl Stroke { impl Stroke {
fn new() -> Stroke { fn new(index: usize) -> Stroke {
Stroke { Stroke {
index,
points: Dirty::new(VecMap::new(), true), points: Dirty::new(VecMap::new(), true),
intervals: Dirty::new(IntervalUnion::new(), true), intervals: Dirty::new(IntervalUnion::new(), true),
} }
} }
} }
#[derive(Debug)]
pub struct StrokeDelta { pub struct StrokeDelta {
points: VecMap<Point>, points: HashMap<usize, Point>,
intervals: IntervalUnion<f32>, intervals: IntervalUnion<f32>,
} }
...@@ -456,25 +475,32 @@ impl<'de> Deserialize<'de> for StrokeDelta { ...@@ -456,25 +475,32 @@ impl<'de> Deserialize<'de> for StrokeDelta {
impl StrokeDelta { impl StrokeDelta {
fn new() -> StrokeDelta { fn new() -> StrokeDelta {
StrokeDelta { StrokeDelta {
points: VecMap::new(), points: HashMap::new(),
intervals: IntervalUnion::new(), intervals: IntervalUnion::new(),
} }
} }
} }
#[derive(Debug)]
struct User { struct User {
strokes: VecMap<Stroke>, strokes: Vec<Stroke>,
} }
impl User { impl User {
fn new() -> User { fn new() -> User {
User { User {
strokes: VecMap::new(), strokes: Vec::new(),
} }
} }
fn get_stroke_idx(&self, index: usize) -> Option<usize> {
self.strokes
.binary_search_by_key(&index, |stroke| stroke.index)
.ok()
}
} }
#[derive(Hash, Eq, PartialEq)] #[derive(Hash, Eq, PartialEq, Debug)]
pub struct StrokeID(u128, usize); pub struct StrokeID(u128, usize);
impl Serialize for StrokeID { impl Serialize for StrokeID {
...@@ -534,16 +560,69 @@ impl StrokeID { ...@@ -534,16 +560,69 @@ impl StrokeID {
} }
} }
#[derive(Debug)]
pub struct StateVec { pub struct StateVec {
strokes: HashMap<u128, IntervalUnion<usize>>, strokes: HashMap<u128, IntervalUnion<usize>>,
//intervals: HashMap<>, //intervals: HashMap<>,
} }
impl Serialize for StateVec {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut tuple = serializer.serialize_tuple(1)?;
tuple.serialize_element(&self.strokes)?;
tuple.end()
}
}
impl<'de> Deserialize<'de> for StateVec {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct StateVecVisitor;
impl<'de> Visitor<'de> for StateVecVisitor {
type Value = StateVec;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct StateVec")
}
fn visit_seq<V>(self, mut seq: V) -> Result<Self::Value, V::Error>
where
V: SeqAccess<'de>,
{
let strokes = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;
Ok(StateVec { strokes })
}
}
deserializer.deserialize_tuple(1, StateVecVisitor)
}
}
impl StateVec {
fn new() -> StateVec {
StateVec {
strokes: HashMap::new(),
}
}
}
pub trait EventListener { pub trait EventListener {
fn on_stroke(&self, stroke: String, points: &VecMap<Point>); fn on_stroke(&self, stroke: String, points: &VecMap<Point>);
fn on_interval(&self, stroke: String, intervals: &IntervalUnion<f32>); fn on_interval(&self, stroke: String, intervals: &IntervalUnion<f32>);
fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>); fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>);
fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>);
} }
pub struct CRDT { pub struct CRDT {
...@@ -605,11 +684,16 @@ impl CRDT { ...@@ -605,11 +684,16 @@ impl CRDT {
.crdt .crdt
.entry(user) .entry(user)
.or_insert_with(|| Dirty::new(User::new(), true)); .or_insert_with(|| Dirty::new(User::new(), true));
let mut stroke = Stroke::new();
let stroke_idx = match entry.strokes.last() {
Some(stroke) => stroke.index + stroke.points.vec_len(),
None => 0,
};
let mut stroke = Stroke::new(stroke_idx);
stroke.points.push(point.clone()); stroke.points.push(point.clone());
entry.strokes.push(stroke);
let stroke_idx = entry.strokes.len() - 1; entry.strokes.push(stroke);
// Implicit call to stroke.points.dirty() in Stroke::new() // Implicit call to stroke.points.dirty() in Stroke::new()
entry.dirty(); entry.dirty();
...@@ -624,7 +708,7 @@ impl CRDT { ...@@ -624,7 +708,7 @@ impl CRDT {
Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string()) Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string())
} }
fn get_user_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> { fn split_stroke_id(&self, stroke_id: &str) -> Option<(&Dirty<User>, u128, usize)> {
let (author, stroke) = match stroke_id.rfind('-') { let (author, stroke) = match stroke_id.rfind('-') {
Some(split) => stroke_id.split_at(split), Some(split) => stroke_id.split_at(split),
None => return None, None => return None,
...@@ -635,7 +719,7 @@ impl CRDT { ...@@ -635,7 +719,7 @@ impl CRDT {
None => return None, None => return None,
}; };
let entry = match self.crdt.get_mut(&author) { let entry = match self.crdt.get(&author) {
Some(entry) => entry, Some(entry) => entry,
None => return None, None => return None,
}; };
...@@ -646,22 +730,24 @@ impl CRDT { ...@@ -646,22 +730,24 @@ impl CRDT {
} }
} }
fn get_stroke(&self, stroke_id: &str) -> Option<&Stroke> { fn split_stroke_id_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> {
let (author, stroke) = match stroke_id.rfind('-') { let (author, stroke) = match stroke_id.rfind('-') {
Some(split) => stroke_id.split_at(split), Some(split) => stroke_id.split_at(split),
None => return None, None => return None,
}; };
let entry = match CRDT::string_to_uuid(author) let author = match CRDT::string_to_uuid(author) {
.as_ref() Some(author) => author,
.and_then(move |author| self.crdt.get(author)) None => return None,
{ };
let entry = match self.crdt.get_mut(&author) {
Some(entry) => entry, Some(entry) => entry,
None => return None, None => return None,
}; };
match stroke[1..].parse::<usize>() { match stroke[1..].parse::<usize>() {
Ok(stroke) => entry.strokes.get(stroke), Ok(stroke_idx) => Some((entry, author, stroke_idx)),
_ => None, _ => None,
} }
} }
...@@ -674,12 +760,17 @@ impl CRDT { ...@@ -674,12 +760,17 @@ impl CRDT {
weight: f32, weight: f32,
colour: &str, colour: &str,
) -> bool { ) -> bool {
let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { let (entry, user, stroke_idx) = match self.split_stroke_id_mut(stroke_id) {
Some(stroke) => stroke, Some(stroke) => stroke,
None => return false, None => return false,
}; };
let stroke = match entry.strokes.get_mut(stroke_idx) { let idx = match entry.get_stroke_idx(stroke_idx) {
Some(idx) if idx == (entry.strokes.len() - 1) => idx,
_ => return false,
};
let stroke = match entry.strokes.get_mut(idx) {
Some(stroke) => stroke, Some(stroke) => stroke,
_ => return false, _ => return false,
}; };
...@@ -689,7 +780,8 @@ impl CRDT { ...@@ -689,7 +780,8 @@ impl CRDT {
None => return false, None => return false,
}; };
let point_idx = stroke.points.push(point.clone()) - 1; stroke.points.push(point.clone());
let point_idx = stroke.points.vec_len() - 1;
stroke.points.dirty(); stroke.points.dirty();
entry.dirty(); entry.dirty();
...@@ -709,12 +801,17 @@ impl CRDT { ...@@ -709,12 +801,17 @@ impl CRDT {
return false; return false;
}; };
let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) { let (entry, user, stroke_idx) = match self.split_stroke_id_mut(stroke_id) {
Some(stroke) => stroke, Some(stroke) => stroke,
None => return false, None => return false,
}; };
let stroke = match entry.strokes.get_mut(stroke_idx) { let idx = match entry.get_stroke_idx(stroke_idx) {
Some(idx) => idx,
_ => return false,
};
let stroke = match entry.strokes.get_mut(idx) {
Some(stroke) => stroke, Some(stroke) => stroke,
_ => return false, _ => return false,
}; };
...@@ -736,12 +833,34 @@ impl CRDT { ...@@ -736,12 +833,34 @@ impl CRDT {
true true
} }
pub fn get_stroke_points(&self, stroke: &str) -> Option<&VecMap<Point>> { pub fn get_stroke_points(&self, stroke_id: &str) -> Option<&VecMap<Point>> {
self.get_stroke(stroke).map(|stroke| stroke.points.deref()) let (entry, _user, stroke_idx) = match self.split_stroke_id(stroke_id) {
Some(stroke) => stroke,
None => return None,
};
let idx = match entry.get_stroke_idx(stroke_idx) {
Some(idx) => idx,
None => return None,
};
entry.strokes.get(idx).map(|stroke| stroke.points.deref())
} }
pub fn get_stroke_intervals(&self, stroke: &str) -> Option<&IntervalUnion<f32>> { pub fn get_stroke_intervals(&self, stroke_id: &str) -> Option<&IntervalUnion<f32>> {
self.get_stroke(stroke) let (entry, _user, stroke_idx) = match self.split_stroke_id(stroke_id) {
Some(stroke) => stroke,
None => return None,
};
let idx = match entry.get_stroke_idx(stroke_idx) {
Some(idx) => idx,
_ => return None,
};
entry
.strokes
.get(idx)
.map(|stroke| stroke.intervals.deref()) .map(|stroke| stroke.intervals.deref())
} }
...@@ -755,7 +874,7 @@ impl CRDT { ...@@ -755,7 +874,7 @@ impl CRDT {
let mut stroke_events = Vec::new(); let mut stroke_events = Vec::new();
let mut interval_events = Vec::new(); let mut interval_events = Vec::new();
self.crdt.iter_mut().for_each(|(user, entry)| { for (user, entry) in self.crdt.deref_mut() {
if !entry.is_dirty() { if !entry.is_dirty() {
return; return;
}; };
...@@ -764,7 +883,7 @@ impl CRDT { ...@@ -764,7 +883,7 @@ impl CRDT {
let user = CRDT::uuid_to_string(*user) + "-"; let user = CRDT::uuid_to_string(*user) + "-";
entry.strokes.iter_mut().for_each(|(i, stroke)| { for (i, stroke) in entry.strokes.iter_mut().enumerate() {
if stroke.points.is_dirty() { if stroke.points.is_dirty() {
stroke.points.clean(); stroke.points.clean();
stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref())); stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref()));
...@@ -774,8 +893,8 @@ impl CRDT { ...@@ -774,8 +893,8 @@ impl CRDT {
stroke.intervals.clean(); stroke.intervals.clean();
interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref())); interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref()));
} }
}); }
}); }
for (user, points) in stroke_events { for (user, points) in stroke_events {
self.event_listener.on_stroke(user, points); self.event_listener.on_stroke(user, points);
...@@ -791,9 +910,12 @@ impl CRDT { ...@@ -791,9 +910,12 @@ impl CRDT {
return; return;
}; };
self.event_listener.on_deltas(&self.deltas); let mut deltas = HashMap::new();
std::mem::swap(&mut self.deltas, &mut deltas);
//web_sys::console::log_1(&format!("deltas {:?}", deltas).into());
self.deltas.clear(); self.event_listener.on_deltas(deltas);
} }
pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) { pub fn apply_deltas(&mut self, deltas: HashMap<StrokeID, StrokeDelta>) {
...@@ -803,32 +925,152 @@ impl CRDT { ...@@ -803,32 +925,152 @@ impl CRDT {
self.crdt.dirty(); self.crdt.dirty();
deltas for (stroke_id, stroke_delta) in deltas {
.into_iter() let entry = self
.for_each(|(stroke_id, mut stroke_delta)| { .crdt
let entry = self .entry(stroke_id.0)
.crdt .or_insert_with(|| Dirty::new(User::new(), true));
.entry(stroke_id.0)
.or_insert_with(|| Dirty::new(User::new(), true)); entry.dirty();
let stroke = match entry
.strokes
.binary_search_by_key(&stroke_id.1, |stroke| stroke.index)
{
Ok(idx) => &mut entry.strokes[idx],
Err(idx) => {
entry.strokes.insert(idx, Stroke::new(stroke_id.1));
&mut entry.strokes[idx]
}
};
if !stroke_delta.points.is_empty() {
stroke.points.dirty();
entry.dirty(); stroke.points.extend(stroke_delta.points.into_iter());
}
let stroke = entry.strokes.entry(stroke_id.1).or_insert_with(Stroke::new); if stroke.intervals.union(&stroke_delta.intervals) {
stroke.intervals.dirty()
};
}
}
if !stroke_delta.points.is_empty() { pub fn get_state_vector(&self) -> StateVec {
stroke.points.dirty(); let mut state = StateVec::new();
stroke.points.append(&mut stroke_delta.points); state.strokes.reserve(self.crdt.len());
for (user, entry) in self.crdt.deref() {
let mut point_intervals: Vec<Interval<usize>> = Vec::new();
for stroke in &entry.strokes {
for point_idx in stroke.points.keys() {
match point_intervals.last_mut() {
Some(interval) if interval.to.up() == (stroke.index + point_idx) => {
interval.to = stroke.index + point_idx
}
_ => point_intervals.push(Interval::new(
stroke.index + point_idx,
stroke.index + point_idx,
)),
};
} }
}
if stroke.intervals.union(&stroke_delta.intervals) { state.strokes.insert(*user, IntervalUnion(point_intervals));
stroke.intervals.dirty() }
};
}); state
} }
pub fn get_state_vector(&self) {} pub fn fetch_deltas_from_state_vector(&self, user: String, remote_state: &StateVec) {
let mut deltas = HashMap::new();
//web_sys::console::log_1(&format!("remote state: {:?}", remote_state).into());
//web_sys::console::log_1(&format!("local state: {:?}", self.get_state_vector()).into());
for (user, strokes_state) in &self.get_state_vector().strokes {
let strokes = &self.crdt.get(user).unwrap().strokes;
if let Some(remote_strokes_state) = remote_state.strokes.get(user) {
//web_sys::console::log_1(&format!("remote strokes state: {:?}", remote_strokes_state).into());
//web_sys::console::log_1(&format!("local strokes state: {:?}", strokes_state).into());
let mut difference = (*strokes_state).clone();
difference.difference(remote_strokes_state);
//web_sys::console::log_1(&format!("strokes state difference: {:?}", difference).into());
if difference.is_empty() {
continue;
};
for interval in difference {
loop {
let stroke_idx = match strokes
.binary_search_by_key(&interval.from, |stroke| stroke.index)
{
Ok(stroke_idx) => stroke_idx,
Err(after_idx) => after_idx - 1,
};
let stroke = strokes.get(stroke_idx).unwrap();
let points_len = stroke.points.vec_len();
//web_sys::console::log_1(&format!("stroke_idx {:?}", stroke_idx).into());
//web_sys::console::log_1(&format!("stroke {:?}", stroke).into());
//web_sys::console::log_1(&format!("points_len {:?}", points_len).into());
let mut stroke_delta = StrokeDelta::new();
stroke_delta
.points
.reserve(cmp::min(points_len, interval.to - interval.from + 1));
//web_sys::console::log_1(&format!("point_idx range {:?}", (interval.from - stroke.index)..=cmp::min(interval.to - stroke.index, points_len - 1)).into());
for point_idx in (interval.from - stroke.index)
..=cmp::min(interval.to - stroke.index, points_len - 1)
{
stroke_delta
.points
.insert(point_idx, stroke.points.get(point_idx).unwrap().clone());
}
deltas.insert(StrokeID::new(*user, stroke.index), stroke_delta);
if interval.to < (stroke.index + points_len) {
break;
};
}
}
} else {
for stroke in strokes {
let len = stroke.points.len();
if len == 0 {
continue;
};
let mut stroke_delta = StrokeDelta::new();
stroke_delta.points.reserve(len);
for (point_idx, point) in stroke.points.deref() {
stroke_delta.points.insert(point_idx, point.clone());
}
deltas.insert(StrokeID::new(*user, stroke.index), stroke_delta);
}
}
}
//web_sys::console::log_1(&format!("deltas {:?}", deltas).into());
self.event_listener.on_deltas_from_state(user, deltas)
}
// TODO: add endStroke() method that finishes the named stroke // TODO: add endStroke() method that finishes the named stroke
// TODO: update addStroke() to use a new username (XOR variant + version bits in UUID) if last stroke still unfinished // TODO: update addStroke() to use a new username (XOR variant + version bits in UUID) if last stroke still unfinished
// TODO: keep track of users with unfinished strokes to use "fake" usernames optimally // TODO: keep track of users with unfinished strokes to use "fake" usernames optimally
// TODO: intervals in StateVec
} }
...@@ -28,6 +28,9 @@ extern "C" { ...@@ -28,6 +28,9 @@ extern "C" {
#[wasm_bindgen(structural, method)] #[wasm_bindgen(structural, method)]
pub fn on_deltas(this: &WasmEventListener, deltas: Box<[u8]>); pub fn on_deltas(this: &WasmEventListener, deltas: Box<[u8]>);
#[wasm_bindgen(structural, method)]
pub fn on_deltas_from_state(this: &WasmEventListener, user: String, deltas: Box<[u8]>);
} }
impl EventListener for WasmEventListener { impl EventListener for WasmEventListener {
...@@ -39,8 +42,15 @@ impl EventListener for WasmEventListener { ...@@ -39,8 +42,15 @@ impl EventListener for WasmEventListener {
self.on_interval(stroke, to_js_value(intervals).unwrap()) self.on_interval(stroke, to_js_value(intervals).unwrap())
} }
fn on_deltas(&self, deltas: &HashMap<StrokeID, StrokeDelta>) { fn on_deltas(&self, deltas: HashMap<StrokeID, StrokeDelta>) {
self.on_deltas(bincode::serialize(deltas).unwrap().into_boxed_slice()) self.on_deltas(bincode::serialize(&deltas).unwrap().into_boxed_slice())
}
fn on_deltas_from_state(&self, user: String, deltas: HashMap<StrokeID, StrokeDelta>) {
self.on_deltas_from_state(
user,
bincode::serialize(&deltas).unwrap().into_boxed_slice(),
)
} }
} }
...@@ -78,8 +88,8 @@ impl WasmCRDT { ...@@ -78,8 +88,8 @@ impl WasmCRDT {
self.0.erase_stroke(stroke, from, to) self.0.erase_stroke(stroke, from, to)
} }
pub fn get_stroke_points(&self, stroke: &str) -> Result<JsValue, JsValue> { pub fn get_stroke_points(&self, stroke_id: &str) -> Result<JsValue, JsValue> {
let points = match self.0.get_stroke_points(stroke) { let points = match self.0.get_stroke_points(stroke_id) {
Some(points) => points, Some(points) => points,
None => return Err(Error::new("Unknown stroke ID").into()), None => return Err(Error::new("Unknown stroke ID").into()),
}; };
...@@ -87,8 +97,8 @@ impl WasmCRDT { ...@@ -87,8 +97,8 @@ impl WasmCRDT {
Ok(to_js_value(points)?) Ok(to_js_value(points)?)
} }
pub fn get_stroke_intervals(&self, stroke: &str) -> Result<JsValue, JsValue> { pub fn get_stroke_intervals(&self, stroke_id: &str) -> Result<JsValue, JsValue> {
let intervals = match self.0.get_stroke_intervals(stroke) { let intervals = match self.0.get_stroke_intervals(stroke_id) {
Some(intervals) => intervals, Some(intervals) => intervals,
None => return Err(Error::new("Unknown stroke ID").into()), None => return Err(Error::new("Unknown stroke ID").into()),
}; };
...@@ -112,4 +122,23 @@ impl WasmCRDT { ...@@ -112,4 +122,23 @@ impl WasmCRDT {
Ok(self.0.apply_deltas(deltas)) Ok(self.0.apply_deltas(deltas))
} }
pub fn get_state_vector(&self) -> Box<[u8]> {
bincode::serialize(&self.0.get_state_vector())
.unwrap()
.into_boxed_slice()
}
pub fn fetch_deltas_from_state_vector(
&self,
user: String,
remote_state: Box<[u8]>,
) -> Result<(), JsValue> {
let remote_state = match bincode::deserialize(&remote_state) {
Ok(remote_state) => remote_state,
Err(error) => return Err(Error::new(&format!("{:?}", error)).into()),
};
Ok(self.0.fetch_deltas_from_state_vector(user, &remote_state))
}
} }
Subproject commit 9fd05f23024ef98ebaae809f31b30f9c2c644707 Subproject commit 80fab04b18e020ccb645947dc026915b4503886d
...@@ -13,6 +13,9 @@ const crdt = new wasm.WasmCRDT({ ...@@ -13,6 +13,9 @@ const crdt = new wasm.WasmCRDT({
console.log("on_deltas:", deltas) console.log("on_deltas:", deltas)
broadcasts.push(deltas) broadcasts.push(deltas)
}, },
on_deltas_from_state: (user, deltas) => {
console.log("on_deltas_from_state:", user, deltas)
},
}) })
crdt.set_user("36577c51-a80b-47d6-b3c3-cfb11f705b87") crdt.set_user("36577c51-a80b-47d6-b3c3-cfb11f705b87")
...@@ -51,6 +54,9 @@ const crdt2 = new wasm.WasmCRDT({ ...@@ -51,6 +54,9 @@ const crdt2 = new wasm.WasmCRDT({
on_deltas: (deltas) => { on_deltas: (deltas) => {
console.log("on_deltas:2", deltas) console.log("on_deltas:2", deltas)
}, },
on_deltas_from_state: (user, deltas) => {
console.log("on_deltas_from_state:", user, deltas)
},
}) })
console.log("pre apply 2nd deltas + fetch events") console.log("pre apply 2nd deltas + fetch events")
...@@ -58,7 +64,16 @@ crdt2.apply_deltas(broadcasts[1]) ...@@ -58,7 +64,16 @@ crdt2.apply_deltas(broadcasts[1])
crdt2.fetch_events() crdt2.fetch_events()
console.log("post apply 2nd deltas + fetch events") console.log("post apply 2nd deltas + fetch events")
console.log("pre apply 1st deltas + fetch events") console.log("CRDT StateVec", crdt.get_state_vector())
const state2 = crdt2.get_state_vector()
console.log("CRDT2 StateVec", state2)
console.log("pre fetch deltas from 1 for 2")
crdt.fetch_deltas_from_state_vector("moritz", state2)
console.log("post fetch deltas from 1 for 2")
/*console.log("pre apply 1st deltas + fetch events")
crdt2.apply_deltas(broadcasts[0]) crdt2.apply_deltas(broadcasts[0])
crdt2.fetch_events() crdt2.fetch_events()
console.log("post apply 1st deltas + fetch events") console.log("post apply 1st deltas + fetch events")*/
\ No newline at end of file \ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment