close
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ class Hypercore extends EventEmitter {
once = false
this._findingPeers--
if (this.core !== null && --this.core.replicator.findingPeers === 0) {
this.core.replicator.updateAll()
this.core.replicator.queueUpdateAll()
}
}
}
Expand Down
62 changes: 46 additions & 16 deletions lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ const SCALE_LATENCY = 50
const NOT_DOWNLOADING_SLACK = (20000 + Math.random() * 20000) | 0
const MAX_PEERS_UPGRADE = 3
const LAST_BLOCKS = 256

const MAX_RANGES = 64
const MIN_PEER_UPDATE_ALL = 40

const NOT_AVAILABLE = 1
const INVALID_REQUEST = 2
Expand Down Expand Up @@ -754,7 +754,7 @@ class Peer {

if (this.remoteOpened === false) {
this.replicator._ifAvailable--
this.replicator.updateAll()
this.replicator.queueUpdateAll()
return
}

Expand Down Expand Up @@ -1298,7 +1298,7 @@ class Peer {
const req = request > 0 ? this.replicator._inflight.get(request) : null

if (req === null || req.peer !== this) {
this.replicator.updateAll()
this.replicator.queueUpdateAll()
return
}

Expand Down Expand Up @@ -2021,6 +2021,9 @@ module.exports = class Replicator {
this._notDownloadingLinger = notDownloadingLinger
this._notDownloadingTimer = null

this._updateAllBump = null
this._updateAllBound = this.updateAll.bind(this)

const self = this
this._onstreamclose = onstreamclose

Expand Down Expand Up @@ -2203,7 +2206,7 @@ module.exports = class Replicator {

const ref = this._addUpgrade().attach(session)

this.updateAll()
this.queueUpdateAll()

return ref
}
Expand All @@ -2213,7 +2216,7 @@ module.exports = class Replicator {
const ref = b.attach(session)

this._queueBlock(b)
this.updateAll()
this.queueUpdateAll()

return ref
}
Expand All @@ -2223,7 +2226,7 @@ module.exports = class Replicator {
const ref = s.attach(session)

this._seeks.push(s)
this.updateAll()
this.queueUpdateAll()

return ref
}
Expand Down Expand Up @@ -2266,7 +2269,7 @@ module.exports = class Replicator {
return ref
}

this.updateAll()
this.queueUpdateAll()

return ref
}
Expand All @@ -2288,7 +2291,7 @@ module.exports = class Replicator {
}

for (const replicator of updated) {
replicator.updateAll()
replicator.queueUpdateAll()
}
}

Expand All @@ -2300,7 +2303,7 @@ module.exports = class Replicator {
cleared = true
}

if (cleared) this.updateAll()
if (cleared) this.queueUpdateAll()
}

_matchingRequest(req, data) {
Expand Down Expand Up @@ -2460,7 +2463,7 @@ module.exports = class Replicator {
}

this._onpeerupdate(false, peer)
if (inflight) this.updateAll()
if (inflight) this.queueUpdateAll()
}

_queueBlock(b) {
Expand Down Expand Up @@ -2650,7 +2653,7 @@ module.exports = class Replicator {
this._updatesPending = 0
}

if (this._inflight.idle || updateAll) this.updateAll()
if (this._inflight.idle || updateAll) this.queueUpdateAll()
}

_maybeResolveIfAvailableRanges() {
Expand Down Expand Up @@ -2716,7 +2719,7 @@ module.exports = class Replicator {
this._clearRequest(peer, req)
}

this.updateAll()
this.queueUpdateAll()
}

_openSkipBitfield() {
Expand Down Expand Up @@ -2839,7 +2842,7 @@ module.exports = class Replicator {
const f = this._addReorg(data.fork, peer)

if (f === null) {
this.updateAll()
this.queueUpdateAll()
return
}

Expand All @@ -2860,7 +2863,7 @@ module.exports = class Replicator {
}
}

this.updateAll()
this.queueUpdateAll()
}

// Never throws, allowed to run in the background
Expand Down Expand Up @@ -2900,7 +2903,7 @@ module.exports = class Replicator {
for (const f of old) f.resolve()
f.resolve()

this.updateAll()
this.queueUpdateAll()
}

_maybeUpdate() {
Expand Down Expand Up @@ -3023,16 +3026,25 @@ module.exports = class Replicator {
this._maybeResolveIfAvailableRanges()
}

updateAll() {
updateAll(limit = Infinity) {
// Quick shortcut to wait for flushing reorgs - not needed but less waisted requests
if (this._applyingReorg !== null) return

// Clear queued scan if a full scan
if (this._updateAllBump !== null && limit === Infinity) {
clearTimeout(this._updateAllBump)
this._updateAllBump = null
}

const peers = new RandomIterator(this.peers)

let tried = 0
for (const peer of peers) {
tried++
if (this._updatePeer(peer) === true) {
peers.requeue()
}
if (tried >= limit) break
}

// Check if we can skip the non primary check fully
Expand All @@ -3041,16 +3053,34 @@ module.exports = class Replicator {
return
}

tried = 0
for (const peer of peers.restart()) {
tried++
if (this._updatePeerNonPrimary(peer) === true) {
peers.requeue()
}
if (tried >= limit) break
}

this._checkUpgradeIfAvailable()
this._maybeResolveIfAvailableRanges()
}

getUpdateAllDelay(peers) {
return Math.min(3000, Math.max(100, (peers.length - MIN_PEER_UPDATE_ALL) * 5))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok if MIN_PEER_UPDATE is relatively non small, like 40-50

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated MIN_PEER_UPDATE_ALL to 40. Previously was 30.

}

queueUpdateAll() {
if (this.peers.length < MIN_PEER_UPDATE_ALL) return this.updateAll()

// Immediately scan subset
this.updateAll(MIN_PEER_UPDATE_ALL)

// Schedule full scan
if (this._updateAllBump !== null) return //skip if already scheduled
this._updateAllBump = setTimeout(this._updateAllBound, this.getUpdateAllDelay(this.peers))
}

onpeerdestroy() {
if (--this._active === 0) this.core.checkIfIdle()
}
Expand Down
Loading