Skip to content
Snippets Groups Projects
Commit 4945a43d authored by Moritz Langenstein's avatar Moritz Langenstein
Browse files

(ml5717) Initial basic Rust+WASM CRDT with user update, user events, simple...

(ml5717) Initial basic Rust+WASM CRDT with user update, user events, simple delta broadcast generation
parents
No related branches found
No related tags found
No related merge requests found
**/.ipynb_checkpoints/
/target
**/*.rs.bk
Cargo.lock
bin/
pkg/
wasm-pack.log
www/node_modules/
[package]
name = "drawing-crdt"
version = "0.1.0"
authors = ["Moritz Langenstein <ml5717@ic.ac.uk>"]
edition = "2018"
build = "build.rs"
[build-dependencies]
capnpc = "0.11.1"
[lib]
crate-type = ["cdylib", "rlib"]
[features]
default = ["console_error_panic_hook"]
[dependencies]
wasm-bindgen = "0.2"
js-sys = "0.3.33"
uuid = "0.8.1"
capnp = "0.11.1"
serde = "1.0.104"
serde_derive = "1.0.104"
serde-wasm-bindgen = "0.1.3"
# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires
# all the `std::fmt` and `std::panicking` infrastructure, so isn't great for
# code size when deploying.
console_error_panic_hook = { version = "0.1.1", optional = true }
# `wee_alloc` is a tiny allocator for wasm that is only ~1K in code size
# compared to the default allocator's ~10K. It is slower than the default
# allocator, however.
#
# Unfortunately, `wee_alloc` requires nightly Rust when targeting wasm for now.
wee_alloc = { version = "0.4.2", optional = true }
[dev-dependencies]
wasm-bindgen-test = "0.2"
[profile.release]
# Tell `rustc` to optimize for small code size.
opt-level = "s"
build.rs 0 → 100644
extern crate capnpc;
fn main() {
capnpc::CompilerCommand::new()
.src_prefix("schema")
.file("schema/crdt.capnp")
.run()
.expect("Compiling Cap'n Proto schema");
}
\ No newline at end of file
@0x8e28fa32ad5a1cb1; # unique file ID, generated by `capnp id`
struct Point {
x @0 :Int32;
y @1 :Int32;
width @2 :Float32;
colour @3 :UInt32;
}
struct Stroke {
points @0 :List(Point);
}
use std::ops::{Deref, DerefMut};
use std::collections::HashMap;
use serde::ser::{Serialize, Serializer, SerializeStruct};
use uuid::Uuid;
struct Dirty<T> {
value: T,
dirty: bool
}
impl<T> Deref for Dirty<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<T> DerefMut for Dirty<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
}
impl<T> Dirty<T> {
pub fn new(value: T, dirty: bool) -> Dirty<T> {
Dirty {
value,
dirty,
}
}
pub fn is_dirty(&self) -> bool {
self.dirty
}
pub fn clean(&mut self) {
self.dirty = false
}
pub fn dirty(&mut self) {
self.dirty = true
}
}
#[derive(Clone)]
pub struct Point {
x: i32,
y: i32,
weight: f32,
colour: u32,
}
impl Serialize for Point {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// There are four fields inside the struct
let mut state = serializer.serialize_struct("Point", 4)?;
state.serialize_field("x", &self.x)?;
state.serialize_field("y", &self.y)?;
state.serialize_field("weight", &self.weight)?;
state.serialize_field("colour", &format!("{:06X}", self.colour))?;
state.end()
}
}
impl Point {
fn try_new(x: i32, y: i32, weight: f32, colour: &str) -> Option<Point> {
match u32::from_str_radix(colour, 16) {
Ok(colour) => Some(Point { x, y, weight, colour }),
Err(_) => None,
}
}
}
#[derive(Copy, Clone, PartialEq, Serialize)]
struct Interval {
from: f32,
to: f32,
}
impl Interval {
fn new(from: f32, to: f32) -> Interval {
Interval{
from,
to,
}
}
}
#[derive(Serialize)]
pub struct IntervalUnion(Vec<Interval>);
impl IntervalUnion {
fn new() -> IntervalUnion {
IntervalUnion(Vec::new())
}
fn union(&mut self, other: &IntervalUnion) -> bool {
if other.0.is_empty() {
return false
};
let mut si = 0;
let mut oi = 0;
let mut intervals: Vec<Interval> = Vec::new();
while si < self.0.len() && oi < other.0.len() {
let interval = if self.0[si].from < other.0[oi].from {
let interval = self.0[si];
si += 1;
interval
} else {
let interval = other.0[oi];
oi += 1;
interval
};
match intervals.last_mut() {
Some(top) => {
if interval.from <= top.to {
top.to = f32::max(top.to, interval.to)
} else {
intervals.push(interval)
}
},
None => intervals.push(interval),
};
}
let (interval, tail) = if si < self.0.len() {
(self.0[si], &self.0[si+1..])
} else {
(other.0[oi], &other.0[oi+1..])
};
match intervals.last_mut() {
Some(top) => {
if interval.from <= top.to {
top.to = f32::max(top.to, interval.to)
} else {
intervals.push(interval)
}
},
None => intervals.push(interval),
};
intervals.extend_from_slice(tail);
let changed = intervals != self.0;
self.0 = intervals;
changed
}
}
impl From<Interval> for IntervalUnion {
fn from(interval: Interval) -> Self {
IntervalUnion(vec![interval])
}
}
struct Stroke {
points: Dirty<Vec<Option<Point>>>,
intervals: Dirty<IntervalUnion>,
}
impl Stroke {
fn new() -> Stroke {
Stroke {
points: Dirty::new(Vec::new(), true),
intervals: Dirty::new(IntervalUnion::new(), true),
}
}
}
#[derive(Serialize)]
pub struct StrokeDelta {
points: Vec<(usize, Point)>,
intervals: IntervalUnion,
}
impl StrokeDelta {
fn new() -> StrokeDelta {
StrokeDelta {
points: Vec::new(),
intervals: IntervalUnion::new(),
}
}
}
struct User {
strokes: Vec<Stroke>,
}
impl User {
fn new() -> User {
User {
strokes: Vec::new(),
}
}
}
#[derive(Hash, Eq, PartialEq)]
pub struct StrokeID(u128, usize);
impl Serialize for StrokeID {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&(CRDT::uuid_to_string(self.0) + "-" + &self.1.to_string()))
}
}
impl StrokeID {
fn new(user: u128, stroke_idx: usize) -> StrokeID {
StrokeID(user, stroke_idx)
}
}
pub trait EventListener {
fn on_stroke(&self, stroke: String, points: &Vec<Option<Point>>);
fn on_interval(&self, stroke: String, intervals: &IntervalUnion);
fn on_broadcast(&self, deltas: &HashMap<StrokeID, StrokeDelta>);
}
pub struct CRDT {
user: Option<u128>,
crdt: Dirty<HashMap<u128, Dirty<User>>>,
deltas: HashMap<StrokeID, StrokeDelta>,
event_listener: Box<dyn EventListener>,
}
impl CRDT {
pub fn new(event_listener: Box<dyn EventListener>) -> CRDT {
CRDT {
user: None,
crdt: Dirty::new(HashMap::new(), false),
deltas: HashMap::new(),
event_listener,
}
}
pub fn set_user(&mut self, user: &str) -> bool {
self.user = CRDT::string_to_uuid(user);
self.user.is_some()
}
fn string_to_uuid(uuid: &str) -> Option<u128> {
match Uuid::parse_str(uuid) {
Ok(uuid) => Some(uuid.as_u128()),
Err(_) => None,
}
}
fn uuid_to_string(uuid: u128) -> String {
Uuid::from_u128(uuid)
.to_hyphenated()
.encode_lower(&mut Uuid::encode_buffer())
.to_string()
}
pub fn get_user(&self) -> Option<String> {
self.user.map(CRDT::uuid_to_string)
}
pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> {
let user = match self.user {
Some(user) => user,
None => return None,
};
let point = match Point::try_new(x, y, weight, colour) {
Some(point) => point,
None => return None,
};
let entry = self.crdt.entry(user).or_insert_with(|| Dirty::new(User::new(), true));
let mut stroke = Stroke::new();
stroke.points.push(Some(point.clone()));
entry.strokes.push(stroke);
let stroke_idx = entry.strokes.len() - 1;
// Implicit call to stroke.points.dirty() in Stroke::new()
entry.dirty();
self.crdt.dirty();
let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new);
delta.points.push((0, point));
Some(CRDT::uuid_to_string(user) + "-" + &stroke_idx.to_string())
}
fn get_user_mut(&mut self, stroke_id: &str) -> Option<(&mut Dirty<User>, u128, usize)> {
let (author, stroke) = match stroke_id.rfind('-') {
Some(split) => stroke_id.split_at(split),
None => return None,
};
let author = match CRDT::string_to_uuid(author) {
Some(author) => author,
None => return None
};
let entry = match self.crdt.get_mut(&author) {
Some(entry) => entry,
None => return None,
};
match stroke[1..].parse::<usize>() {
Ok(stroke_idx) => Some((entry, author, stroke_idx)),
_ => None,
}
}
fn get_stroke(&self, stroke_id: &str) -> Option<&Stroke> {
let (author, stroke) = match stroke_id.rfind('-') {
Some(split) => stroke_id.split_at(split),
None => return None,
};
let entry = match CRDT::string_to_uuid(author).as_ref().and_then(move |author| self.crdt.get(author)) {
Some(entry) => entry,
None => return None,
};
match stroke[1..].parse::<usize>() {
Ok(stroke) if stroke < entry.strokes.len() => Some(&entry.strokes[stroke]),
_ => None,
}
}
pub fn add_point(&mut self, stroke_id: &str, x: i32, y: i32, weight: f32, colour: &str) -> bool {
let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) {
Some(stroke) => stroke,
None => return false,
};
let stroke = match entry.strokes.get_mut(stroke_idx) {
Some(stroke) => stroke,
None => return false,
};
let point = match Point::try_new(x, y, weight, colour) {
Some(point) => point,
None => return false,
};
stroke.points.push(Some(point.clone()));
let point_idx = stroke.points.len() - 1;
stroke.points.dirty();
entry.dirty();
self.crdt.dirty();
let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new);
delta.points.push((point_idx, point));
true
}
pub fn erase_stroke(&mut self, stroke_id: &str, from: f32, to: f32) -> bool {
if from < 0.0 || to < from {
return false
};
let (entry, user, stroke_idx) = match self.get_user_mut(stroke_id) {
Some(stroke) => stroke,
None => return false,
};
let stroke = match entry.strokes.get_mut(stroke_idx) {
Some(stroke) => stroke,
None => return false,
};
let unit_interval = Interval::new(from, to).into();
if stroke.intervals.union(&unit_interval) {
stroke.intervals.dirty();
entry.dirty();
self.crdt.dirty();
let delta = self.deltas.entry(StrokeID::new(user, stroke_idx)).or_insert_with(StrokeDelta::new);
delta.intervals.union(&unit_interval);
}
true
}
pub fn get_stroke_points(&self, stroke: &str) -> Option<&Vec<Option<Point>>> {
self.get_stroke(stroke).map(|stroke| stroke.points.deref())
}
pub fn get_stroke_intervals(&self, stroke: &str) -> Option<&IntervalUnion> {
self.get_stroke(stroke).map(|stroke| stroke.intervals.deref())
}
pub fn fetch_events(&mut self) {
if !self.crdt.is_dirty() {
return
};
self.crdt.clean();
let mut stroke_events = Vec::new();
let mut interval_events = Vec::new();
self.crdt.iter_mut().for_each(|(user, entry)| {
if !entry.is_dirty() {
return
};
entry.clean();
let user = CRDT::uuid_to_string(*user) + "-";
entry.strokes.iter_mut().enumerate().for_each(|(i, stroke)| {
if stroke.points.is_dirty() {
stroke.points.clean();
stroke_events.push((user.clone() + &i.to_string(), stroke.points.deref()));
}
if stroke.intervals.is_dirty() {
stroke.intervals.clean();
interval_events.push((user.clone() + &i.to_string(), stroke.intervals.deref()));
}
});
});
for (user, points) in stroke_events {
self.event_listener.on_stroke(user, points);
}
for (user, intervals) in interval_events {
self.event_listener.on_interval(user, intervals);
}
}
pub fn fetch_broadcasts(&mut self) {
if self.deltas.is_empty() {
return
};
self.event_listener.on_broadcast(&self.deltas);
self.deltas.clear();
}
}
#[macro_use]
extern crate serde_derive;
use wasm_bindgen::prelude::*;
use std::option::Option;
use js_sys::Error;
use serde_wasm_bindgen::to_value as to_js_value;
use std::collections::HashMap;
#[cfg(feature = "wee_alloc")]
#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;
mod utils;
mod crdt;
use crdt::{CRDT, EventListener, Point, IntervalUnion, StrokeDelta, StrokeID};
#[wasm_bindgen]
extern "C" {
#[derive(Clone)]
pub type WasmEventListener;
#[wasm_bindgen(structural, method)]
pub fn on_stroke(this: &WasmEventListener, stroke: String, points: JsValue);
#[wasm_bindgen(structural, method)]
pub fn on_interval(this: &WasmEventListener, stroke: String, intervals: JsValue);
#[wasm_bindgen(structural, method)]
pub fn on_broadcast(this: &WasmEventListener, deltas: JsValue);
}
impl EventListener for WasmEventListener {
fn on_stroke(&self, stroke: String, points: &Vec<Option<Point>>) {
#[allow(unused_must_use)] { // TODO: Error handling
to_js_value(points).map(|points| self.on_stroke(stroke, points));
}
}
fn on_interval(&self, stroke: String, intervals: &IntervalUnion) {
#[allow(unused_must_use)] { // TODO: Error handling
to_js_value(intervals).map(|intervals| self.on_interval(stroke, intervals));
}
}
fn on_broadcast(&self, deltas: &HashMap<StrokeID, StrokeDelta>) {
/*#[allow(unused_must_use)] { // TODO: Error handling
to_js_value(deltas).map(|deltas| self.on_broadcast(deltas));
}*/
let deltas = to_js_value(deltas).unwrap();
self.on_broadcast(deltas);
}
}
#[wasm_bindgen]
pub struct WasmCRDT(CRDT);
#[wasm_bindgen]
impl WasmCRDT {
// WARNING: The CRDT actually obtains "ownership" of the event listener, but it can
// still be modified and destructed from inside JavaScript
// Pass in a unique object with unique callbacks into the constructor without
// other references to them to uphold this contract
#[wasm_bindgen(constructor)]
pub fn new(event_listener: &WasmEventListener) -> WasmCRDT {
WasmCRDT(CRDT::new(Box::new((*event_listener).clone())))
}
pub fn set_user(&mut self, user: &str) -> bool {
self.0.set_user(user)
}
pub fn get_user(&self) -> Option<String> {
self.0.get_user()
}
pub fn add_stroke(&mut self, x: i32, y: i32, weight: f32, colour: &str) -> Option<String> {
self.0.add_stroke(x, y, weight, colour)
}
pub fn add_point(&mut self, stroke: &str, x: i32, y: i32, weight: f32, colour: &str) -> bool {
self.0.add_point(stroke, x, y, weight, colour)
}
pub fn erase_stroke(&mut self, stroke: &str, from: f32, to: f32) -> bool {
self.0.erase_stroke(stroke, from, to)
}
pub fn get_stroke_points(&self, stroke: &str) -> Result<JsValue, JsValue> {
let points = match self.0.get_stroke_points(stroke) {
Some(points) => points,
None => return Err(Error::new("Unknown stroke ID").into()),
};
Ok(to_js_value(points)?)
}
pub fn get_stroke_intervals(&self, stroke: &str) -> Result<JsValue, JsValue> {
let intervals = match self.0.get_stroke_intervals(stroke) {
Some(intervals) => intervals,
None => return Err(Error::new("Unknown stroke ID").into()),
};
Ok(to_js_value(intervals)?)
}
pub fn fetch_events(&mut self) {
self.0.fetch_events()
}
pub fn fetch_broadcasts(&mut self) {
self.0.fetch_broadcasts()
}
}
#[allow(dead_code)]
pub fn set_panic_hook() {
// When the `console_error_panic_hook` feature is enabled, we can call the
// `set_panic_hook` function at least once during initialization, and then
// we will get better error messages if our code ever panics.
//
// For more details see
// https://github.com/rustwasm/console_error_panic_hook#readme
#[cfg(feature = "console_error_panic_hook")]
console_error_panic_hook::set_once();
}
//! Test suite for the Web and headless browsers.
#![cfg(target_arch = "wasm32")]
extern crate wasm_bindgen_test;
use wasm_bindgen_test::*;
// wasm-pack test --headless --chrome
wasm_bindgen_test_configure!(run_in_browser);
#[wasm_bindgen_test]
fn pass() {
assert_eq!(1 + 1, 2);
}
// A dependency graph that contains any wasm must all be imported
// asynchronously. This `bootstrap.js` file does the single async import, so
// that no one else needs to worry about it again.
import("./index.js")
.catch(e => console.error("Error importing `index.js`:", e));
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello wasm-pack!</title>
</head>
<body>
<noscript>This page contains webassembly and javascript content, please enable javascript in your browser.</noscript>
<script src="./bootstrap.js"></script>
</body>
</html>
import * as wasm from "drawing-crdt"
const crdt = new wasm.WasmCRDT({
on_stroke: (stroke, points) => {
console.log("on_stroke:", stroke, points)
},
on_interval: (stroke, intervals) => {
console.log("on_interval", stroke, intervals)
},
on_broadcast: (deltas) => {
console.log("on_broadcast:", deltas)
},
})
crdt.set_user("36577c51-a80b-47d6-b3c3-cfb11f705b87")
let stroke_id = crdt.add_stroke(4, 2, 3.14, "ffff00")
crdt.add_point(stroke_id, 2, 4, 4.13, "0000ff")
crdt.erase_stroke(stroke_id, 0.0, 2.0)
console.log("pre fetch events")
crdt.fetch_events();
console.log("post fetch events")
console.log("pre fetch broadcasts")
crdt.fetch_broadcasts();
console.log("post fetch broadcasts")
crdt.erase_stroke(stroke_id, 0.0, 0.25)
crdt.erase_stroke(stroke_id, 0.5, 2.25)
crdt.erase_stroke(stroke_id, 2.5, 3.0)
console.log("pre fetch events")
crdt.fetch_events();
console.log("post fetch events")
console.log("pre fetch broadcasts")
crdt.fetch_broadcasts();
console.log("post fetch broadcasts")
Source diff could not be displayed: it is too large. Options to address this: view the blob.
{
"name": "create-wasm-app",
"version": "0.1.0",
"description": "create an app to consume rust-generated wasm packages",
"main": "index.js",
"bin": {
"create-wasm-app": ".bin/create-wasm-app.js"
},
"scripts": {
"build": "webpack --config webpack.config.js",
"start": "webpack-dev-server --host 0.0.0.0 --disable-host-check --port 3000"
},
"repository": {
"type": "git",
"url": "git+https://github.com/rustwasm/create-wasm-app.git"
},
"keywords": [
"webassembly",
"wasm",
"rust",
"webpack"
],
"author": "Ashley Williams <ashley666ashley@gmail.com>",
"license": "(MIT OR Apache-2.0)",
"bugs": {
"url": "https://github.com/rustwasm/create-wasm-app/issues"
},
"homepage": "https://github.com/rustwasm/create-wasm-app#readme",
"dependencies": {
"drawing-crdt": "file:../pkg"
},
"devDependencies": {
"webpack": "^4.29.3",
"webpack-cli": "^3.1.0",
"webpack-dev-server": "^3.1.5",
"copy-webpack-plugin": "^5.0.0"
}
}
const CopyWebpackPlugin = require("copy-webpack-plugin");
const path = require('path');
module.exports = {
entry: "./bootstrap.js",
output: {
path: path.resolve(__dirname, "dist"),
filename: "bootstrap.js",
},
mode: "development",
plugins: [
new CopyWebpackPlugin(['index.html'])
],
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment