diff --git a/index.js b/index.js index bd093fda..6c614b66 100644 --- a/index.js +++ b/index.js @@ -714,7 +714,7 @@ class Hypercore extends EventEmitter { once = false this._findingPeers-- if (this.core !== null && --this.core.replicator.findingPeers === 0) { - this.core.replicator.updateAll() + this.core.replicator.queueUpdateAll() } } } diff --git a/lib/replicator.js b/lib/replicator.js index e6cb1ad8..eae8922c 100644 --- a/lib/replicator.js +++ b/lib/replicator.js @@ -48,8 +48,8 @@ const SCALE_LATENCY = 50 const NOT_DOWNLOADING_SLACK = (20000 + Math.random() * 20000) | 0 const MAX_PEERS_UPGRADE = 3 const LAST_BLOCKS = 256 - const MAX_RANGES = 64 +const MIN_PEER_UPDATE_ALL = 40 const NOT_AVAILABLE = 1 const INVALID_REQUEST = 2 @@ -754,7 +754,7 @@ class Peer { if (this.remoteOpened === false) { this.replicator._ifAvailable-- - this.replicator.updateAll() + this.replicator.queueUpdateAll() return } @@ -1298,7 +1298,7 @@ class Peer { const req = request > 0 ? this.replicator._inflight.get(request) : null if (req === null || req.peer !== this) { - this.replicator.updateAll() + this.replicator.queueUpdateAll() return } @@ -2021,6 +2021,9 @@ module.exports = class Replicator { this._notDownloadingLinger = notDownloadingLinger this._notDownloadingTimer = null + this._updateAllBump = null + this._updateAllBound = this.updateAll.bind(this) + const self = this this._onstreamclose = onstreamclose @@ -2203,7 +2206,7 @@ module.exports = class Replicator { const ref = this._addUpgrade().attach(session) - this.updateAll() + this.queueUpdateAll() return ref } @@ -2213,7 +2216,7 @@ module.exports = class Replicator { const ref = b.attach(session) this._queueBlock(b) - this.updateAll() + this.queueUpdateAll() return ref } @@ -2223,7 +2226,7 @@ module.exports = class Replicator { const ref = s.attach(session) this._seeks.push(s) - this.updateAll() + this.queueUpdateAll() return ref } @@ -2266,7 +2269,7 @@ module.exports = class Replicator { return ref } - this.updateAll() + this.queueUpdateAll() return ref } @@ -2288,7 +2291,7 @@ module.exports = class Replicator { } for (const replicator of updated) { - replicator.updateAll() + replicator.queueUpdateAll() } } @@ -2300,7 +2303,7 @@ module.exports = class Replicator { cleared = true } - if (cleared) this.updateAll() + if (cleared) this.queueUpdateAll() } _matchingRequest(req, data) { @@ -2460,7 +2463,7 @@ module.exports = class Replicator { } this._onpeerupdate(false, peer) - if (inflight) this.updateAll() + if (inflight) this.queueUpdateAll() } _queueBlock(b) { @@ -2650,7 +2653,7 @@ module.exports = class Replicator { this._updatesPending = 0 } - if (this._inflight.idle || updateAll) this.updateAll() + if (this._inflight.idle || updateAll) this.queueUpdateAll() } _maybeResolveIfAvailableRanges() { @@ -2716,7 +2719,7 @@ module.exports = class Replicator { this._clearRequest(peer, req) } - this.updateAll() + this.queueUpdateAll() } _openSkipBitfield() { @@ -2839,7 +2842,7 @@ module.exports = class Replicator { const f = this._addReorg(data.fork, peer) if (f === null) { - this.updateAll() + this.queueUpdateAll() return } @@ -2860,7 +2863,7 @@ module.exports = class Replicator { } } - this.updateAll() + this.queueUpdateAll() } // Never throws, allowed to run in the background @@ -2900,7 +2903,7 @@ module.exports = class Replicator { for (const f of old) f.resolve() f.resolve() - this.updateAll() + this.queueUpdateAll() } _maybeUpdate() { @@ -3023,16 +3026,25 @@ module.exports = class Replicator { this._maybeResolveIfAvailableRanges() } - updateAll() { + updateAll(limit = Infinity) { // Quick shortcut to wait for flushing reorgs - not needed but less waisted requests if (this._applyingReorg !== null) return + // Clear queued scan if a full scan + if (this._updateAllBump !== null && limit === Infinity) { + clearTimeout(this._updateAllBump) + this._updateAllBump = null + } + const peers = new RandomIterator(this.peers) + let tried = 0 for (const peer of peers) { + tried++ if (this._updatePeer(peer) === true) { peers.requeue() } + if (tried >= limit) break } // Check if we can skip the non primary check fully @@ -3041,16 +3053,34 @@ module.exports = class Replicator { return } + tried = 0 for (const peer of peers.restart()) { + tried++ if (this._updatePeerNonPrimary(peer) === true) { peers.requeue() } + if (tried >= limit) break } this._checkUpgradeIfAvailable() this._maybeResolveIfAvailableRanges() } + getUpdateAllDelay(peers) { + return Math.min(3000, Math.max(100, (peers.length - MIN_PEER_UPDATE_ALL) * 5)) + } + + queueUpdateAll() { + if (this.peers.length < MIN_PEER_UPDATE_ALL) return this.updateAll() + + // Immediately scan subset + this.updateAll(MIN_PEER_UPDATE_ALL) + + // Schedule full scan + if (this._updateAllBump !== null) return //skip if already scheduled + this._updateAllBump = setTimeout(this._updateAllBound, this.getUpdateAllDelay(this.peers)) + } + onpeerdestroy() { if (--this._active === 0) this.core.checkIfIdle() }