diff --git a/src/Connector.js b/src/Connector.js index 96be5dfb97290c33b180ab536e83b6ec896e69f5..e02c0ff574ec20589bb7d910a8911c8a974af60e 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -26,6 +26,16 @@ class AbstractConnector { // eslint-disable-line no-unused-vars this.debug = opts.debug === true this.broadcastedHB = false } + reconnect () { + } + disconnect () { + this.connections = {} + this.isSynced = false + this.currentSyncTarget = null + this.broadcastedHB = false + this.syncingClients = [] + this.whenSyncedListeners = [] + } setUserId (userId) { this.userId = userId this.y.db.setUserId(userId) @@ -97,7 +107,8 @@ class AbstractConnector { // eslint-disable-line no-unused-vars this.y.db.requestTransaction(function *() { conn.send(syncUser, { type: 'sync step 1', - stateVector: yield* this.getStateVector() + stateSet: yield* this.getStateSet(), + deleteSet: yield* this.getDeleteSet() }) }) } @@ -107,13 +118,12 @@ class AbstractConnector { // eslint-disable-line no-unused-vars for (var f of this.whenSyncedListeners) { f() } - this.whenSyncedListeners = null + this.whenSyncedListeners = [] } } send (uid, message) { if (this.debug) { - console.log(`me -> ${uid}: ${message.type}`);// eslint-disable-line - console.dir(m); // eslint-disable-line + console.log(`me -> ${uid}: ${message.type}`, m);// eslint-disable-line } super(uid, message) } @@ -123,19 +133,27 @@ class AbstractConnector { // eslint-disable-line no-unused-vars return } if (this.debug) { - console.log(`${sender} -> me: ${m.type}`);// eslint-disable-line - console.dir(m); // eslint-disable-line + console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line } if (m.type === 'sync step 1') { // TODO: make transaction, stream the ops let conn = this this.y.db.requestTransaction(function *() { - var ops = yield* this.getOperations(m.stateVector) - var sv = yield* this.getStateVector() + var ops = yield* this.getOperations(m.stateSet) + var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) + if (dels.length > 0) { + this.store.apply(dels) + // broadcast missing dels from syncing client + this.store.y.connector.broadcast({ + type: 'update', + ops: dels + }) + } conn.send(sender, { type: 'sync step 2', os: ops, - stateVector: sv + stateSet: yield* this.getStateSet(), + deleteSet: yield* this.getDeleteSet() }) if (this.forwardToSyncingClients) { conn.syncingClients.push(sender) @@ -159,7 +177,10 @@ class AbstractConnector { // eslint-disable-line no-unused-vars var broadcastHB = !this.broadcastedHB this.broadcastedHB = true this.y.db.requestTransaction(function *() { - var ops = yield* this.getOperations(m.stateVector) + var ops = yield* this.getOperations(m.stateSet) + var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) + this.store.apply(dels) + this.store.apply(m.os) if (ops.length > 0) { m = { type: 'update', diff --git a/src/Connectors/Test.js b/src/Connectors/Test.js index 68d47c7c99636e444af02bed045b65a2a4721424..1fd665fc093cdeb40065ced9f6cd3044b15cbd51 100644 --- a/src/Connectors/Test.js +++ b/src/Connectors/Test.js @@ -64,8 +64,13 @@ class Test extends AbstractConnector { globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message]))) } } + reconnect () { + globalRoom.addUser(this) + super() + } disconnect () { globalRoom.removeUser(this.userId) + super() } flush () { var buff = globalRoom.buffers[this.userId] diff --git a/src/Helper.spec.js b/src/Helper.spec.js index 813d8df810b1f37d94b1c9d2af82d7873e7c92fb..67dafc826e35617b35c03daaf83e1d297c6b1511 100644 --- a/src/Helper.spec.js +++ b/src/Helper.spec.js @@ -38,16 +38,27 @@ async function applyRandomTransactions (users, objects, transactions, numberOfTr var f = getRandom(transactions) f(root) } - for (var i = 0; i < numberOfTransactions; i++) { - var r = Math.random() - if (r >= 0.9) { - // 10% chance to flush - users[0].connector.flushOne() - } else { - randomTransaction(getRandom(objects)) + function applyTransactions () { + for (var i = 0; i < numberOfTransactions / 2 + 1; i++) { + var r = Math.random() + if (r >= 0.9) { + // 10% chance to flush + users[0].connector.flushOne() + } else { + randomTransaction(getRandom(objects)) + } + wait() } - wait() } + applyTransactions() + await users[0].connector.flushAll() + users[0].disconnect() + await wait() + applyTransactions() + await users[0].connector.flushAll() + users[0].reconnect() + await wait() + await users[0].connector.flushAll() } async function compareAllUsers(users){//eslint-disable-line @@ -55,7 +66,7 @@ async function compareAllUsers(users){//eslint-disable-line var db1 = [] function * t1 () { s1 = yield* this.getStateSet() - ds1 = yield* this.getDeletionSet() + ds1 = yield* this.getDeleteSet() allDels1 = [] yield* this.ds.iterate(null, null, function (d) { allDels1.push(d) @@ -63,7 +74,7 @@ async function compareAllUsers(users){//eslint-disable-line } function * t2 () { s2 = yield* this.getStateSet() - ds2 = yield* this.getDeletionSet() + ds2 = yield* this.getDeleteSet() allDels2 = [] yield* this.ds.iterate(null, null, function (d) { allDels2.push(d) diff --git a/src/OperationStores/Memory.js b/src/OperationStores/Memory.js index 4759c90e37a362963531ef81d930c96ee0749627..385923c05cfbfb48614001ed60083d56b58817e0 100644 --- a/src/OperationStores/Memory.js +++ b/src/OperationStores/Memory.js @@ -107,11 +107,12 @@ class DeleteStore extends RBTree { // eslint-disable-line } } }) - for (; pos < dv.len; pos++) { + for (; pos < dv.length; pos++) { d = dv[pos] createDeletions(user, d[0], d[1]) } } + return deletions } } @@ -124,13 +125,13 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars this.os = store.os this.ds = store.ds } - * getDeletionSet (id) { + * getDeleteSet (id) { return this.ds.toDeleteSet(id) } * isDeleted (id) { return this.ds.isDeleted(id) } - * getDeletions (ds) { + * getOpsFromDeleteSet (ds) { return this.ds.getDeletions(ds) } * setOperation (op) { // eslint-disable-line @@ -201,13 +202,20 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars var res = [] for (var op of ops) { res.push(yield* this.makeOperationReady(startSS, op)) + var state = startSS[op.id[0]] || 0 + if (state === op.id[1]) { + startSS[op.id[0]] = state + 1 + } else { + throw new Error('Unexpected operation!') + } } return res } * makeOperationReady (ss, op) { // instead of ss, you could use currSS (a ss that increments when you add an operation) - var clock + op = copyObject(op) var o = op + var clock while (o.right != null) { // while unknown, go to the right clock = ss[o.right[0]] @@ -216,8 +224,16 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars } o = yield* this.getOperation(o.right) } - op = copyObject(op) op.right = o.right + while (o.left != null) { + // while unknown, go to the right + clock = ss[o.left[0]] + if (clock != null && o.left[1] < clock) { + break + } + o = yield* this.getOperation(o.left) + } + op.left = o.left return op } } diff --git a/src/OperationStores/Memory.spec.js b/src/OperationStores/Memory.spec.js index 34ae136068b462ffd9de8c226b88b11c13e8ee9f..bbe077bebb2ee0115c6f21294ae15bc571bf33c5 100644 --- a/src/OperationStores/Memory.spec.js +++ b/src/OperationStores/Memory.spec.js @@ -19,5 +19,14 @@ describe('Memory', function () { expect(ds.isDeleted(['u1', 11])).toBeTruthy() expect(ds.toDeleteSet()).toBeTruthy({'u1': [10, 2]}) }) + it('Creates operations', function () { + ds.add({id: ['5', 3], len: 2}) + var dels = ds.getDeletions({5: [[4, 1]]}) + expect(dels.length === 1).toBeTruthy() + expect(dels[0]).toEqual({ + struct: 'Delete', + target: ['5', 3] + }) + }) }) }) diff --git a/src/Types/Array.spec.js b/src/Types/Array.spec.js index b9b9c9836dfbed6274552fce02c963bea61e86e7..d7060132e559e0bc3a5b2cbdfbcd95b293202e00 100644 --- a/src/Types/Array.spec.js +++ b/src/Types/Array.spec.js @@ -1,17 +1,17 @@ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions */ /* eslint-env browser,jasmine */ -var numberOfYArrayTests = 80 +var numberOfYArrayTests = 10 describe('Array Type', function () { - var y1, y2, y3, flushAll + var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll - jasmine.DEFAULT_TIMEOUT_INTERVAL = 50000 + jasmine.DEFAULT_TIMEOUT_INTERVAL = 5000 beforeEach(async function (done) { await createUsers(this, 5) - y1 = this.users[0].root - y2 = this.users[1].root - y3 = this.users[2].root + y1 = (yconfig1 = this.users[0]).root + y2 = (yconfig2 = this.users[1]).root + y3 = (yconfig3 = this.users[2]).root flushAll = this.users[0].connector.flushAll done() }) @@ -59,6 +59,42 @@ describe('Array Type', function () { expect(l2.toArray()).toEqual([0, 2, 'y']) done() }) + it('Handles getOperations ascending ids bug in late sync', async function (done) { + var l1, l2 + l1 = await y1.set('Array', Y.Array) + l1.insert(0, ['x', 'y']) + await flushAll() + yconfig3.disconnect() + yconfig2.disconnect() + await wait() + l2 = await y2.get('Array') + l2.insert(1, [2]) + l2.insert(1, [3]) + await flushAll() + yconfig2.reconnect() + yconfig3.reconnect() + await wait() + await flushAll() + expect(l1.toArray()).toEqual(l2.toArray()) + done() + }) + it('Handles deletions in late sync', async function (done) { + var l1, l2 + l1 = await y1.set('Array', Y.Array) + l1.insert(0, ['x', 'y']) + await flushAll() + yconfig2.disconnect() + await wait() + l2 = await y2.get('Array') + l2.delete(1, 1) + l1.delete(0, 2) + await flushAll() + yconfig2.reconnect() + await wait() + await flushAll() + expect(l1.toArray()).toEqual(l2.toArray()) + done() + }) it('Basic insert. Then delete the whole array', async function (done) { var l1, l2, l3 l1 = await y1.set('Array', Y.Array) @@ -73,6 +109,42 @@ describe('Array Type', function () { expect(l2.toArray()).toEqual([]) done() }) + it('Basic insert. Then delete the whole array (merge listeners on late sync)', async function (done) { + var l1, l2, l3 + l1 = await y1.set('Array', Y.Array) + l1.insert(0, ['x', 'y', 'z']) + await flushAll() + yconfig2.disconnect() + l1.delete(0, 3) + l2 = await y2.get('Array') + await wait() + yconfig2.reconnect() + await wait() + l3 = await y3.get('Array') + await flushAll() + expect(l1.toArray()).toEqual(l2.toArray()) + expect(l2.toArray()).toEqual(l3.toArray()) + expect(l2.toArray()).toEqual([]) + done() + }) + it('Basic insert. Then delete the whole array (merge deleter on late sync)', async function (done) { + var l1, l2, l3 + l1 = await y1.set('Array', Y.Array) + l1.insert(0, ['x', 'y', 'z']) + await flushAll() + yconfig1.disconnect() + l1.delete(0, 3) + l2 = await y2.get('Array') + await wait() + yconfig1.reconnect() + await wait() + l3 = await y3.get('Array') + await flushAll() + expect(l1.toArray()).toEqual(l2.toArray()) + expect(l2.toArray()).toEqual(l3.toArray()) + expect(l2.toArray()).toEqual([]) + done() + }) it('throw insert & delete events', async function (done) { var array = await this.users[0].root.set('array', Y.Array) var event @@ -97,7 +169,7 @@ describe('Array Type', function () { done() }) }) - describe(`${numberOfYArrayTests} Random tests`, function () { + describe(`Random tests`, function () { var randomArrayTransactions = [ function insert (array) { array.insert(getRandomNumber(array.toArray().length), [getRandomNumber()]) @@ -136,6 +208,13 @@ describe('Array Type', function () { done() }) it(`succeed after ${numberOfYArrayTests} actions`, async function (done) { + while (this.users.length > 2) { + this.users.pop().disconnect() + this.arrays.pop() + } + for (var u of this.users) { + u.connector.debug = true + } await applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests) await flushAll() await compareArrayValues(this.arrays) diff --git a/src/y.js b/src/y.js index 2b72d3029a089dd446af4df8c1b949e192656660..dc86d3540744656dc3299020028da8e8d8bfd60c 100644 --- a/src/y.js +++ b/src/y.js @@ -29,6 +29,15 @@ class YConfig { // eslint-disable-line no-unused-vars callback(yconfig) }) } + isConnected () { + return this.connector.isSynced + } + disconnect () { + this.connector.disconnect() + } + reconnect () { + this.connector.reconnect() + } destroy () { this.connector.disconnect() this.db.removeDatabase()