diff --git a/.vscode/launch.json b/.vscode/launch.json index 833c43e908ff82f69998177fa63fda270758568c..f5f0d3e1a3082c6f325a0e551678a6e95167310d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,7 +2,7 @@ "version": "0.2.0", "configurations": [ { - "name": "Launch", + "name": "Test", "type": "node", "request": "launch", "program": "node_modules/gulp/bin/gulp.js", diff --git a/declarations/Structs.js b/declarations/Structs.js index 58001bbc59828265c95a0bb6b7324fe202284e20..9855efeb3cf5735fa42121ca8e41822f98ac0414 100644 --- a/declarations/Structs.js +++ b/declarations/Structs.js @@ -12,6 +12,7 @@ type Struct = { struct: 'Insert' | 'Delete' }*/ type Struct = Insertion | Deletion +type Operation = Struct type Insertion = { id: Id, @@ -23,4 +24,30 @@ type Insertion = { type Deletion = { target: Id, struct: 'Delete' -} \ No newline at end of file +} + + +type MessageSyncStep1 = { + type: 'sync step 1', + deleteSet: any, + stateSet: any +} + +type MessageSyncStep2 = { + type: 'sync step 2', + os: Array<Operation>, + deleteSet: any, + stateSet: any +} + +type MessageUpdate = { + type: 'update', + ops: Array<Operation> +} + +type MessageSyncDone = { + type: 'sync done' +} + +type Message = MessageSyncStep1 | MessageSyncStep2 | MessageUpdate | MessageSyncDone + diff --git a/declarations/Y.js b/declarations/Y.js index e287be99abe2f3cdb9e9eaa2ba52ca6cd7208342..7a4bd94f809b25c99af1260c32ace059552dfb22 100644 --- a/declarations/Y.js +++ b/declarations/Y.js @@ -4,9 +4,10 @@ type YGlobal = { utils: Object; Struct: Object; AbstractDatabase: any; + AbstractConnector: any; } -type YInstance = { +type YConfig = { db: Object, connector: Object, root: Object @@ -14,4 +15,6 @@ type YInstance = { declare var YConcurrency_TestingMode : boolean -type Transaction<A> = Generator<any, A, any> \ No newline at end of file +type Transaction<A> = Generator<any, A, any> + +type SyncRole = 'master' | 'slave' \ No newline at end of file diff --git a/src/Connector.js b/src/Connector.js index 66b6a9d86ea5c8e791e926ffd0b993447856894c..a1bef56f3307638b50c478f65951c3ce416364d2 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -1,7 +1,25 @@ +/* @flow */ 'use strict' -module.exports = function (Y) { +module.exports = function (Y/* :YGlobal */) { class AbstractConnector { + /* :: + y: YConfig; + role: SyncRole; + connections: Object; + isSynced: boolean; + userEventListeners: Array<Function>; + whenSyncedListeners: Array<Function>; + currentSyncTarget: ?UserId; + syncingClients: Array<any>; + forwardToSyncingClients: boolean; + debug: boolean; + broadcastedHB: boolean; + syncStep2: Promise; + userId: UserId; + send: Function; + broadcast: Function; + */ /* opts contains the following information: role : String Role of this client ("master" or "slave") @@ -119,10 +137,12 @@ module.exports = function (Y) { var conn = this this.currentSyncTarget = syncUser this.y.db.requestTransaction(function *() { + var stateSet = yield* this.getStateSet() + var deleteSet = yield* this.getDeleteSet() conn.send(syncUser, { type: 'sync step 1', - stateSet: yield* this.getStateSet(), - deleteSet: yield* this.getDeleteSet() + stateSet: stateSet, + deleteSet: deleteSet }) }) } else { @@ -139,22 +159,23 @@ module.exports = function (Y) { } send (uid, message) { if (this.debug) { - console.log(`send ${this.userId} -> ${uid}: ${message.type}`, m) // eslint-disable-line + console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line } } /* You received a raw message, and you know that it is intended for Yjs. Then call this function. */ - receiveMessage (sender, m) { + receiveMessage (sender/* :UserId */, message/* :Message */) { if (sender === this.userId) { return } if (this.debug) { - console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, JSON.parse(JSON.stringify(m))) // eslint-disable-line + console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line } - if (m.type === 'sync step 1') { + if (message.type === 'sync step 1') { // TODO: make transaction, stream the ops let conn = this + let m = message this.y.db.requestTransaction(function *() { var currentStateSet = yield* this.getStateSet() yield* this.applyDeleteSet(m.deleteSet) @@ -176,7 +197,7 @@ module.exports = function (Y) { conn.send(sender, { type: 'sync done' }) - }, conn.syncingClientDuration) + }, 5000) // TODO: conn.syncingClientDuration) } else { conn.send(sender, { type: 'sync done' @@ -184,46 +205,50 @@ module.exports = function (Y) { } conn._setSyncedWith(sender) }) - } else if (m.type === 'sync step 2') { + } else if (message.type === 'sync step 2') { let conn = this var broadcastHB = !this.broadcastedHB this.broadcastedHB = true var db = this.y.db - var defer = Promise.defer() + var defer = {} + defer.promise = new Promise(function (resolve) { + defer.resolve = resolve + }) this.syncStep2 = defer.promise + let m /* :MessageSyncStep2 */ = message db.requestTransaction(function * () { yield* this.applyDeleteSet(m.deleteSet) this.store.apply(m.os) db.requestTransaction(function * () { var ops = yield* this.getOperations(m.stateSet) if (ops.length > 0) { - m = { + var update /* :MessageUpdate */ = { type: 'update', ops: ops } if (!broadcastHB) { // TODO: consider to broadcast here.. - conn.send(sender, m) + conn.send(sender, update) } else { // broadcast only once! - conn.broadcast(m) + conn.broadcast(update) } } defer.resolve() }) }) - } else if (m.type === 'sync done') { + } else if (message.type === 'sync done') { var self = this this.syncStep2.then(function () { self._setSyncedWith(sender) }) - } else if (m.type === 'update') { + } else if (message.type === 'update') { if (this.forwardToSyncingClients) { for (var client of this.syncingClients) { - this.send(client, m) + this.send(client, message) } } if (this.y.db.forwardAppliedOperations) { - var delops = m.ops.filter(function (o) { + var delops = message.ops.filter(function (o) { return o.struct === 'Delete' }) if (delops.length > 0) { @@ -233,7 +258,7 @@ module.exports = function (Y) { }) } } - this.y.db.apply(m.ops) + this.y.db.apply(message.ops) } } _setSyncedWith (user) { @@ -259,7 +284,7 @@ module.exports = function (Y) { does not support primitive values as array elements expects an ltx (less than xml) object */ - parseMessageFromXml (m) { + parseMessageFromXml (m/* :any */) { function parseArray (node) { for (var n of node.children) { if (n.getAttribute('isArray') === 'true') { @@ -269,7 +294,7 @@ module.exports = function (Y) { } } } - function parseObject (node) { + function parseObject (node/* :any */) { var json = {} for (var attrName in node.attrs) { var value = node.attrs[attrName] @@ -280,7 +305,7 @@ module.exports = function (Y) { json[attrName] = int } } - for (var n in node.children) { + for (var n/* :any */ in node.children) { var name = n.name if (n.getAttribute('isArray') === 'true') { json[name] = parseArray(n) diff --git a/src/Database.js b/src/Database.js index 0dbd427c63dd4db3f81f4a744d09af3590af05f7..8c7a1c398d1f83d219c165bf192fab7c4a2417be 100644 --- a/src/Database.js +++ b/src/Database.js @@ -1,7 +1,7 @@ /* @flow */ 'use strict' -module.exports = function (Y /* : YGlobal */) { +module.exports = function (Y /* :YGlobal */) { /* Partial definition of an OperationStore. TODO: name it Database, operation store only holds operations. @@ -16,7 +16,7 @@ module.exports = function (Y /* : YGlobal */) { */ class AbstractDatabase { /* :: - y: YInstance; + y: YConfig; forwardAppliedOperations: boolean; listenersById: Object; listenersByIdExecuteNow: Array<Object>;