Skip to content
Snippets Groups Projects
Commit fec03dc6 authored by Kevin Jahns's avatar Kevin Jahns
Browse files

added test connector, webrtc connector, ideas to apply operations with very low overhead

parent 3142b0f1
No related branches found
No related tags found
No related merge requests found
......@@ -4,16 +4,21 @@
},
"rules": {
"strict": 0,
"camelcase": [1, {"properties": "never"}]
"camelcase": [1, {"properties": "never"}],
"no-underscore-dangle": 0
},
"parser": "babel-eslint",
"globals": {
"OperationStore": true,
"AbstractOperationStore": true,
"AbstractTransaction": true,
"AbstractConnector": true,
"Transaction": true,
"IndexedDB": true,
"IDBRequest": true,
"GeneratorFunction": true
"GeneratorFunction": true,
"Y": true,
"setTimeout": true,
"setInterval": true
}
}
class AbstractConnector {
/*
opts
.role : String Role of this client ("master" or "slave")
.userId : String that uniquely defines the user.
*/
constructor (opts) {
if (opts == null){
opts = {};
}
if (opts.role == null || opts.role === "master") {
this.role = "master";
} else if (opts.role === "slave") {
this.role = "slave";
} else {
throw new Error("Role must be either 'master' or 'slave'!");
}
this.role = opts.role;
this.connections = {};
this.userEventListeners = [];
this.whenSyncedListeners = [];
this.currentSyncTarget = null;
}
setUserId (userId) {
this.os.setUserId(userId);
}
onUserEvent (f) {
this.userEventListeners.push(f);
}
userLeft (user : string) {
delete this.connections[user];
if (user === this.currentSyncTarget){
this.currentSyncTarget = null;
this.findNextSyncTarget();
}
for (var f of this.userEventListeners){
f({
action: "userLeft",
user: user
});
}
}
userJoined (user, role) {
if(role == null){
throw new Error("You must specify the role of the joined user!");
}
if (this.connections[user] != null) {
throw new Error("This user already joined!");
}
this.connections[user] = {
isSynced: false,
role: role
};
for (var f of this.userEventListeners) {
f({
action: "userJoined",
user: user,
role: role
});
}
}
// Execute a function _when_ we are connected.
// If not connected, wait until connected
whenSynced (f) {
if (this.isSynced === true) {
f();
} else {
this.whenSyncedListeners.push(f);
}
}
// returns false, if there is no sync target
// true otherwise
findNextSyncTarget () {
if (this.currentSyncTarget != null && this.connections[this.currentSyncTarget].isSynced === false) {
throw new Error("The current sync has not finished!")
}
for (var uid in this.connections) {
var u = this.connections[uid];
if (!u.isSynced) {
this.currentSyncTarget = uid;
this.send(uid, {
type: "sync step 1",
stateVector: hb.getStateVector()
});
return true;
}
}
// set the state to synced!
if (!this.isSynced) {
this.isSynced = true;
for (var f of this.whenSyncedListeners) {
f()
}
this.whenSyncedListeners = null;
} return false;
}
// You received a raw message, and you know that it is intended for to Yjs. Then call this function.
receiveMessage (sender, m) {
if (m.type === "sync step 1") {
// TODO: make transaction, stream the ops
var ops = yield* this.os.getOperations(m.stateVector);
// TODO: compare against m.sv!
var sv = yield* this.getStateVector();
this.send (sender, {
type: "sync step 2"
os: ops,
stateVector: sv
});
this.syncingClients.push(sender);
setTimeout(()=>{
this.syncingClients = this.syncingClients.filter(function(client){
return client !== sender;
});
this.send(sender, {
type: "sync done"
})
}, this.syncingClientDuration);
} else if (m.type === "sync step 2") {
var ops = this.os.getOperations(m.stateVector);
this.broadcast {
type: "update",
ops: ops
}
} else if (m.type === "sync done") {
this.connections[sender].isSynced = true;
this.findNextSyncTarget();
}
} else if (m.type === "update") {
for (var client of this.syncingClients) {
this.send(client, m);
}
this.os.apply(m.ops);
}
}
// Currently, the HB encodes operations as JSON. For the moment I want to keep it
// that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
// too much overhead. Y is very likely to get changed a lot in the future
//
// Because we don't want to encode JSON as string (with character escaping, wich makes it pretty much unreadable)
// we encode the JSON as XML.
//
// When the HB support encoding as XML, the format should look pretty much like this.
//
// does not support primitive values as array elements
// expects an ltx (less than xml) object
parseMessageFromXml (m) {
function parseArray (node) {
for (var n of node.children){
if (n.getAttribute("isArray") === "true") {
return parseArray(n);
} else {
return parseObject(n);
}
}
}
function parseObject (node) {
var json = {};
for (name in node.attrs) {
var value = node.attrs[name];
var int = parseInt(value);
if (isNaN(int) or (""+int) !== value){
json[name] = value;
} else {
json[name] = int;
}
}
for (n in node.children){
var name = n.name;
if (n.getAttribute("isArray") === "true") {
json[name] = parseArray(n);
} else {
json[name] = parseObject(n);
}
}
return json;
}
parseObject(node);
}
// encode message in xml
// we use string because Strophe only accepts an "xml-string"..
// So {a:4,b:{c:5}} will look like
// <y a="4">
// <b c="5"></b>
// </y>
// m - ltx element
// json - Object
encodeMessageToXml (m, json) {
// attributes is optional
function encodeObject (m, json) {
for (name in json) {
var value = json[name];
if (name == null) {
// nop
} else if (value.constructor === Object) {
encodeObject(m.c(name), value);
} else if (value.constructor === Array) {
encodeArray(m.c(name), value);
} else {
m.setAttribute(name, value);
}
}
}
function encodeArray (m, array) {
m.setAttribute("isArray", "true");
for (var e of array) {
if (e.constructor === Object) {
encodeObject(m.c("array-element"), e);
} else {
encodeArray(m.c("array-element"), e);
}
}
}
if (json.constructor === Object) {
encodeObject(m.c("y", {xmlns:"http://y.ninja/connector-stanza"}), json);
} else if (json.constructor === Array) {
encodeArray(m.c("y", {xmlns:"http://y.ninja/connector-stanza"}), json);
} else {
throw new Error("I can't encode this json!");
}
}
}
(function(){
function WebRTC(webrtc_options){
if(webrtc_options === undefined){
throw new Error("webrtc_options must not be undefined!")
}
var room = webrtc_options.room;
// connect per default to our server
if(webrtc_options.url === undefined){
webrtc_options.url = "https://yatta.ninja:8888";
}
var swr = new SimpleWebRTC(webrtc_options);
this.swr = swr;
var self = this;
var channel;
swr.once('connectionReady',function(user_id){
// SimpleWebRTC (swr) is initialized
swr.joinRoom(room);
swr.once('joinedRoom', function(){
// the client joined the specified room
// initialize the connector with the required parameters.
// You always should specify `role`, `syncMethod`, and `user_id`
self.init({
role : "slave",
syncMethod : "syncAll",
user_id : user_id
});
var i;
// notify the connector class about all the users that already
// joined the session
for(i in self.swr.webrtc.peers){
self.userJoined(self.swr.webrtc.peers[i].id, "slave");
}
swr.on("channelMessage", function(peer, room, message){
// The client received a message
// Check if the connector is already initialized,
// only then forward the message to the connector class
if(self.is_initialized && message.type === "yjs"){
self.receiveMessage(peer.id, message.payload);
}
});
});
swr.on("createdPeer", function(peer){
// a new peer/client joined the session.
// Notify the connector class, if the connector
// is already initialized
if(self.is_initialized){
// note: Since the WebRTC Connector only supports the SyncAll
// syncmethod, every client is a slave.
self.userJoined(peer.id, "slave");
}
});
swr.on("peerStreamRemoved",function(peer){
// a client left the session.
// Notify the connector class, if the connector
// is already initialized
if(self.is_initialized){
self.userLeft(peer.id);
}
});
});
}
// Specify how to send a message to a specific user (by uid)
WebRTC.prototype.send = function(uid, message){
var self = this;
// we have to make sure that the message is sent under all circumstances
var send = function(){
// check if the clients still exists
var peer = self.swr.webrtc.getPeers(uid)[0];
var success;
if(peer){
// success is true, if the message is successfully sent
success = peer.sendDirectly("simplewebrtc", "yjs", message);
}
if(!success){
// resend the message if it didn't work
window.setTimeout(send,500);
}
};
// try to send the message
send();
};
// specify how to broadcast a message to all users
// (it may send the message back to itself).
// The webrtc connecor tries to send it to every single clients directly
WebRTC.prototype.broadcast = function(message){
this.swr.sendDirectlyToAll("simplewebrtc","yjs",message);
};
Y.Connectors.WebRTC = WebRTC;
})()
var connectorAdapter = (){
#
# @params new Connector(options)
# @param options.syncMethod {String} is either "syncAll" or "master-slave".
# @param options.role {String} The role of this client
# (slave or master (only used when syncMethod is master-slave))
# @param options.perform_send_again {Boolean} Whetehr to whether to resend the HB after some time period. This reduces sync errors, but has some overhead (optional)
#
init: (options)->
req = (name, choices)=>
if options[name]?
if (not choices?) or choices.some((c)->c is options[name])
@[name] = options[name]
else
throw new Error "You can set the '"+name+"' option to one of the following choices: "+JSON.encode(choices)
else
throw new Error "You must specify "+name+", when initializing the Connector!"
req "syncMethod", ["syncAll", "master-slave"]
req "role", ["master", "slave"]
req "user_id"
@on_user_id_set?(@user_id)
# whether to resend the HB after some time period. This reduces sync errors.
# But this is not necessary in the test-connector
if options.perform_send_again?
@perform_send_again = options.perform_send_again
else
@perform_send_again = true
# A Master should sync with everyone! TODO: really? - for now its safer this way!
if @role is "master"
@syncMethod = "syncAll"
# is set to true when this is synced with all other connections
@is_synced = false
# Peerjs Connections: key: conn-id, value: object
@connections = {}
# List of functions that shall process incoming data
@receive_handlers ?= []
# whether this instance is bound to any y instance
@connections = {}
@current_sync_target = null
@sent_hb_to_all_users = false
@is_initialized = true
onUserEvent: (f)->
@connections_listeners ?= []
@connections_listeners.push f
isRoleMaster: ->
@role is "master"
isRoleSlave: ->
@role is "slave"
findNewSyncTarget: ()->
@current_sync_target = null
if @syncMethod is "syncAll"
for user, c of @connections
if not c.is_synced
@performSync user
break
if not @current_sync_target?
@setStateSynced()
null
userLeft: (user)->
delete @connections[user]
@findNewSyncTarget()
if @connections_listeners?
for f in @connections_listeners
f {
action: "userLeft"
user: user
}
userJoined: (user, role)->
if not role?
throw new Error "Internal: You must specify the role of the joined user! E.g. userJoined('uid:3939','slave')"
# a user joined the room
@connections[user] ?= {}
@connections[user].is_synced = false
if (not @is_synced) or @syncMethod is "syncAll"
if @syncMethod is "syncAll"
@performSync user
else if role is "master"
# TODO: What if there are two masters? Prevent sending everything two times!
@performSyncWithMaster user
if @connections_listeners?
for f in @connections_listeners
f {
action: "userJoined"
user: user
role: role
}
#
# Execute a function _when_ we are connected. If not connected, wait until connected.
# @param f {Function} Will be executed on the Connector context.
#
whenSynced: (args)->
if args.constructor is Function
args = [args]
if @is_synced
args[0].apply this, args[1..]
else
@compute_when_synced ?= []
@compute_when_synced.push args
#
# Execute an function when a message is received.
# @param f {Function} Will be executed on the PeerJs-Connector context. f will be called with (sender_id, broadcast {true|false}, message).
#
onReceive: (f)->
@receive_handlers.push f
#
# perform a sync with a specific user.
#
performSync: (user)->
if not @current_sync_target?
@current_sync_target = user
@send user,
sync_step: "getHB"
send_again: "true"
data: @getStateVector()
if not @sent_hb_to_all_users
@sent_hb_to_all_users = true
hb = @getHB([]).hb
_hb = []
for o in hb
_hb.push o
if _hb.length > 10
@broadcast
sync_step: "applyHB_"
data: _hb
_hb = []
@broadcast
sync_step: "applyHB"
data: _hb
#
# When a master node joined the room, perform this sync with him. It will ask the master for the HB,
# and will broadcast his own HB
#
performSyncWithMaster: (user)->
@current_sync_target = user
@send user,
sync_step: "getHB"
send_again: "true"
data: @getStateVector()
hb = @getHB([]).hb
_hb = []
for o in hb
_hb.push o
if _hb.length > 10
@broadcast
sync_step: "applyHB_"
data: _hb
_hb = []
@broadcast
sync_step: "applyHB"
data: _hb
#
# You are sure that all clients are synced, call this function.
#
setStateSynced: ()->
if not @is_synced
@is_synced = true
if @compute_when_synced?
for el in @compute_when_synced
f = el[0]
args = el[1..]
f.apply(args)
delete @compute_when_synced
null
# executed when the a state_vector is received. listener will be called only once!
whenReceivedStateVector: (f)->
@when_received_state_vector_listeners ?= []
@when_received_state_vector_listeners.push f
#
# You received a raw message, and you know that it is intended for to Yjs. Then call this function.
#
receiveMessage: (sender, res)->
if not res.sync_step?
for f in @receive_handlers
f sender, res
else
if sender is @user_id
return
if res.sync_step is "getHB"
# call listeners
if @when_received_state_vector_listeners?
for f in @when_received_state_vector_listeners
f.call this, res.data
delete @when_received_state_vector_listeners
data = @getHB(res.data)
hb = data.hb
_hb = []
# always broadcast, when not synced.
# This reduces errors, when the clients goes offline prematurely.
# When this client only syncs to one other clients, but looses connectors,
# before syncing to the other clients, the online clients have different states.
# Since we do not want to perform regular syncs, this is a good alternative
if @is_synced
sendApplyHB = (m)=>
@send sender, m
else
sendApplyHB = (m)=>
@broadcast m
for o in hb
_hb.push o
if _hb.length > 10
sendApplyHB
sync_step: "applyHB_"
data: _hb
_hb = []
sendApplyHB
sync_step : "applyHB"
data: _hb
if res.send_again? and @perform_send_again
send_again = do (sv = data.state_vector)=>
()=>
hb = @getHB(sv).hb
for o in hb
_hb.push o
if _hb.length > 10
@send sender,
sync_step: "applyHB_"
data: _hb
_hb = []
@send sender,
sync_step: "applyHB",
data: _hb
sent_again: "true"
setTimeout send_again, 3000
else if res.sync_step is "applyHB"
@applyHB(res.data, sender is @current_sync_target)
if (@syncMethod is "syncAll" or res.sent_again?) and (not @is_synced) and ((@current_sync_target is sender) or (not @current_sync_target?))
@connections[sender].is_synced = true
@findNewSyncTarget()
else if res.sync_step is "applyHB_"
@applyHB(res.data, sender is @current_sync_target)
# Currently, the HB encodes operations as JSON. For the moment I want to keep it
# that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
# too much overhead. Y is very likely to get changed a lot in the future
#
# Because we don't want to encode JSON as string (with character escaping, wich makes it pretty much unreadable)
# we encode the JSON as XML.
#
# When the HB support encoding as XML, the format should look pretty much like this.
# does not support primitive values as array elements
# expects an ltx (less than xml) object
parseMessageFromXml: (m)->
parse_array = (node)->
for n in node.children
if n.getAttribute("isArray") is "true"
parse_array n
else
parse_object n
parse_object = (node)->
json = {}
for name, value of node.attrs
int = parseInt(value)
if isNaN(int) or (""+int) isnt value
json[name] = value
else
json[name] = int
for n in node.children
name = n.name
if n.getAttribute("isArray") is "true"
json[name] = parse_array n
else
json[name] = parse_object n
json
parse_object m
# encode message in xml
# we use string because Strophe only accepts an "xml-string"..
# So {a:4,b:{c:5}} will look like
# <y a="4">
# <b c="5"></b>
# </y>
# m - ltx element
# json - guess it ;)
#
encodeMessageToXml: (m, json)->
# attributes is optional
encode_object = (m, json)->
for name,value of json
if not value?
# nop
else if value.constructor is Object
encode_object m.c(name), value
else if value.constructor is Array
encode_array m.c(name), value
else
m.setAttribute(name,value)
m
encode_array = (m, array)->
m.setAttribute("isArray","true")
for e in array
if e.constructor is Object
encode_object m.c("array-element"), e
else
encode_array m.c("array-element"), e
m
if json.constructor is Object
encode_object m.c("y",{xmlns:"http://y.ninja/connector-stanza"}), json
else if json.constructor is Array
encode_array m.c("y",{xmlns:"http://y.ninja/connector-stanza"}), json
else
throw new Error "I can't encode this json!"
setIsBoundToY: ()->
@on_bound_to_y?()
delete @when_bound_to_y
@is_bound_to_y = true
}
};
// returns a rendom element of o
// works on Object, and Array
function getRandom (o) {
if (o instanceof Array) {
return o[Math.floor(Math.random() * o.length)];
} else if (o.constructor === Object) {
var keys = [];
for (var key in o) {
keys.push(key);
}
return o[getRandom(keys)];
}
}
var globalRoom = {
users: {},
buffers: {},
removeUser: function(user){
for (var u of this.users) {
u.userLeft(user);
}
delete this.users[user];
delete this.buffers[user];
},
addUser: function(connector){
for (var u of this.users) {
u.userJoined(connector.userId);
}
this.users[connector.userId] = connector;
this.buffers[connector.userId] = [];
}
};
setInterval(function(){
var bufs = [];
for (var i in globalRoom.buffers) {
if (globalRoom.buffers[i].length > 0) {
bufs.push(i);
}
}
if (bufs.length > 0) {
var userId = getRandom(bufs);
var m = globalRoom.buffers[userId];
var user = globalRoom.users[userId];
user.receiveMessage(m);
}
}, 10);
var userIdCounter = 0;
class Test extends AbstractConnector {
constructor (options) {
if(options === undefined){
throw new Error("Options must not be undefined!");
}
super({
role: "master"
});
this.setUserId((userIdCounter++) + "");
}
send (uid, message) {
globalRoom.buffers[uid].push(message);
}
broadcast (message) {
for (var buf of globalRoom.buffers) {
buf.push(message);
}
}
disconnect () {
globalRoom.removeUser(this.userId);
}
}
Y.Test = Test;
class WebRTC extends AbstractConnector {
constructor (options) {
if(options === undefined){
throw new Error("Options must not be undefined!");
}
super({
role: "slave"
});
var room = options.room;
// connect per default to our server
if(options.url == null){
options.url = "https://yatta.ninja:8888";
}
var swr = new SimpleWebRTC(options); //eslint-disable-line no-undef
this.swr = swr;
var self = this;
swr.once("connectionReady", function(userId){
// SimpleWebRTC (swr) is initialized
swr.joinRoom(room);
swr.once("joinedRoom", function(){
self.setUserId(userId);
var i;
// notify the connector class about all the users that already
// joined the session
for(i in self.swr.webrtc.peers){
self.userJoined(self.swr.webrtc.peers[i].id, "master");
}
swr.on("channelMessage", function(peer, room_, message){
// The client received a message
// Check if the connector is already initialized,
// only then forward the message to the connector class
if(message.type != null ){
self.receiveMessage(peer.id, message.payload);
}
});
});
swr.on("createdPeer", function(peer){
// a new peer/client joined the session.
// Notify the connector class, if the connector
// is already initialized
self.userJoined(peer.id, "master");
});
swr.on("peerStreamRemoved", function(peer){
// a client left the session.
// Notify the connector class, if the connector
// is already initialized
self.userLeft(peer.id);
});
});
}
send (uid, message) {
var self = this;
// we have to make sure that the message is sent under all circumstances
var send = function(){
// check if the clients still exists
var peer = self.swr.webrtc.getPeers(uid)[0];
var success;
if(peer){
// success is true, if the message is successfully sent
success = peer.sendDirectly("simplewebrtc", "yjs", message);
}
if(!success){
// resend the message if it didn't work
setTimeout(send, 500);
}
};
// try to send the message
send();
}
broadcast (message) {
this.swr.sendDirectlyToAll("simplewebrtc", "yjs", message);
}
}
Y.WebRTC = WebRTC;
......@@ -120,7 +120,7 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
}
// notify parent listeners, if possible
var listeners = this.parentListeners[op.parent];
if ( this.parentListenersRequestPending
if ( this.parentListenersRequestPending
|| ( listeners == null )
|| ( listeners.length === 0 )) {
return;
......@@ -141,7 +141,7 @@ class AbstractOperationStore { //eslint-disable-line no-unused-vars
for (var parent_id in activatedOperations){
var parent = yield* this.getOperation(parent_id);
Struct[parent.type].notifyObservers(activatedOperations[parent_id]);
}
}
})
}
......
/* @flow */
// Op is anything that we could get from the OperationStore.
struct Op = Object;
type Op = Object;
var Struct = {
Operation: { //eslint-disable-line no-unused-vars
......@@ -17,10 +17,11 @@ var Struct = {
content : any,
left : Struct.Insert,
right : Struct.Insert,
parent : Struct.List) : Struct.Insert {
parent : Struct.List) : Insert {
op.left = left ? left.id : null;
op.origin = op.left;
op.right = right ? right.id : null;
op.parent = parent.id;
op.struct = "Insert";
yield* Struct.Operation.create.call(this, op, user);
......@@ -127,14 +128,26 @@ var Struct = {
},
execute: function* (op) {
// nop
}
ref: function* (op, pos) : Struct.Insert | undefined{
},
ref: function* (op : Op, pos : number) : Insert {
var o = op.start;
while ( pos !== 0 || o == null) {
o = (yield* this.getOperation(op.start)).right;
o = (yield* this.getOperation(o)).right;
pos--;
}
return (o == null) ? null : yield* this.getOperation(o);
}
},
map: function* (o : Op, f : Function) : Array<any> {
o = o.start;
var res = [];
while ( pos !== 0 || o == null) {
var operation = yield* this.getOperation(o);
res.push(f(operation.content));
o = operation.right;
pos--;
}
return res;
},
insert: function* (op, pos : number, contents : Array<any>) {
var o = yield* Struct.List.ref.call(this, op, pos);
var o_end = yield* this.getOperation(o.right);
......
......@@ -7,8 +7,12 @@
this._model = _model;
}
*val (pos) {
var o = yield* this.Struct.List.ref(pos);
return o ? o.content : null;
if (pos != null) {
var o = yield* this.Struct.List.ref(this._model, pos);
return o ? o.content : null;
} else {
return yield* this.Struct.List.map(this._model, function(c){return c; });
}
}
*insert (pos, contents) {
yield* this.Struct.List.insert(pos, contents);
......@@ -17,9 +21,7 @@
Y.List = function* YList(){
var model = yield* this.Struct.List.create();
return new Y.List.Create(model);
}
return new List(model);
};
Y.List.Create = List;
Y.List = List;
})();
/* @flow */
function Y (opts) {
function Y (opts) { //eslint-disable-line no-unused-vars
var connector = opts.connector;
Y.Connectors[connector.name]
Y.Connectors[connector.name]();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment