Skip to content
Snippets Groups Projects
Commit 17803266 authored by Kevin Jahns's avatar Kevin Jahns
Browse files

repairChecker: Yjs is now able to detect incorrect states that happen when...

repairChecker: Yjs is now able to detect incorrect states that happen when messages get lost. When Yjs is in an incorrect state it repairs itself and syncs again
parent f0e88d19
No related branches found
No related tags found
No related merge requests found
...@@ -66,6 +66,16 @@ module.exports = function (Y/* :any */) { ...@@ -66,6 +66,16 @@ module.exports = function (Y/* :any */) {
this.whenSyncedListeners = [] this.whenSyncedListeners = []
return this.y.db.stopGarbageCollector() return this.y.db.stopGarbageCollector()
} }
repair () {
console.info('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
for (var name in this.connections) {
this.connections[name].isSynced = false
}
this.isSynced = false
this.currentSyncTarget = null
this.broadcastedHB = false
this.findNextSyncTarget()
}
setUserId (userId) { setUserId (userId) {
if (this.userId == null) { if (this.userId == null) {
this.userId = userId this.userId = userId
......
...@@ -112,6 +112,41 @@ module.exports = function (Y /* :any */) { ...@@ -112,6 +112,41 @@ module.exports = function (Y /* :any */) {
if (this.gcTimeout > 0) { if (this.gcTimeout > 0) {
garbageCollect() garbageCollect()
} }
this.repairCheckInterval = !opts.repairCheckInterval ? 6000 : opts.repairCheckInterval
this.opsReceivedTimestamp = new Date()
this.startRepairCheck()
}
startRepairCheck () {
var os = this
if (this.repairCheckInterval > 0) {
this.repairCheckIntervalHandler = setInterval(function repairOnMissingOperations () {
/*
Case 1. No ops have been received in a while (new Date() - os.opsReceivedTimestamp > os.repairCheckInterval)
- 1.1 os.listenersById is empty. Then the state was correct the whole time. -> Nothing to do (nor to update)
- 1.2 os.listenersById is not empty.
* Then the state was incorrect for at least {os.repairCheckInterval} seconds.
* -> Remove everything in os.listenersById and sync again (connector.repair())
Case 2. An op has been received in the last {os.repairCheckInterval } seconds.
It is not yet necessary to check for faulty behavior. Everything can still resolve itself. Wait for more messages.
If nothing was received for a while and os.listenersById is still not emty, we are in case 1.2
-> Do nothing
Baseline here is: we really only have to catch case 1.2..
*/
if (
new Date() - os.opsReceivedTimestamp > os.repairCheckInterval &&
Object.keys(os.listenersById).length > 0 // os.listenersById is not empty
) {
// haven't received operations for over {os.repairCheckInterval} seconds, resend state vector
os.listenersById = {}
os.opsReceivedTimestamp = new Date() // update so you don't send repair several times in a row
os.y.connector.repair()
}
}, this.repairCheckInterval)
}
}
stopRepairCheck () {
clearInterval(this.repairCheckIntervalHandler)
} }
queueGarbageCollector (id) { queueGarbageCollector (id) {
if (this.y.isConnected()) { if (this.y.isConnected()) {
...@@ -207,6 +242,7 @@ module.exports = function (Y /* :any */) { ...@@ -207,6 +242,7 @@ module.exports = function (Y /* :any */) {
* destroy () { * destroy () {
clearInterval(this.gcInterval) clearInterval(this.gcInterval)
this.gcInterval = null this.gcInterval = null
this.stopRepairCheck()
for (var key in this.initializedTypes) { for (var key in this.initializedTypes) {
var type = this.initializedTypes[key] var type = this.initializedTypes[key]
if (type._destroy != null) { if (type._destroy != null) {
...@@ -246,12 +282,14 @@ module.exports = function (Y /* :any */) { ...@@ -246,12 +282,14 @@ module.exports = function (Y /* :any */) {
/* /*
Apply a list of operations. Apply a list of operations.
* we save a timestamp, because we received new operations that could resolve ops in this.listenersById (see this.startRepairCheck)
* get a transaction * get a transaction
* check whether all Struct.*.requiredOps are in the OS * check whether all Struct.*.requiredOps are in the OS
* check if it is an expected op (otherwise wait for it) * check if it is an expected op (otherwise wait for it)
* check if was deleted, apply a delete operation after op was applied * check if was deleted, apply a delete operation after op was applied
*/ */
apply (ops) { apply (ops) {
this.opsReceivedTimestamp = new Date()
for (var i = 0; i < ops.length; i++) { for (var i = 0; i < ops.length; i++) {
var o = ops[i] var o = ops[i]
if (o.id == null || o.id[0] !== this.y.connector.userId) { if (o.id == null || o.id[0] !== this.y.connector.userId) {
......
...@@ -337,7 +337,8 @@ g.createUsers = async(function * createUsers (self, numberOfUsers, database, ini ...@@ -337,7 +337,8 @@ g.createUsers = async(function * createUsers (self, numberOfUsers, database, ini
name: database, name: database,
namespace: 'User ' + i, namespace: 'User ' + i,
cleanStart: true, cleanStart: true,
gcTimeout: -1 gcTimeout: -1,
repairCheckInterval: -1
}, },
connector: { connector: {
name: 'Test', name: 'Test',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment