Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utxo set scanner #2072

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package scorex.crypto.authds.avltree.batch

import com.google.common.primitives.Ints
import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn}
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey}
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode}
import scorex.crypto.authds.{ADDigest, ADKey}
import scorex.util.encode.Base16
Expand Down Expand Up @@ -104,7 +105,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
def subtreeLoop(label: DigestType, builder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(label)
builder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] =>
subtreeLoop(Digest32 @@@ in.leftLabel, builder)
Expand All @@ -123,7 +124,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
def manifestLoop(nodeDbKey: Array[Byte], level: Int, manifestBuilder: mutable.ArrayBuilder[Byte]): Unit = {
val nodeBytes = dbReader.get(nodeDbKey)
manifestBuilder ++= nodeBytes
val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes)
val node = noStoreSerializer.parseBytes(nodeBytes)
node match {
case in: ProxyInternalNode[DigestType] if level == manifestDepth =>
dumpSubtree(Digest32 @@@ in.leftLabel)
Expand Down Expand Up @@ -153,6 +154,38 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
rootNodeLabel
}
}

def iterateAVLTree(fromIndex: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit =
store.processSnapshot { dbReader =>

var current: Int = 0

def subtree(sid: Array[Byte]): BatchAVLProverSubtree[DigestType] = {
def loop(label: Array[Byte]): ProverNodes[DigestType] =
(noStoreSerializer.parseBytes(dbReader.get(label)): @unchecked) match {
case leaf: ProverLeaf[DigestType] => leaf
case i: ProxyInternalNode[DigestType] =>
i.getNew(loop(i.leftLabel), loop(i.rightLabel))
}
new BatchAVLProverSubtree[DigestType](loop(sid))
}

def proxyLoop(label: Array[Byte], level: Int): Unit =
noStoreSerializer.parseBytes(dbReader.get(label)) match {
case in: ProxyInternalNode[DigestType] if level == MainnetManifestDepth =>
if(current >= fromIndex) handleSubtree(subtree(in.leftLabel))
current += 1
if(current >= fromIndex) handleSubtree(subtree(in.rightLabel))
current += 1
case in: ProxyInternalNode[DigestType] =>
proxyLoop(in.leftLabel, level + 1)
proxyLoop(in.rightLabel, level + 1)
case _ =>
}

proxyLoop(dbReader.get(topNodeHashKey), 1)

}
}


Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.ergoplatform.local._
import org.ergoplatform.mining.ErgoMiner
import org.ergoplatform.mining.ErgoMiner.StartMining
import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker}
import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec
import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSetScanner}
import org.ergoplatform.nodeView.history.extra.ExtraIndexer
import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef}
import org.ergoplatform.settings.{Args, ErgoSettings, ErgoSettingsReader, NetworkType, ScorexSettings}
Expand Down Expand Up @@ -115,6 +115,8 @@ class ErgoApp(args: Args) extends ScorexLogging {
// Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config)
private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings)

UtxoSetScanner(nodeViewHolderRef)

private val syncTracker = ErgoSyncTracker(scorexSettings.network)

private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings)
Expand Down
25 changes: 23 additions & 2 deletions src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ import spire.syntax.all.cfor

import java.io.File
import org.ergoplatform.modifiers.history.extension.Extension
import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -451,8 +455,22 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
}
}

//todo: update state in async way?
/**
* Whether to send blocks to wallet to scan. (Utxo scan keeps wallet height at 0)
* @return true if utxoBootstrap is not enabled; if it is enabled, then walletHeight > 0
*/
private def shouldScanBlocks: Boolean =
if(settings.nodeSettings.utxoSettings.utxoBootstrap) {
try {
Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) > 0
}catch {
case _: Throwable => false
}
}else {
true
}

