diff --git a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala index 2d541569e7..5ca683c706 100644 --- a/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala +++ b/avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala @@ -2,7 +2,7 @@ package scorex.crypto.authds.avltree.batch import com.google.common.primitives.Ints import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn} -import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{topNodeHashKey, topNodeHeightKey} +import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey} import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode} import scorex.crypto.authds.{ADDigest, ADKey} import scorex.util.encode.Base16 @@ -104,7 +104,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def subtreeLoop(label: DigestType, builder: mutable.ArrayBuilder[Byte]): Unit = { val nodeBytes = dbReader.get(label) builder ++= nodeBytes - val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) + val node = noStoreSerializer.parseBytes(nodeBytes) node match { case in: ProxyInternalNode[DigestType] => subtreeLoop(Digest32 @@@ in.leftLabel, builder) @@ -123,7 +123,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) def manifestLoop(nodeDbKey: Array[Byte], level: Int, manifestBuilder: mutable.ArrayBuilder[Byte]): Unit = { val nodeBytes = dbReader.get(nodeDbKey) manifestBuilder ++= nodeBytes - val node = VersionedLDBAVLStorage.noStoreSerializer.parseBytes(nodeBytes) + val node = noStoreSerializer.parseBytes(nodeBytes) node match { case in: ProxyInternalNode[DigestType] if level == manifestDepth => dumpSubtree(Digest32 @@@ in.leftLabel) @@ -153,6 +153,44 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore) rootNodeLabel } } + + /** + * Split the AVL+ tree to 2^depth number of subtrees and process them + * @param fromIndex - only start processing subtrees from this index + * @param depth - depth at whitch to split AVL+ tree to subtrees + * @param handleSubtree - function to process subtree + */ + def iterateAVLTree(fromIndex: Int, depth: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit = + store.processSnapshot { dbReader => + + var current: Int = 0 + + def subtree(sid: Array[Byte]): BatchAVLProverSubtree[DigestType] = { + def loop(label: Array[Byte]): ProverNodes[DigestType] = + (noStoreSerializer.parseBytes(dbReader.get(label)): @unchecked) match { + case leaf: ProverLeaf[DigestType] => leaf + case i: ProxyInternalNode[DigestType] => + i.getNew(loop(i.leftLabel), loop(i.rightLabel)) + } + new BatchAVLProverSubtree[DigestType](loop(sid)) + } + + def proxyLoop(label: Array[Byte], level: Int): Unit = + noStoreSerializer.parseBytes(dbReader.get(label)) match { + case in: ProxyInternalNode[DigestType] if level == depth => + if(current >= fromIndex) handleSubtree(subtree(in.leftLabel)) + current += 1 + if(current >= fromIndex) handleSubtree(subtree(in.rightLabel)) + current += 1 + case in: ProxyInternalNode[DigestType] => + proxyLoop(in.leftLabel, level + 1) + proxyLoop(in.rightLabel, level + 1) + case _ => + } + + proxyLoop(dbReader.get(topNodeHashKey), 1) + + } } diff --git a/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala b/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala index 810e986b00..57b3420612 100644 --- a/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala +++ b/avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala @@ -12,10 +12,11 @@ import scorex.crypto.authds.{ADDigest, ADKey, ADValue, SerializedAdProof} import scorex.util.encode.Base16 import scorex.crypto.hash.{Blake2b256, Digest32} import scorex.db.{LDBFactory, LDBVersionedStore} -import scorex.util.ByteArrayBuilder +import scorex.util.{ByteArrayBuilder, ModifierId, bytesToId} import scorex.util.serialization.VLQByteBufferWriter import scorex.utils.{Random => RandomBytes} +import scala.collection.mutable.ArrayBuffer import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} @@ -370,4 +371,22 @@ class VersionedLDBAVLStorageSpecification } } + property("iterate AVL tree") { + val prover = createPersistentProver() + val current = 11 + val depth = 6 + blockchainWorkflowTest(prover) + + val chunkBuffer: ArrayBuffer[(ModifierId,Array[Array[Byte]])] = ArrayBuffer.empty[(ModifierId,Array[Array[Byte]])] + + prover.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, depth) { subtree => + chunkBuffer += (( + bytesToId(subtree.id), + subtree.leafValues.toArray + )) + } + + chunkBuffer.size shouldBe math.pow(2, depth) - current + } + } diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index c483237aa6..e73c43c3e3 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -10,7 +10,7 @@ import org.ergoplatform.local._ import org.ergoplatform.mining.ErgoMiner import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} -import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec +import org.ergoplatform.nodeView.history.{ErgoSyncInfoMessageSpec, UtxoSetScanner} import org.ergoplatform.nodeView.history.extra.ExtraIndexer import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, ErgoSettingsReader, NetworkType, ScorexSettings} @@ -115,6 +115,8 @@ class ErgoApp(args: Args) extends ScorexLogging { None } + UtxoSetScanner(nodeViewHolderRef) + private val syncTracker = ErgoSyncTracker(scorexSettings.network) private val deliveryTracker: DeliveryTracker = DeliveryTracker.empty(ergoSettings) diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 8f81b7f201..839003d23e 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -26,9 +26,13 @@ import spire.syntax.all.cfor import java.io.File import org.ergoplatform.modifiers.history.extension.Extension +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan import scala.annotation.tailrec import scala.collection.mutable +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{Duration, SECONDS} import scala.util.{Failure, Success, Try} /** @@ -449,8 +453,22 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti } } - //todo: update state in async way? + /** + * Whether to send blocks to wallet to scan. (Utxo scan keeps wallet height at 0) + * @return true if utxoBootstrap is not enabled; if it is enabled, then walletHeight > 0 + */ + private def shouldScanBlocks: Boolean = + if(settings.nodeSettings.utxoSettings.utxoBootstrap) { + try { + Await.result(vault().getWalletStatus.map(_.height), Duration(3, SECONDS)) > 0 + }catch { + case _: Throwable => false + } + }else { + true + } + //todo: update state in async way? /** * Remote and local persistent modifiers need to be appended to history, applied to state * which also needs to be git propagated to mempool and wallet @@ -507,7 +525,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti v } - if (almostSynced) { + if (almostSynced && shouldScanBlocks) { blocksApplied.foreach(newVault.scanPersistent) } @@ -518,6 +536,9 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti case utxoStateReader: UtxoStateReader if headersHeight == fullBlockHeight => val recheckCommand = RecheckMempool(utxoStateReader, newMemPool) context.system.eventStream.publish(recheckCommand) + if(!shouldScanBlocks) { + context.system.eventStream.publish(StartUtxoSetScan(false)) + } case _ => } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala index c001dd8e64..e0025f82ad 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala @@ -8,6 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme import org.ergoplatform.modifiers.history._ import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader} import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection} +import org.ergoplatform.nodeView.history.UtxoSetScanner.InitializeUtxoSetScannerWithHistory import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex} import org.ergoplatform.nodeView.history.storage.HistoryStorage @@ -295,8 +296,17 @@ object ErgoHistory extends ScorexLogging { repairIfNeeded(history) log.info("History database read") - if(ergoSettings.nodeSettings.extraIndex) // start extra indexer, if enabled + + // start extra indexer, if enabled + if(ergoSettings.nodeSettings.extraIndex) { context.system.eventStream.publish(StartExtraIndexer(history)) + } + + // set history for utxo set scanner, if bootstrapping by snapshot + if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) { + context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history)) + } + history } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala new file mode 100644 index 0000000000..e3d476c337 --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala @@ -0,0 +1,171 @@ +package org.ergoplatform.nodeView.history + +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.pattern.ask +import akka.util.Timeout +import org.ergoplatform.ErgoBox +import org.ergoplatform.modifiers.BlockSection +import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.GetDataFromCurrentView +import org.ergoplatform.nodeView.history.UtxoSetScanner._ +import org.ergoplatform.nodeView.history.storage.HistoryStorage +import org.ergoplatform.nodeView.state.UtxoState +import org.ergoplatform.nodeView.wallet.ErgoWallet +import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot +import org.ergoplatform.serialization.ManifestSerializer +import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth +import org.ergoplatform.wallet.boxes.ErgoBoxSerializer +import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage +import scorex.crypto.hash.Blake2b256 +import scorex.db.ByteArrayWrapper +import scorex.util.{ModifierId, ScorexLogging, bytesToId} + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration + +/** + * This class is used to provide the current UTXO set for wallet scans when bootstrapping + * by UTXO set snapshot. This is done by creating a snapshot of the UTXO set, deserializing + * the raw bytes to ErgoBoxes and sending them to the wallet actor in chunks. + * @param nodeView - NodeView actor to get wallet and UTXOs from + */ +class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging { + + private var history: ErgoHistory = _ + private def historyStorage: HistoryStorage = history.historyStorage + + private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS) + private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS) + + /** + * Internal buffer that holds deserialized AVL subtrees until they are sent to wallet + */ + private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])] + + /** + * Reads the current progress of the scanner. + * @return (current segment, total segments) + */ + private def readProgress(): (Int, Int) = + historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer => + val current = buffer.getInt + val total = buffer.getInt + (current, total) + }.getOrElse((0, 0)) + + /** + * Writes progress to db. + * @param current - current retrieved segment + * @param total - total segment count + */ + private def writeProgress(current: Int, total: Int): Unit = { + val buffer: ByteBuffer = ByteBuffer.allocate(8) + buffer.putInt(current) + buffer.putInt(total) + historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection]) + } + + /** + * Send deserialized AVL subtrees to wallet for scanning. + * @param wallet - wallet to send to + * @param current - current retrieved segment + */ + private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = { + wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal)) + writeProgress(current, MainnetTotal) + chunkBuffer.clear() + } + + private def run(): Unit = { + + var (current, total) = readProgress() + if(total == 0 || // scan should not start yet, still syncing + current == MainnetTotal) // scan already done + return + + val (state, wallet) = Await.result( + (nodeView ? GetDataFromCurrentView[UtxoState, (UtxoState, ErgoWallet)](x => (x.state, x.vault))) + .mapTo[(UtxoState, ErgoWallet)], + duration + ) + + val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration) + if(!initialized) // wallet is not initialized + return + + log.info(s"Starting UTXO set snapshot scan for $total chunks") + + state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, MainnetManifestDepth) { subtree => + current += 1 + + chunkBuffer += (( + bytesToId(subtree.id), + subtree.leafValues.par.flatMap(ErgoBoxSerializer.parseBytesTry(_).toOption).toArray + )) + + if(chunkBuffer.size == 32) { + sendBufferToWallet(wallet, current) + } + } + + // flush remaining data, if any + if(chunkBuffer.nonEmpty) { + sendBufferToWallet(wallet, current) + } + + if(current == total) { + log.info(s"Successfully scanned $total Utxo set subtrees") + // send newest block to wallet, if blocks were applied since scan began it will go back to scan them + wallet.scanPersistent(history.bestFullBlockOpt.get) + }else { + log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total") + } + + } + + override def receive: Receive = { + case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) => + this.history = history + run() + case StartUtxoSetScan(rescan: Boolean) => + if(readProgress()._1 == 0 || // + rescan) // start over UTXO set scan + writeProgress(0, MainnetTotal) + run() + } + + override def preStart(): Unit = { + context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory]) + context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan]) + } + +} + +object UtxoSetScanner { + + /** + * Initialize UTXO set scanner with database and try continuing scan if possible + * @param history - database handle + */ + case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory) + + /** + * Start scanning UTXO set, or continue if the scan was interrupted, or start over if rescan = true + * @param rescan - whether to start over or just continue scan + */ + case class StartUtxoSetScan(rescan: Boolean) + + /** + * Number of subtrees to divide AVL tree to + */ + private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt + + private val utxoSetScanProgressKey: ByteArrayWrapper = + ByteArrayWrapper(Blake2b256.hash("scanned chunk")) + + def apply(nodeView: ActorRef)(implicit system: ActorSystem): ActorRef = + system.actorOf(Props.create(classOf[UtxoSetScanner], nodeView)) +} diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala index 54c0808eb0..4b95f5d147 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWallet.scala @@ -1,8 +1,9 @@ package org.ergoplatform.nodeView.wallet import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.ask import org.ergoplatform.modifiers.mempool.ErgoTransaction -import org.ergoplatform.modifiers.{ErgoFullBlock, BlockSection} +import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock} import org.ergoplatform.nodeView.history.ErgoHistoryReader import org.ergoplatform.nodeView.state.ErgoState import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages._ @@ -11,6 +12,9 @@ import org.ergoplatform.wallet.boxes.{ReemissionData, ReplaceCompactCollectBoxSe import org.ergoplatform.core.VersionTag import scorex.util.ScorexLogging +import java.util.concurrent.TimeUnit +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, parameters: Parameters) @@ -36,6 +40,13 @@ class ErgoWallet(historyReader: ErgoHistoryReader, settings: ErgoSettings, param override val walletActor: ActorRef = ErgoWalletActor(settings, parameters, new ErgoWalletServiceImpl(settings), boxSelector, historyReader) + private val duration: Duration = Duration.create(10, TimeUnit.SECONDS) + + def scanUtxoSnapshot(msg: ScanBoxesFromUtxoSnapshot): ErgoWallet = { + Await.result(walletActor ? msg, duration) + this + } + def scanOffchain(tx: ErgoTransaction): ErgoWallet = { walletActor ! ScanOffChain(tx) this diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index c65599f212..fe356e10f4 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -17,6 +17,7 @@ import org.ergoplatform.wallet.interface4j.SecretString import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages._ import org.ergoplatform._ import org.ergoplatform.core.VersionTag +import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScan import org.ergoplatform.utils.ScorexEncoding import scorex.util.ScorexLogging import scala.concurrent.duration._ @@ -230,9 +231,24 @@ class ErgoWalletActor(settings: ErgoSettings, ) context.become(loadedWallet(newState)) + case ScanBoxesFromUtxoSnapshot(subtrees, current, total) => + val newState = subtrees.zipWithIndex.foldLeft(state) { case (accState, ((id, boxes), i)) => + val chunk = current - subtrees.size + i + 1 + ergoWalletService.scanSnapshotChunk(accState, boxes, id, settings.walletSettings.dustLimit) match { + case Failure(ex) => + val errorMsg = s"Failed to scan ${boxes.length} boxes in chunk $chunk / $total: ${ex.getMessage}" + accState.copy(error = Some(errorMsg)) + case Success(updatedState) => + log.info(s"Successfully scanned ${boxes.length} boxes in chunk $chunk / $total") + updatedState + } + } + context.become(loadedWallet(newState)) + sender() ! "ok" + // rescan=true means we serve a user request for rescan from arbitrary height case ScanInThePast(blockHeight, rescan) => - val nextBlockHeight = state.expectedNextBlockHeight(blockHeight, settings.nodeSettings.isFullBlocksPruned) + val nextBlockHeight = state.expectedNextBlockHeight(historyReader.readMinimalFullBlockHeight(), settings.nodeSettings.isFullBlocksPruned) if (nextBlockHeight == blockHeight || rescan) { val newState = historyReader.bestFullBlockAt(blockHeight) match { @@ -262,7 +278,7 @@ class ErgoWalletActor(settings: ErgoSettings, //scan block transactions case ScanOnChain(newBlock) => if (state.secretIsSet(settings.walletSettings.testMnemonic)) { // scan blocks only if wallet is initialized - val nextBlockHeight = state.expectedNextBlockHeight(newBlock.height, settings.nodeSettings.isFullBlocksPruned) + val nextBlockHeight = state.expectedNextBlockHeight(historyReader.readMinimalFullBlockHeight(), settings.nodeSettings.isFullBlocksPruned) if (nextBlockHeight == newBlock.height) { log.info(s"Wallet is going to scan a block ${newBlock.id} on chain at height ${newBlock.height}") val newState = @@ -277,7 +293,7 @@ class ErgoWalletActor(settings: ErgoSettings, context.become(loadedWallet(newState)) } else if (nextBlockHeight < newBlock.height) { log.warn(s"Wallet: skipped blocks found starting from $nextBlockHeight, going back to scan them") - self ! ScanInThePast(nextBlockHeight, false) + self ! ScanInThePast(nextBlockHeight, rescan = false) } else { log.warn(s"Wallet: block in the past reported at ${newBlock.height}, blockId: ${newBlock.id}") } @@ -340,9 +356,13 @@ class ErgoWalletActor(settings: ErgoSettings, log.info(s"Rescanning the wallet from height: $fromHeight") ergoWalletService.recreateRegistry(state, settings) match { case Success(newState) => - context.become(loadedWallet(newState.copy(rescanInProgress = true))) - val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) - self ! ScanInThePast(heightToScanFrom, rescan = true) + if(settings.nodeSettings.utxoSettings.utxoBootstrap) { + context.system.eventStream.publish(StartUtxoSetScan(true)) + }else { + context.become(loadedWallet(newState.copy(rescanInProgress = true))) + val heightToScanFrom = Math.min(newState.fullHeight, fromHeight) + self ! ScanInThePast(heightToScanFrom, rescan = true) + } sender() ! Success(()) case f@Failure(t) => log.error("Error during rescan attempt: ", t) diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala index c1f12a4941..37685035e9 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActorMessages.scala @@ -17,6 +17,7 @@ import org.ergoplatform.core.VersionTag import scorex.util.ModifierId import sigmastate.Values.SigmaBoolean import sigmastate.crypto.DLogProtocol.{DLogProverInput, ProveDlog} +import scala.collection.mutable.ArrayBuffer import scala.util.Try /** @@ -36,6 +37,14 @@ object ErgoWalletActorMessages { // Publicly available signals for the wallet actor + /** + * Scan AVL subtrees containing UTXOs + * @param subtrees - AVL subtrees to scan + * @param current - current number of last subtree + * @param total - total number of subtrees + */ + final case class ScanBoxesFromUtxoSnapshot(subtrees: ArrayBuffer[(ModifierId,Array[ErgoBox])], current: Int, total: Int) + /** * Command to scan offchain transaction * diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala index ca8fa867d0..04c4648ade 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletReader.scala @@ -29,7 +29,7 @@ trait ErgoWalletReader extends NodeViewComponent { val walletActor: ActorRef - private implicit val timeout: Timeout = Timeout(60, TimeUnit.SECONDS) + protected implicit val timeout: Timeout = Timeout(60, TimeUnit.SECONDS) /** Returns the Future generated mnemonic phrase. * @param pass storage encription password diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala index b84402e288..51fdecc991 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletService.scala @@ -220,6 +220,16 @@ trait ErgoWalletService { */ def scanBlockUpdate(state: ErgoWalletState, block: ErgoFullBlock, dustLimit: Option[Long]): Try[ErgoWalletState] + /** + * Scan boxes extracted from Utxo set subtree + * @param state - current wallet state + * @param boxes - box array to scan + * @param subtreeId - id of Utxo set subtree (used instead of blockId as version id) + * @param dustLimit - boxes with value smaller than dustLimit are disregarded in wallet scan logic + * @return new wallet state + */ + def scanSnapshotChunk(state: ErgoWalletState, boxes: Array[ErgoBox], subtreeId: ModifierId, dustLimit: Option[Long]): Try[ErgoWalletState] + /** * Sign a transaction */ @@ -595,6 +605,22 @@ class ErgoWalletServiceImpl(override val ergoSettings: ErgoSettings) extends Erg state.copy(registry = reg, offChainRegistry = offReg, outputsFilter = Some(updatedOutputsFilter)) } + override def scanSnapshotChunk(state: ErgoWalletState, + boxes: Array[ErgoBox], + subtreeId: ModifierId, + dustLimit: Option[Long]): Try[ErgoWalletState] = + WalletScanLogic.scanSnapshotBoxes( + state.registry, + state.offChainRegistry, + state.walletVars, + boxes, + subtreeId, + state.outputsFilter, + dustLimit, + ergoSettings.walletSettings.walletProfile).map { case (reg, offReg, updatedOutputsFilter) => + state.copy(registry = reg, offChainRegistry = offReg, outputsFilter = Some(updatedOutputsFilter)) + } + override def updateUtxoState(state: ErgoWalletState): ErgoWalletState = { (state.mempoolReaderOpt, state.stateReaderOpt) match { case (Some(mr), Some(sr)) => diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala index 03fe2cc98d..87ab34f681 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletState.scala @@ -101,7 +101,7 @@ case class ErgoWalletState( } // expected height of a next block when the wallet is receiving a new block with the height blockHeight - def expectedNextBlockHeight(blockHeight: Height, isFullBlocksPruned: Boolean): Height = { + def expectedNextBlockHeight(minimalFullBlockHeight: Height, isFullBlocksPruned: Boolean): Height = { val walletHeight = getWalletHeight if (!isFullBlocksPruned) { // Node has all the full blocks and applies them sequentially @@ -109,7 +109,7 @@ case class ErgoWalletState( } else { // Node has pruned blockchain if (walletHeight == 0) { - blockHeight // todo: should be height of first non-pruned block + minimalFullBlockHeight // height of first non-pruned block } else { walletHeight + 1 } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala index 68eb204d9c..1c609128bc 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala @@ -1,6 +1,7 @@ package org.ergoplatform.nodeView.wallet import com.google.common.hash.BloomFilter +import org.ergoplatform.ErgoBox import org.ergoplatform.modifiers.ErgoFullBlock import org.ergoplatform.modifiers.mempool.ErgoTransaction import org.ergoplatform.nodeView.wallet.IdUtils.{EncodedBoxId, encodedBoxId} @@ -165,7 +166,51 @@ object WalletScanLogic extends ScorexLogging { } // function effects: updating registry and offchainRegistry datasets - registry.updateOnBlock(scanRes, blockId, height) + updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, blockId, height) + } + + def scanSnapshotBoxes(registry: WalletRegistry, + offChainRegistry: OffChainRegistry, + walletVars: WalletVars, + boxes: Array[ErgoBox], + subtreeId: ModifierId, + cachedOutputsFilter: Option[BloomFilter[Array[Byte]]], + dustLimit: Option[Long], + walletProfile: WalletProfile): Try[(WalletRegistry, OffChainRegistry, BloomFilter[Array[Byte]])] = { + + // Take unspent wallet outputs Bloom Filter from cache + // or recreate it from unspent outputs stored in the database + val outputsFilter = cachedOutputsFilter.getOrElse { + val bf = WalletCache.emptyFilter(walletProfile.outputsFilterSize) + registry.allUnspentBoxes().foreach(tb => bf.put(tb.box.id)) + bf + } + + // extract wallet- (and external scans) related outputs + val myOutputs = boxes.flatMap { box => + filterWalletOutput(box, Some(box.creationHeight), walletVars, dustLimit) + } + + // add extracted outputs to the filter + myOutputs.foreach { out => + outputsFilter.put(out.box.id) + } + + val scanRes = ScanResults(myOutputs, Seq.empty, Seq.empty) + + /** Pass subtreeId as blockId; set height to 0 so when UTXO set scan is finished normal wallet scan + * will start with the first non-pruned block (see [[ErgoWalletState.expectedNextBlockHeight]]) + */ + updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, subtreeId, 0) + } + + def updateRegistryAndOffchain(registry: WalletRegistry, + offChainRegistry: OffChainRegistry, + outputsFilter: BloomFilter[Array[Byte]], + scanRes: ScanResults, + versionId: ModifierId, + height: Int): Try[(WalletRegistry, OffChainRegistry, BloomFilter[Array[Byte]])] = + registry.updateOnBlock(scanRes, versionId, height) .map { _ => //data needed to update the offchain-registry val walletUnspent = registry.walletUnspentBoxes() @@ -174,8 +219,6 @@ object WalletScanLogic extends ScorexLogging { (registry, updatedOffchainRegistry, outputsFilter) } - } - /** * Extracts all outputs which contain tracked bytes from the given transaction. @@ -183,79 +226,82 @@ object WalletScanLogic extends ScorexLogging { def extractWalletOutputs(tx: ErgoTransaction, inclusionHeight: Option[Int], walletVars: WalletVars, - dustLimit: Option[Long]): Seq[TrackedBox] = { + dustLimit: Option[Long]): Seq[TrackedBox] = + tx.outputs.flatMap(filterWalletOutput(_, inclusionHeight, walletVars, dustLimit)) + + def filterWalletOutput(box: ErgoBox, + inclusionHeight: Option[Int], + walletVars: WalletVars, + dustLimit: Option[Long]): Option[TrackedBox] = { val trackedBytes: Seq[Array[Byte]] = walletVars.trackedBytes val miningScriptsBytes: Seq[Array[Byte]] = walletVars.miningScriptsBytes val externalScans: Seq[Scan] = walletVars.externalScans - tx.outputs.flatMap { bx => - - // First, we check apps triggered by the tx output - val appsTriggered = - externalScans - .filter(_.trackingRule.filter(bx)) - .map(app => app.scanId -> app.walletInteraction) + // First, we check apps triggered by the tx output + val appsTriggered = + externalScans + .filter(_.trackingRule.filter(box)) + .map(app => app.scanId -> app.walletInteraction) - val boxScript = bx.propositionBytes + val boxScript = box.propositionBytes - // then check whether Bloom filter built on top of payment & mining scripts of the p2pk-wallet - val statuses: Set[ScanId] = if (walletVars.scriptsFilter.mightContain(boxScript)) { + // then check whether Bloom filter built on top of payment & mining scripts of the p2pk-wallet + val statuses: Set[ScanId] = if (walletVars.scriptsFilter.mightContain(boxScript)) { - // first, we are checking mining script - val miningIncomeTriggered = miningScriptsBytes.exists(ms => boxScript.sameElements(ms)) + // first, we are checking mining script + val miningIncomeTriggered = miningScriptsBytes.exists(ms => boxScript.sameElements(ms)) - val prePaymentStatuses = if (miningIncomeTriggered) { - val miningStatus: (ScanId, ScanWalletInteraction.Value) = if (walletVars.settings.miningRewardDelay > 0) { - MiningScanId -> ScanWalletInteraction.Off // scripts are different, so off is kinda overkill - } else { - //tweak for tests - PaymentsScanId -> ScanWalletInteraction.Off - } - appsTriggered :+ miningStatus + val prePaymentStatuses = if (miningIncomeTriggered) { + val miningStatus: (ScanId, ScanWalletInteraction.Value) = if (walletVars.settings.miningRewardDelay > 0) { + MiningScanId -> ScanWalletInteraction.Off // scripts are different, so off is kinda overkill } else { - appsTriggered + //tweak for tests + PaymentsScanId -> ScanWalletInteraction.Off } + appsTriggered :+ miningStatus + } else { + appsTriggered + } - if (prePaymentStatuses.nonEmpty && - !prePaymentStatuses.exists(t => ScanWalletInteraction.interactingWithWallet(t._2))) { - // if other scans intercept the box, and the scans are not sharing the box, - // then the box is not being tracked by the p2pk-wallet - prePaymentStatuses.map(_._1).toSet - } else { - //check whether payment is triggered (Bloom filter has false positives) - val paymentsTriggered = trackedBytes.exists(bs => boxScript.sameElements(bs)) - - val otherIds = prePaymentStatuses.map(_._1).toSet - if (paymentsTriggered) { - Set(PaymentsScanId) ++ otherIds - } else { - otherIds - } - } + if (prePaymentStatuses.nonEmpty && + !prePaymentStatuses.exists(t => ScanWalletInteraction.interactingWithWallet(t._2))) { + // if other scans intercept the box, and the scans are not sharing the box, + // then the box is not being tracked by the p2pk-wallet + prePaymentStatuses.map(_._1).toSet } else { - val appScans = appsTriggered.map(_._1).toSet + //check whether payment is triggered (Bloom filter has false positives) + val paymentsTriggered = trackedBytes.exists(bs => boxScript.sameElements(bs)) - // Add p2pk-wallet if there's a scan enforcing that - if (appsTriggered.exists(_._2 == ScanWalletInteraction.Forced)) { - appScans ++ Set(PaymentsScanId) + val otherIds = prePaymentStatuses.map(_._1).toSet + if (paymentsTriggered) { + Set(PaymentsScanId) ++ otherIds } else { - appScans + otherIds } } + } else { + val appScans = appsTriggered.map(_._1).toSet - if (statuses.nonEmpty) { - if (dustLimit.exists(bx.value <= _)){ - // filter out boxes with value that is considered dust - None - } else { - val tb = TrackedBox(tx.id, bx.index, inclusionHeight, None, None, bx, statuses) - log.debug("New tracked box: " + tb.boxId, " scans: " + tb.scans) - Some(tb) - } + // Add p2pk-wallet if there's a scan enforcing that + if (appsTriggered.exists(_._2 == ScanWalletInteraction.Forced)) { + appScans ++ Set(PaymentsScanId) } else { + appScans + } + } + + if (statuses.nonEmpty) { + if (dustLimit.exists(box.value <= _)) { + // filter out boxes with value that is considered dust None + } else { + val tb = TrackedBox(box.transactionId, box.index, inclusionHeight, None, None, box, statuses) + log.debug("New tracked box: " + tb.boxId, " scans: " + tb.scans) + Some(tb) } + } else { + None } } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala index f3e7deba48..4039c6ea37 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/persistence/WalletRegistry.scala @@ -269,7 +269,8 @@ class WalletRegistry(private val store: LDBVersionedStore)(ws: WalletSettings) e // and update wallet digest updateDigest(bag3) { case WalletDigest(height, wBalance, wTokensSeq) => - if (height + 1 != blockHeight) { + val isUtxoSnapshotScan = height == 0 && blockHeight == 0 + if (height + 1 != blockHeight && !isUtxoSnapshotScan) { log.error(s"Blocks were skipped during wallet scanning, from $height until $blockHeight") } val spentWalletBoxes = spentBoxesWithTx.map(_._2).filter(_.scans.contains(PaymentsScanId))