From 17803266d42f48c70a78b8d1fe7c388dc8f00214 Mon Sep 17 00:00:00 2001
From: Kevin Jahns <kevin.jahns@rwth-aachen.de>
Date: Tue, 27 Sep 2016 16:12:35 +0200
Subject: [PATCH] repairChecker: Yjs is now able to detect incorrect states
 that happen when messages get lost. When Yjs is in an incorrect state it
 repairs itself and syncs again

---
 src/Connector.js  | 10 ++++++++++
 src/Database.js   | 38 ++++++++++++++++++++++++++++++++++++++
 src/SpecHelper.js |  3 ++-
 3 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/src/Connector.js b/src/Connector.js
index 7e34534e..61e0972d 100644
--- a/src/Connector.js
+++ b/src/Connector.js
@@ -66,6 +66,16 @@ module.exports = function (Y/* :any */) {
       this.whenSyncedListeners = []
       return this.y.db.stopGarbageCollector()
     }
+    repair () {
+      console.info('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
+      for (var name in this.connections) {
+        this.connections[name].isSynced = false
+      }
+      this.isSynced = false
+      this.currentSyncTarget = null
+      this.broadcastedHB = false
+      this.findNextSyncTarget()
+    }
     setUserId (userId) {
       if (this.userId == null) {
         this.userId = userId
diff --git a/src/Database.js b/src/Database.js
index d1aca8fe..dbbec5e7 100644
--- a/src/Database.js
+++ b/src/Database.js
@@ -112,6 +112,41 @@ module.exports = function (Y /* :any */) {
       if (this.gcTimeout > 0) {
         garbageCollect()
       }
+      this.repairCheckInterval = !opts.repairCheckInterval ? 6000 : opts.repairCheckInterval
+      this.opsReceivedTimestamp = new Date()
+      this.startRepairCheck()
+    }
+    startRepairCheck () {
+      var os = this
+      if (this.repairCheckInterval > 0) {
+        this.repairCheckIntervalHandler = setInterval(function repairOnMissingOperations () {
+          /*
+            Case 1. No ops have been received in a while (new Date() - os.opsReceivedTimestamp > os.repairCheckInterval)
+              - 1.1 os.listenersById is empty. Then the state was correct the whole time. -> Nothing to do (nor to update)
+              - 1.2 os.listenersById is not empty.
+                      * Then the state was incorrect for at least {os.repairCheckInterval} milliseconds.
+                      * -> Remove everything in os.listenersById and sync again (connector.repair())
+            Case 2. An op has been received in the last {os.repairCheckInterval} milliseconds.
+                    It is not yet necessary to check for faulty behavior. Everything can still resolve itself. Wait for more messages.
+                    If nothing was received for a while and os.listenersById is still not empty, we are in case 1.2
+                    -> Do nothing
+
+            Bottom line: we really only have to catch case 1.2.
+          */
+          if (
+            new Date() - os.opsReceivedTimestamp > os.repairCheckInterval &&
+            Object.keys(os.listenersById).length > 0 // os.listenersById is not empty
+          ) {
+            // haven't received operations for over {os.repairCheckInterval} milliseconds, resend state vector
+            os.listenersById = {}
+            os.opsReceivedTimestamp = new Date() // update so you don't send repair several times in a row
+            os.y.connector.repair()
+          }
+        }, this.repairCheckInterval)
+      }
+    }
+    stopRepairCheck () {
+      clearInterval(this.repairCheckIntervalHandler)
     }
     queueGarbageCollector (id) {
       if (this.y.isConnected()) {
@@ -207,6 +242,7 @@ module.exports = function (Y /* :any */) {
     * destroy () {
       clearInterval(this.gcInterval)
       this.gcInterval = null
+      this.stopRepairCheck()
       for (var key in this.initializedTypes) {
         var type = this.initializedTypes[key]
         if (type._destroy != null) {
@@ -246,12 +282,14 @@ module.exports = function (Y /* :any */) {
     /*
       Apply a list of operations.
 
+      * we save a timestamp, because we received new operations that could resolve ops in this.listenersById (see this.startRepairCheck)
       * get a transaction
       * check whether all Struct.*.requiredOps are in the OS
       * check if it is an expected op (otherwise wait for it)
       * check if was deleted, apply a delete operation after op was applied
     */
     apply (ops) {
+      this.opsReceivedTimestamp = new Date()
       for (var i = 0; i < ops.length; i++) {
         var o = ops[i]
         if (o.id == null || o.id[0] !== this.y.connector.userId) {
diff --git a/src/SpecHelper.js b/src/SpecHelper.js
index b06eca07..24eded79 100644
--- a/src/SpecHelper.js
+++ b/src/SpecHelper.js
@@ -337,7 +337,8 @@ g.createUsers = async(function * createUsers (self, numberOfUsers, database, ini
         name: database,
         namespace: 'User ' + i,
         cleanStart: true,
-        gcTimeout: -1
+        gcTimeout: -1,
+        repairCheckInterval: -1
       },
       connector: {
         name: 'Test',
-- 
GitLab