//todo: update state in async way?
/**
* Remote and local persistent modifiers need to be appended to history, applied to state
* which also needs to be git propagated to mempool and wallet
Expand Down Expand Up @@ -509,7 +527,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
v
}

if (almostSynced) {
if (almostSynced && shouldScanBlocks) {
blocksApplied.foreach(newVault.scanPersistent)
}

Expand All @@ -520,6 +538,9 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti
case utxoStateReader: UtxoStateReader if headersHeight == fullBlockHeight =>
val recheckCommand = RecheckMempool(utxoStateReader, newMemPool)
context.system.eventStream.publish(recheckCommand)
if(!shouldScanBlocks) {
context.system.eventStream.publish(StartUtxoSetScan(false))
}
case _ =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme
import org.ergoplatform.modifiers.history._
import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader}
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection}
import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScanWithHistory
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex}
import org.ergoplatform.nodeView.history.storage.HistoryStorage
Expand Down Expand Up @@ -295,8 +296,17 @@ object ErgoHistory extends ScorexLogging {
repairIfNeeded(history)

log.info("History database read")
if(ergoSettings.nodeSettings.extraIndex) // start extra indexer, if enabled

// start extra indexer, if enabled
if(ergoSettings.nodeSettings.extraIndex) {
context.system.eventStream.publish(StartExtraIndexer(history))
}

// set history for utxo set scanner, if bootstrapping by snapshot
if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) {
context.system.eventStream.publish(StartUtxoSetScanWithHistory(history))
}

history
}

Expand Down
132 changes: 132 additions & 0 deletions src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.ergoplatform.nodeView.history

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import org.ergoplatform.ErgoBox
import org.ergoplatform.modifiers.BlockSection
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView
import org.ergoplatform.nodeView.history.UtxoSetScanner._
import org.ergoplatform.nodeView.history.storage.HistoryStorage
import org.ergoplatform.nodeView.state.UtxoState
import org.ergoplatform.nodeView.wallet.ErgoWallet
import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot
import org.ergoplatform.serialization.ManifestSerializer
import org.ergoplatform.wallet.boxes.ErgoBoxSerializer
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage
import scorex.crypto.hash.Blake2b256
import scorex.db.ByteArrayWrapper
import scorex.util.{ModifierId, ScorexLogging, bytesToId}

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {

private var history: ErgoHistory = _
private def historyStorage: HistoryStorage = history.historyStorage

private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS)
private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS)

private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])]

private def readProgress(): (Int, Int) =
historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer =>
val current = buffer.getInt
val total = buffer.getInt
(current, total)
}.getOrElse((0, 0))

private def writeProgress(current: Int, total: Int): Unit = {
val buffer: ByteBuffer = ByteBuffer.allocate(8)
buffer.putInt(current)
buffer.putInt(total)
historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection])
}

private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = {
wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal))
writeProgress(current, MainnetTotal)
chunkBuffer.clear()
}

private def run(): Unit = {

var (current, total) = readProgress()
if(total == 0 || // scan should not start yet, still syncing
current == MainnetTotal) // scan already done
return

val (state, wallet) = Await.result(
(nodeView ? GetDataFromCurrentView[UtxoState, (UtxoState, ErgoWallet)](x => (x.state, x.vault)))
.mapTo[(UtxoState, ErgoWallet)],
duration
)

val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration)
if(!initialized) return

log.info(s"Starting UTXO set snapshot scan for $total chunks")

state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current) { subtree =>
current += 1

chunkBuffer += ((
bytesToId(subtree.id),
subtree.leafValues.par.flatMap(ErgoBoxSerializer.parseBytesTry(_).toOption).toArray
))

if(chunkBuffer.size == 32) {
sendBufferToWallet(wallet, current)
}
}

// flush remaining data, if any
if(chunkBuffer.nonEmpty) {
sendBufferToWallet(wallet, current)
}

if(current == total) {
log.info(s"Successfully scanned $total Utxo set subtrees")
wallet.scanPersistent(history.bestFullBlockOpt.get)
}else {
log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total")
}

}

override def receive: Receive = {
case StartUtxoSetScanWithHistory(history: ErgoHistory) =>
this.history = history
run()
case StartUtxoSetScan(rescan: Boolean) =>
if(readProgress()._1 == 0 || rescan) writeProgress(0, MainnetTotal)
run()
}

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[StartUtxoSetScanWithHistory])
context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan])
}

}

object UtxoSetScanner {

case class StartUtxoSetScanWithHistory(history: ErgoHistory)

case class StartUtxoSetScan(rescan: Boolean)

private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt

private val utxoSetScanProgressKey: ByteArrayWrapper =
ByteArrayWrapper(Blake2b256.hash("scanned chunk"))

def apply(nodeView: ActorRef)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props.create(classOf[UtxoSetScanner], nodeView))
}
13 changes: 12 additions & 1 deletion src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.ergoplatform.nodeView.wallet

import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.{ErgoFullBlock, BlockSection}
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock}
import org.ergoplatform.nodeView.history.ErgoHistoryReader
import org.ergoplatform.nodeView.state.ErgoState
import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages._
Expand All @@ -11,6 +12,9 @@ import org.ergoplatform.wallet.boxes.{ReemissionData, ReplaceCompactCollectBoxSe
import org.ergoplatform.core.VersionTag
import scorex.util.ScorexLogging

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, parameters: Parameters)
Expand All @@ -36,6 +40,13 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param
override val walletActor: ActorRef =
ErgoWalletActor(settings, parameters, new ErgoWalletServiceImpl(settings), boxSelector, historyReader)

private val duration: Duration = Duration.create(10, TimeUnit.SECONDS)

def scanUtxoSnapshot(msg: ScanBoxesFromUtxoSnapshot): ErgoWallet = {
Await.result(walletActor ? msg, duration)
this
}

def scanOffchain(tx: ErgoTransaction): ErgoWallet = {
walletActor ! ScanOffChain(tx)
this
Expand Down
Loading
Loading