diff --git a/src/Connector.js b/src/Connector.js index 61e0972db9690a3f27ae758575604af50a9ad3aa..f3ea1174e4802d264fdc6a95499ad2d906f76157 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -1,6 +1,9 @@ /* @flow */ 'use strict' +function canRead (auth) { return auth === 'read' || auth === 'write' } +function canWrite (auth) { return auth === 'write' } + module.exports = function (Y/* :any */) { class AbstractConnector { /* :: @@ -54,6 +57,8 @@ module.exports = function (Y/* :any */) { this.syncStep2 = Promise.resolve() this.broadcastOpBuffer = [] this.protocolVersion = 11 + this.authInfo = opts.auth || null + this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access } reconnect () { } @@ -87,6 +92,9 @@ module.exports = function (Y/* :any */) { onUserEvent (f) { this.userEventListeners.push(f) } + removeUserEventListener (f) { + this.userEventListeners = this.userEventListeners.filter(g => { f !== g }) + } userLeft (user) { if (this.connections[user] != null) { delete this.connections[user] @@ -163,7 +171,8 @@ module.exports = function (Y/* :any */) { type: 'sync step 1', stateSet: stateSet, deleteSet: deleteSet, - protocolVersion: conn.protocolVersion + protocolVersion: conn.protocolVersion, + auth: conn.authInfo }) }) } else { @@ -217,7 +226,7 @@ module.exports = function (Y/* :any */) { */ receiveMessage (sender/* :UserId */, message/* :Message */) { if (sender === this.userId) { - return + return Promise.resolve() } if (this.debug) { console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line @@ -232,91 +241,118 @@ module.exports = function (Y/* :any */) { type: 'sync stop', protocolVersion: this.protocolVersion }) - return + return Promise.reject('Incompatible protocol version') } - if (message.type === 'sync step 1') { - let conn = this - let m = message - this.y.db.requestTransaction(function *() { - var currentStateSet = yield* this.getStateSet() - yield* this.applyDeleteSet(m.deleteSet) - - var ds = yield* this.getDeleteSet() - var ops = yield* this.getOperations(m.stateSet) - conn.send(sender, { - type: 'sync step 2', - os: ops, - stateSet: currentStateSet, - deleteSet: ds, - protocolVersion: this.protocolVersion - }) - if (this.forwardToSyncingClients) { - conn.syncingClients.push(sender) - setTimeout(function () { - conn.syncingClients = conn.syncingClients.filter(function (cli) { - return cli !== sender - }) - conn.send(sender, { - type: 'sync done' - }) - }, 5000) // TODO: conn.syncingClientDuration) - } else { - conn.send(sender, { - type: 'sync done' + if (message.auth != null && this.connections[sender] != null) { + // authenticate using auth in message + var auth = this.checkAuth(message.auth, this.y) + this.connections[sender].auth = auth + auth.then(auth => { + for (var f of this.userEventListeners) { + f({ + action: 'userAuthenticated', + user: sender, + auth: auth }) } - conn._setSyncedWith(sender) }) - } else if (message.type === 'sync step 2') { - let conn = this - var broadcastHB = !this.broadcastedHB - this.broadcastedHB = true - var db = this.y.db - var defer = {} - defer.promise = new Promise(function (resolve) { - defer.resolve = resolve - }) - this.syncStep2 = defer.promise - let m /* :MessageSyncStep2 */ = message - db.requestTransaction(function * () { - yield* this.applyDeleteSet(m.deleteSet) - this.store.apply(m.os) - db.requestTransaction(function * () { - var ops = yield* this.getOperations(m.stateSet) - if (ops.length > 0) { - if (!broadcastHB) { // TODO: consider to broadcast here.. + } else if (this.connections[sender] != null && this.connections[sender].auth == null) { + // authenticate without otherwise + this.connections[sender].auth = this.checkAuth(null, this.y) + } + if (this.connections[sender] != null && this.connections[sender].auth != null) { + return this.connections[sender].auth.then((auth) => { + if (message.type === 'sync step 1' && canRead(auth)) { + let conn = this + let m = message + + this.y.db.requestTransaction(function *() { + var currentStateSet = yield* this.getStateSet() + if (canWrite(auth)) { + yield* this.applyDeleteSet(m.deleteSet) + } + + var ds = yield* this.getDeleteSet() + var ops = yield* this.getOperations(m.stateSet) + conn.send(sender, { + type: 'sync step 2', + os: ops, + stateSet: currentStateSet, + deleteSet: ds, + protocolVersion: this.protocolVersion, + auth: this.authInfo + }) + if (this.forwardToSyncingClients) { + conn.syncingClients.push(sender) + setTimeout(function () { + conn.syncingClients = conn.syncingClients.filter(function (cli) { + return cli !== sender + }) + conn.send(sender, { + type: 'sync done' + }) + }, 5000) // TODO: conn.syncingClientDuration) + } else { conn.send(sender, { - type: 'update', - ops: ops + type: 'sync done' }) - } else { - // broadcast only once! - conn.broadcastOps(ops) + } + conn._setSyncedWith(sender) + }) + } else if (message.type === 'sync step 2' && canWrite(auth)) { + let conn = this + var broadcastHB = !this.broadcastedHB + this.broadcastedHB = true + var db = this.y.db + var defer = {} + defer.promise = new Promise(function (resolve) { + defer.resolve = resolve + }) + this.syncStep2 = defer.promise + let m /* :MessageSyncStep2 */ = message + db.requestTransaction(function * () { + yield* this.applyDeleteSet(m.deleteSet) + this.store.apply(m.os) + db.requestTransaction(function * () { + var ops = yield* this.getOperations(m.stateSet) + if (ops.length > 0) { + if (!broadcastHB) { // TODO: consider to broadcast here.. + conn.send(sender, { + type: 'update', + ops: ops + }) + } else { + // broadcast only once! + conn.broadcastOps(ops) + } + } + defer.resolve() + }) + }) + } else if (message.type === 'sync done') { + var self = this + this.syncStep2.then(function () { + self._setSyncedWith(sender) + }) + } else if (message.type === 'update' && canWrite(auth)) { + if (this.forwardToSyncingClients) { + for (var client of this.syncingClients) { + this.send(client, message) } } - defer.resolve() - }) - }) - } else if (message.type === 'sync done') { - var self = this - this.syncStep2.then(function () { - self._setSyncedWith(sender) - }) - } else if (message.type === 'update') { - if (this.forwardToSyncingClients) { - for (var client of this.syncingClients) { - this.send(client, message) - } - } - if (this.y.db.forwardAppliedOperations) { - var delops = message.ops.filter(function (o) { - return o.struct === 'Delete' - }) - if (delops.length > 0) { - this.broadcastOps(delops) + if (this.y.db.forwardAppliedOperations) { + var delops = message.ops.filter(function (o) { + return o.struct === 'Delete' + }) + if (delops.length > 0) { + this.broadcastOps(delops) + } + } + this.y.db.apply(message.ops) } - } - this.y.db.apply(message.ops) + }) + } else { + return Promise.reject('Unable to deliver message') } } _setSyncedWith (user) { diff --git a/src/Connectors/Test.js b/src/Connectors/Test.js index b78ce7b64659794263ab9c2afa951942ac48bd04..c2e20de4dc8c3e0c5f535ae4294bc50b4d6554fb 100644 --- a/src/Connectors/Test.js +++ b/src/Connectors/Test.js @@ -24,11 +24,21 @@ module.exports = function (Y) { } }, whenTransactionsFinished: function () { - var ps = [] - for (var name in this.users) { - ps.push(this.users[name].y.db.whenTransactionsFinished()) - } - return Promise.all(ps) + var self = this + return new Promise (function (resolve, reject) { + // The connector first has to send the messages to the db. + // Wait for the checkAuth-function to resolve + // The test lib only has a simple checkAuth function: `() => Promise.resolve()` + // Just add a function to the event-queue, in order to wait for the event. + // TODO: this may be buggy in test applications (but it isn't be for real-life apps) + setTimeout(function () { + var ps = [] + for (var name in self.users) { + ps.push(self.users[name].y.db.whenTransactionsFinished()) + } + Promise.all(ps).then(resolve, reject) + }, 0) + }) }, flushOne: function flushOne () { var bufs = [] @@ -54,8 +64,9 @@ module.exports = function (Y) { delete buff[sender] } var user = globalRoom.users[userId] - user.receiveMessage(m[0], m[1]) - return user.y.db.whenTransactionsFinished() + return user.receiveMessage(m[0], m[1]).then(function () { + return user.y.db.whenTransactionsFinished() + }, function () {}) } else { return false } @@ -72,16 +83,14 @@ module.exports = function (Y) { } globalRoom.whenTransactionsFinished().then(nextFlush) } else { - setTimeout(function () { - var c = globalRoom.flushOne() - if (c) { - c.then(function () { - globalRoom.whenTransactionsFinished().then(nextFlush) - }) - } else { - resolve() - } - }, 0) + var c = globalRoom.flushOne() + if (c) { + c.then(function () { + globalRoom.whenTransactionsFinished().then(nextFlush) + }) + } else { + resolve() + } } } globalRoom.whenTransactionsFinished().then(nextFlush) @@ -107,7 +116,7 @@ module.exports = function (Y) { this.syncingClientDuration = 0 } receiveMessage (sender, m) { - super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) + return super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) } send (userId, message) { var buffer = globalRoom.buffers[userId] @@ -154,7 +163,7 @@ module.exports = function (Y) { if (buff[sender].length === 0) { delete buff[sender] } - this.receiveMessage(m[0], m[1]) + yield this.receiveMessage(m[0], m[1]) } yield self.whenTransactionsFinished() })