diff --git a/README.md b/README.md index 434aefabdc..5f462ca12c 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ To run specific Ergo version `` as a service with custom config `/path/ -e MAX_HEAP=3G \ ergoplatform/ergo: -- -c /etc/myergo.conf -Available versions can be found on [Ergo Docker image page](https://hub.docker.com/r/ergoplatform/ergo/tags), for example, `v4.0.41`. +Available versions can be found on [Ergo Docker image page](https://hub.docker.com/r/ergoplatform/ergo/tags), for example, `v4.0.42`. This will connect to the Ergo mainnet or testnet following your configuration passed in `myergo.conf` and network flag `--`. Every default config value would be overwritten with corresponding value in `myergo.conf`. `MAX_HEAP` variable can be used to control how much memory can the node consume. diff --git a/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolBenchmark.scala b/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolBenchmark.scala index c010b00e51..31cb65386e 100644 --- a/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolBenchmark.scala +++ b/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolBenchmark.scala @@ -43,7 +43,7 @@ object ErgoMemPoolBenchmark private def bench(txsInIncomeOrder: Seq[ErgoTransaction]): Unit = { var pool = ErgoMemPool.empty(settings) - txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx)).get) + txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx, None)).get) } performance of "ErgoMemPool awaiting" in { diff --git a/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/MempoolPerformanceBench.scala b/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/MempoolPerformanceBench.scala index c428ad00b4..c15db166f0 100644 --- a/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/MempoolPerformanceBench.scala +++ b/benchmarks/src/test/scala/org/ergoplatform/nodeView/mempool/MempoolPerformanceBench.scala @@ -14,5 +14,6 @@ class MempoolPerformanceBench extends AnyPropSpec override val memPool: ErgoMemPool = ErgoMemPool.empty(settings) override val memPoolGenerator: Gen[ErgoMemPool] = emptyMemPoolGen override val transactionGenerator: Gen[ErgoTransaction] = invalidErgoTransactionGen - override val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] = invalidErgoTransactionGen.map(UnconfirmedTransaction.apply) + override val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] = + invalidErgoTransactionGen.map(tx => UnconfirmedTransaction(tx, None)) } diff --git a/src/main/resources/api/openapi.yaml b/src/main/resources/api/openapi.yaml index bf849f6802..c8403d3116 100644 --- a/src/main/resources/api/openapi.yaml +++ b/src/main/resources/api/openapi.yaml @@ -1,7 +1,7 @@ openapi: "3.0.2" info: - version: "4.0.41" + version: "4.0.42" title: Ergo Node API description: API docs for Ergo Node. Models are shared between all Ergo products contact: diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 3483105c30..d4898ecf16 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -400,7 +400,7 @@ scorex { nodeName = "ergo-node" # Network protocol version to be sent in handshakes - appVersion = 4.0.41 + appVersion = 4.0.42 # Network agent name. May contain information about client code # stack, starting from core code-base up to the end graphical interface. diff --git a/src/main/resources/mainnet.conf b/src/main/resources/mainnet.conf index bc13991a3c..743454a1c7 100644 --- a/src/main/resources/mainnet.conf +++ b/src/main/resources/mainnet.conf @@ -49,15 +49,15 @@ ergo { # The node still applying transactions to UTXO set and so checks UTXO set digests for each block. # Block at checkpoint height is to be checked against expected one. checkpoint = { - height = 823877 - blockId = "cb5242533e727bf73989ef3b286d031abf13b72b3145a6e06fa728eb0f7cb658" + height = 829020 + blockId = "5c311a0c62cc541c9a028b7e66bf3685287edf6d7699b46eb982dd5af008bb55" } # List with hex-encoded identifiers of transactions banned from getting into memory pool blacklistedTransactions = [] # maximum cost of transaction for it to be propagated - maxTransactionCost = 5000000 + maxTransactionCost = 4900000 } } @@ -65,7 +65,7 @@ scorex { network { magicBytes = [1, 0, 2, 4] bindAddress = "0.0.0.0:9030" - nodeName = "ergo-mainnet-4.0.41" + nodeName = "ergo-mainnet-4.0.42" nodeName = ${?NODENAME} knownPeers = [ "213.239.193.208:9030", diff --git a/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala b/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala index 4f0a6a4637..b93d64c21e 100644 --- a/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala +++ b/src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala @@ -8,7 +8,7 @@ import org.ergoplatform.{ErgoBox, ErgoLikeContext, ErgoLikeTransaction, JsonCode import org.ergoplatform.http.api.ApiEncoderOption.Detalization import org.ergoplatform.ErgoBox.RegisterId import org.ergoplatform.mining.{groupElemFromBytes, groupElemToBytes} -import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction, UnsignedErgoTransaction} +import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnsignedErgoTransaction} import org.ergoplatform.nodeView.history.ErgoHistory.Difficulty import org.ergoplatform.settings.ErgoAlgos import org.ergoplatform.nodeView.wallet.persistence.WalletDigest @@ -195,26 +195,6 @@ trait ApiCodecs extends JsonCodecs { } yield ErgoTransaction(ergoLikeTx) } - // We are not using this encoder for now, but may use in future - implicit val unconfirmedTxEncoder: Encoder[UnconfirmedTransaction] = { unconfirmedTx => - Json.obj( - "transaction" -> transactionEncoder(unconfirmedTx.transaction), - "lastCost" -> unconfirmedTx.lastCost.asJson, - "createdTime" -> unconfirmedTx.createdTime.asJson, - "lastCheckedTime" -> unconfirmedTx.lastCheckedTime.asJson - ) - } - - // We are not using this decoder for now, but may use in future - implicit val unconfirmedTxDecoder: Decoder[UnconfirmedTransaction] = { cursor => - for { - tx <- transactionDecoder(cursor) - lastCost <- cursor.downField("lastCost").as[Option[Int]] - createdTime <- cursor.downField("createdTime").as[Long] - lastCheckedTime <- cursor.downField("lastCheckedTime").as[Long] - } yield UnconfirmedTransaction(tx, lastCost, createdTime, lastCheckedTime, Some(tx.bytes)) - } - implicit val sigmaBooleanEncoder: Encoder[SigmaBoolean] = { diff --git a/src/main/scala/org/ergoplatform/http/api/ErgoBaseApiRoute.scala b/src/main/scala/org/ergoplatform/http/api/ErgoBaseApiRoute.scala index d88371ea20..97949231c4 100644 --- a/src/main/scala/org/ergoplatform/http/api/ErgoBaseApiRoute.scala +++ b/src/main/scala/org/ergoplatform/http/api/ErgoBaseApiRoute.scala @@ -49,10 +49,10 @@ trait ErgoBaseApiRoute extends ApiRoute with ApiCodecs { (nodeViewActorRef ? LocallyGeneratedTransaction(unconfirmedTx)) .mapTo[ProcessingOutcome] .flatMap { - case Accepted => Future.successful(unconfirmedTx.transaction.id) - case DoubleSpendingLoser(_) => Future.failed(new IllegalArgumentException("Double spending attempt")) - case Declined(ex) => Future.failed(ex) - case Invalidated(ex) => Future.failed(ex) + case _: Accepted => Future.successful(unconfirmedTx.transaction.id) + case _: DoubleSpendingLoser => Future.failed(new IllegalArgumentException("Double spending attempt")) + case d: Declined => Future.failed(d.e) + case i: Invalidated => Future.failed(i.e) } completeOrRecoverWith(resultFuture) { ex => ApiError.BadRequest(ex.getMessage) @@ -78,9 +78,9 @@ trait ErgoBaseApiRoute extends ApiRoute with ApiCodecs { val maxTxCost = ergoSettings.nodeSettings.maxTransactionCost utxo.withMempool(mp) .validateWithCost(tx, maxTxCost) - .map(cost => UnconfirmedTransaction(tx, Some(cost), now, now, bytes)) + .map(cost => UnconfirmedTransaction(tx, Some(cost), now, now, bytes, source = None)) case _ => - tx.statelessValidity().map(_ => UnconfirmedTransaction(tx, None, now, now, bytes)) + tx.statelessValidity().map(_ => UnconfirmedTransaction(tx, None, now, now, bytes, source = None)) } } diff --git a/src/main/scala/org/ergoplatform/http/api/WalletApiRoute.scala b/src/main/scala/org/ergoplatform/http/api/WalletApiRoute.scala index 5c586053b4..256034fb42 100644 --- a/src/main/scala/org/ergoplatform/http/api/WalletApiRoute.scala +++ b/src/main/scala/org/ergoplatform/http/api/WalletApiRoute.scala @@ -173,7 +173,7 @@ case class WalletApiRoute(readersHolder: ActorRef, requests, inputsRaw, dataInputsRaw, - tx => Future(Success(UnconfirmedTransaction(tx))), + tx => Future(Success(UnconfirmedTransaction(tx, source = None))), utx => ApiResponse(utx.transaction) ) } diff --git a/src/main/scala/org/ergoplatform/local/CleanupWorker.scala b/src/main/scala/org/ergoplatform/local/CleanupWorker.scala index 5692ee7b99..0110490eaa 100644 --- a/src/main/scala/org/ergoplatform/local/CleanupWorker.scala +++ b/src/main/scala/org/ergoplatform/local/CleanupWorker.scala @@ -12,6 +12,7 @@ import scorex.core.transaction.state.TransactionValidation import scorex.util.{ModifierId, ScorexLogging} import scala.annotation.tailrec +import scala.collection.mutable import scala.util.{Failure, Success} /** @@ -82,26 +83,28 @@ class CleanupWorker(nodeViewHolderRef: ActorRef, //internal loop function validating transactions, returns validated and invalidated transaction ids @tailrec def validationLoop(txs: Seq[UnconfirmedTransaction], - validated: Seq[UnconfirmedTransaction], - invalidated: Seq[ModifierId], - costAcc: Long): (Seq[UnconfirmedTransaction], Seq[ModifierId]) = { + validated: mutable.ArrayBuilder[UnconfirmedTransaction], + invalidated: mutable.ArrayBuilder[ModifierId], + costAcc: Long + ): (mutable.ArrayBuilder[UnconfirmedTransaction], mutable.ArrayBuilder[ModifierId]) = { txs match { case head :: tail if costAcc < CostLimit => state.validateWithCost(head.transaction, nodeSettings.maxTransactionCost) match { case Success(txCost) => - val updTx = head.onRecheck(txCost) - validationLoop(tail, validated :+ updTx, invalidated, txCost + costAcc) + val updTx = head.withCost(txCost) + validationLoop(tail, validated += updTx, invalidated, txCost + costAcc) case Failure(e) => val txId = head.id log.info(s"Transaction $txId invalidated: ${e.getMessage}") - validationLoop(tail, validated, invalidated :+ txId, head.lastCost.getOrElse(0) + costAcc) //add old cost + validationLoop(tail, validated, invalidated += txId, head.lastCost.getOrElse(0) + costAcc) //add old cost } case _ => validated -> invalidated } } - validationLoop(txsToValidate, Seq.empty, Seq.empty, 0L) + val res = validationLoop(txsToValidate, mutable.ArrayBuilder.make(), mutable.ArrayBuilder.make(), 0L) + wrapRefArray(res._1.result()) -> wrapRefArray(res._2.result()) } } diff --git a/src/main/scala/org/ergoplatform/modifiers/mempool/UnconfirmedTransaction.scala b/src/main/scala/org/ergoplatform/modifiers/mempool/UnconfirmedTransaction.scala index 5a72b221f1..e6bace01e4 100644 --- a/src/main/scala/org/ergoplatform/modifiers/mempool/UnconfirmedTransaction.scala +++ b/src/main/scala/org/ergoplatform/modifiers/mempool/UnconfirmedTransaction.scala @@ -1,13 +1,24 @@ package org.ergoplatform.modifiers.mempool +import scorex.core.network.ConnectedPeer import scorex.util.{ModifierId, ScorexLogging} - +/** + * Wrapper for unconfirmed transaction and corresponding data + * + * @param transaction - unconfirmed transaction + * @param lastCost - validation cost during last check + * @param createdTime - when transaction entered the pool + * @param lastCheckedTime - when last validity check was done + * @param transactionBytes - transaction bytes, to avoid serializations when we send it over the wire + * @param source - peer which delivered the transaction (None if transaction submitted via API) + */ case class UnconfirmedTransaction(transaction: ErgoTransaction, lastCost: Option[Int], createdTime: Long, lastCheckedTime: Long, - transactionBytes: Option[Array[Byte]]) + transactionBytes: Option[Array[Byte]], + source: Option[ConnectedPeer]) extends ScorexLogging { def id: ModifierId = transaction.id @@ -15,7 +26,7 @@ case class UnconfirmedTransaction(transaction: ErgoTransaction, /** * Updates cost and last checked time of unconfirmed transaction */ - def onRecheck(cost: Int): UnconfirmedTransaction = { + def withCost(cost: Int): UnconfirmedTransaction = { copy(lastCost = Some(cost), lastCheckedTime = System.currentTimeMillis()) } @@ -23,14 +34,14 @@ case class UnconfirmedTransaction(transaction: ErgoTransaction, object UnconfirmedTransaction { - def apply(tx: ErgoTransaction): UnconfirmedTransaction = { + def apply(tx: ErgoTransaction, source: Option[ConnectedPeer]): UnconfirmedTransaction = { val now = System.currentTimeMillis() - UnconfirmedTransaction(tx, None, now, now, Some(tx.bytes)) + UnconfirmedTransaction(tx, None, now, now, Some(tx.bytes), source) } - def apply(tx: ErgoTransaction, txBytes: Array[Byte]): UnconfirmedTransaction = { + def apply(tx: ErgoTransaction, txBytes: Array[Byte], source: Option[ConnectedPeer]): UnconfirmedTransaction = { val now = System.currentTimeMillis() - UnconfirmedTransaction(tx, None, now, now, Some(txBytes)) + UnconfirmedTransaction(tx, None, now, now, Some(txBytes), source) } } diff --git a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala index b2484d292e..5eb81d490c 100644 --- a/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala +++ b/src/main/scala/org/ergoplatform/network/ErgoNodeViewSynchronizer.scala @@ -3,7 +3,7 @@ package org.ergoplatform.network import akka.actor.SupervisorStrategy.{Restart, Stop} import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props} import org.ergoplatform.modifiers.history.header.Header -import org.ergoplatform.modifiers.mempool.{ErgoTransaction, ErgoTransactionSerializer, UnconfirmedTransaction} +import org.ergoplatform.modifiers.mempool.{ErgoTransactionSerializer, UnconfirmedTransaction} import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock} import org.ergoplatform.nodeView.history.{ErgoSyncInfoV1, ErgoSyncInfoV2} import org.ergoplatform.nodeView.history._ @@ -12,7 +12,7 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.BlockAppliedTransactions import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoSyncInfo, ErgoSyncInfoMessageSpec} import org.ergoplatform.nodeView.mempool.{ErgoMemPool, ErgoMemPoolReader} import org.ergoplatform.settings.{Constants, ErgoSettings} -import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{ChainIsHealthy, ChainIsStuck, GetNodeViewChanges, IsChainHealthy, ModifiersFromRemote, TransactionsFromRemote} +import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages._ import org.ergoplatform.nodeView.ErgoNodeViewHolder._ import scorex.core.consensus.{Equal, Fork, Nonsense, Older, Unknown, Younger} import scorex.core.network.ModifiersStatus.Requested @@ -33,6 +33,7 @@ import scorex.util.{ModifierId, ScorexLogging} import scorex.core.network.DeliveryTracker import scorex.core.network.peer.PenaltyType import scorex.core.transaction.state.TransactionValidation.TooHighCostError +import ErgoNodeViewSynchronizer.{IncomingTxInfo, TransactionProcessingCacheRecord} import scala.annotation.tailrec import scala.collection.mutable @@ -41,7 +42,8 @@ import scala.concurrent.duration._ import scala.util.{Failure, Random, Success} /** - * Tweaks on top of Scorex' NodeViewSynchronizer made to optimize Ergo network + * Contains most top-level logic for p2p networking, communicates with lower-level p2p code and other parts of the + * client application */ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, viewHolderRef: ActorRef, @@ -90,6 +92,28 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, // when we got last modifier, both unconfirmed transactions and block sections count private var lastModifierGotTime: Long = 0 + /** + * The node stops to accept transactions if declined table reaches this max size. It prevents spam attacks trying + * to bloat the table (or exhaust node's CPU) + */ + private val MaxDeclined = 400 + + /** + * No more than this number of unparsed transactions can be cached + */ + private val MaxProcessingTransactionsCacheSize = 50 + + /** + * Max cost of transactions we are going to process between blocks + */ + private val MempoolCostPerBlock = 12000000 + + /** + * Currently max transaction cost is higher but will be eventually cut down to this value + */ + private val OptimisticMaxTransactionCost = 2000000 + + /** * Dictionary (tx id -> checking time), which is storing transactions declined by the mempool, as mempool is not * storing this information. We keep declined transactions in the dictionary for few blocks just, as declined @@ -98,10 +122,60 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, private val declined = mutable.TreeMap[ModifierId, Long]() /** - * The node stops to accept transactions if declined table reaches this max size. It prevents spam attacks trying - * to bloat the table (or exhaust node's CPU) + * Counter which contains total cost of transactions entered mempool or rejected by it since last block processed. + * Used to avoid sudden spikes in load, limiting transactions processing time and make it comparable to block's + * processing time */ - private val MaxDeclined = 400 + private var interblockCost = IncomingTxInfo.empty() + + /** + * Cache which contains bytes of transactions we received but not parsed and processed yet + */ + private val txProcessingCache = mutable.Map[ModifierId, TransactionProcessingCacheRecord]() + + /** + * To be called when the node is synced and new block arrives, to reset transactions cost counter + */ + private def clearInterblockCost(): Unit = { + interblockCost = IncomingTxInfo.empty() + } + + /** + * To be called when the node is synced and new block arrives, to resume transaction bytes cache processing + */ + private def processFirstTxProcessingCacheRecord(): Unit = { + txProcessingCache.headOption.foreach { case (txId, processingCacheRecord) => + parseAndProcessTransaction(txId, processingCacheRecord.txBytes, processingCacheRecord.source) + txProcessingCache -= txId + } + } + + /** + * To be called when mempool reporting on finished transaction validation. + * This method adds validation cost to counter and send another + */ + private def processMempoolResult(processingResult: InitialTransactionCheckOutcome): Unit = { + val ReserveCostValue = 5000 + + val costOpt = processingResult.transaction.lastCost + if (costOpt.isEmpty) { + // should not be here, and so ReserveCostValue should not be used + log.warn("Cost is empty in processMempoolResult") + } + val cost = costOpt.getOrElse(ReserveCostValue) + val ng = processingResult match { + case _: FailedTransaction => interblockCost.copy(invalidatedCost = interblockCost.invalidatedCost + cost) + case _: SuccessfulTransaction => interblockCost.copy(acceptedCost = interblockCost.acceptedCost + cost) + case _: DeclinedTransaction => interblockCost.copy(declinedCost = interblockCost.declinedCost + cost) + } + + log.debug(s"Old global cost info: $interblockCost, new $ng, tx processing cache size: ${txProcessingCache.size}") + interblockCost = ng + + if (interblockCost.totalCost < MempoolCostPerBlock) { + processFirstTxProcessingCacheRecord() + } + } /** * Register periodic events @@ -540,9 +614,19 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, if (typeId == Transaction.ModifierTypeId) { // filter out transactions already in the mempool val notInThePool = requestedModifiers.filterKeys(id => !mp.contains(id)) - // parse all transactions not in the mempool and send them to node view holder - val parsed: Iterable[UnconfirmedTransaction] = parseTransactions(notInThePool, remote) - viewHolderRef ! TransactionsFromRemote(parsed) + val (toProcess, toPutIntoCache) = if (interblockCost.totalCost < MempoolCostPerBlock) { + // if we are within per-block limits, parse and process first transaction + (notInThePool.headOption, notInThePool.tail) + } else { + (None, notInThePool) + } + + toProcess.foreach { case (txId, txBytes) => + parseAndProcessTransaction(txId, txBytes, remote) + } + toPutIntoCache.foreach { case (txId, txBytes) => + txProcessingCache.put(txId, new TransactionProcessingCacheRecord(txBytes, remote)) + } } else { Constants.modifierSerializers.get(typeId) match { case Some(serializer: ScorexSerializer[BlockSection]@unchecked) => @@ -570,25 +654,24 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, } /** - * Parse transactions coming from remote, filtering out ones which are too big on the way + * Parse transaction coming from remote, filtering out immediately too big one, and send parsed transaction + * to mempool for processing */ - def parseTransactions(txs: Map[ModifierId, Array[Byte]], remote: ConnectedPeer): Iterable[UnconfirmedTransaction] = { - txs.flatMap{ case (id, bytes) => - if (bytes.length > settings.nodeSettings.maxTransactionSize) { - deliveryTracker.setInvalid(id, Transaction.ModifierTypeId) - penalizeMisbehavingPeer(remote) - log.warn(s"Transaction size ${bytes.length} from ${remote.toString} exceeds limit ${settings.nodeSettings.maxTransactionSize}") - None - } else { - ErgoTransactionSerializer.parseBytesTry(bytes) match { - case Success(tx) if id == tx.id => - Some(UnconfirmedTransaction(tx, bytes)) - case _ => - // Penalize peer and do nothing - it will be switched to correct state on CheckDelivery - penalizeMisbehavingPeer(remote) - log.warn(s"Failed to parse transaction with declared id ${encoder.encodeId(id)} from ${remote.toString}") - None - } + def parseAndProcessTransaction(id: ModifierId, bytes: Array[Byte], remote: ConnectedPeer): Unit = { + if (bytes.length > settings.nodeSettings.maxTransactionSize) { + deliveryTracker.setInvalid(id, Transaction.ModifierTypeId) + penalizeMisbehavingPeer(remote) + log.warn(s"Transaction size ${bytes.length} from ${remote.toString} " + + s"exceeds limit ${settings.nodeSettings.maxTransactionSize}") + } else { + ErgoTransactionSerializer.parseBytesTry(bytes) match { + case Success(tx) if id == tx.id => + val utx = UnconfirmedTransaction(tx, bytes, Some(remote)) + viewHolderRef ! TransactionFromRemote(utx) + case _ => + // Penalize peer and do nothing - it will be switched to correct state on CheckDelivery + penalizeMisbehavingPeer(remote) + log.warn(s"Failed to parse transaction with declared id ${encoder.encodeId(id)} from ${remote.toString}") } } } @@ -666,9 +749,10 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, // We download transactions only if following conditions met: def txAcceptanceFilter: Boolean = { settings.nodeSettings.stateType.holdsUtxoSet && // node holds UTXO set - hr.isHeadersChainSynced && // our chain is synced hr.headersHeight >= syncTracker.maxHeight().getOrElse(0) && // our best header is not worse than best around hr.fullBlockHeight == hr.headersHeight && // we have all the full blocks + interblockCost.totalCost <= MempoolCostPerBlock * 3 / 2 && // we can download some extra to fill cache + txProcessingCache.size <= MaxProcessingTransactionsCacheSize && // txs processing cache is not overfull declined.size < MaxDeclined // the node is not stormed by transactions is has to decline } @@ -686,7 +770,8 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, val notDeclined = notApplied.filter(id => !declined.contains(id)) log.info(s"Processing ${invData.ids.length} tx invs from $peer, " + s"${unknownMods.size} of them are unknown, requesting $notDeclined") - notDeclined + val txsToAsk = (MempoolCostPerBlock - interblockCost.totalCost) / OptimisticMaxTransactionCost + notDeclined.take(txsToAsk) } else { Seq.empty } @@ -868,7 +953,7 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, // helper method to clear declined transactions after some off, so the node may accept them again private def clearDeclined(): Unit = { - val clearTimeout = FiniteDuration(7, MINUTES) + val clearTimeout = FiniteDuration(10, MINUTES) val now = System.currentTimeMillis() val toRemove = declined.filter { case (_, time) => @@ -899,30 +984,39 @@ class ErgoNodeViewSynchronizer(networkControllerRef: ActorRef, broadcastInvForNewModifier(mod) if (mod.isInstanceOf[ErgoFullBlock]) { clearDeclined() + clearInterblockCost() + processFirstTxProcessingCacheRecord() // resume cache processing } - case SuccessfulTransaction(tx) => + case st@SuccessfulTransaction(utx) => + val tx = utx.transaction deliveryTracker.setHeld(tx.id, Transaction.ModifierTypeId) + processMempoolResult(st) broadcastModifierInv(tx) - case DeclinedTransaction(id: ModifierId) => - declined.put(id, System.currentTimeMillis()) - - case FailedTransaction(id, error, immediateFailure) => - if (immediateFailure) { - // penalize sender only in case transaction was invalidated at first validation. - deliveryTracker.setInvalid(id, Transaction.ModifierTypeId).foreach { peer => - error match { - case TooHighCostError(_) => - log.info(s"Penalize spamming peer $peer for too costly transaction $id") - penalizeSpammingPeer(peer) - case _ => - log.info(s"Penalize peer $peer for too costly transaction $id (reason: $error)") - penalizeMisbehavingPeer(peer) - } + case dt@DeclinedTransaction(utx: UnconfirmedTransaction) => + declined.put(utx.id, System.currentTimeMillis()) + processMempoolResult(dt) + + case ft@FailedTransaction(utx, error) => + val id = utx.id + processMempoolResult(ft) + + utx.source.foreach { peer => + // no need to call deliveryTracker.setInvalid, as mempool will consider invalidated tx in contains() + error match { + case TooHighCostError(_) => + log.info(s"Penalize spamming peer $peer for too costly transaction $id") + penalizeSpammingPeer(peer) + case _ => + log.info(s"Penalize peer $peer for too costly transaction $id (reason: $error)") + penalizeMisbehavingPeer(peer) } } + case FailedOnRecheckTransaction(_, _) => + // do nothing for now + case SyntacticallySuccessfulModifier(mod) => deliveryTracker.setHeld(mod.id, mod.modifierTypeId) @@ -1038,6 +1132,23 @@ object ErgoNodeViewSynchronizer { (implicit context: ActorRefFactory, ex: ExecutionContext): ActorRef = context.actorOf(props(networkControllerRef, viewHolderRef, syncInfoSpec, settings, timeProvider, syncTracker, deliveryTracker)) + /** + * Container for aggregated costs of accepted, declined or invalidated transactions. Can be used to track global + * state of total cost of transactions received (since last block processed), or per-peer state + */ + case class IncomingTxInfo(acceptedCost: Int, declinedCost: Int, invalidatedCost: Int) { + val totalCost: Int = acceptedCost + declinedCost + invalidatedCost + } + + object IncomingTxInfo { + def empty(): IncomingTxInfo = IncomingTxInfo(0, 0, 0) + } + + /** + * Transaction bytes and source peer to be recorded in a cache and processed later + */ + class TransactionProcessingCacheRecord(val txBytes: Array[Byte], val source: ConnectedPeer) + case object CheckModifiersToDownload object ReceivableMessages { @@ -1090,17 +1201,24 @@ object ErgoNodeViewSynchronizer { // hierarchy of events regarding modifiers application outcome trait ModificationOutcome extends NodeViewHolderEvent - /** - * @param immediateFailure - a flag indicating whether a transaction was invalid by the moment it was received. - */ - case class FailedTransaction(transactionId: ModifierId, error: Throwable, immediateFailure: Boolean) extends ModificationOutcome + trait InitialTransactionCheckOutcome extends ModificationOutcome { + val transaction: UnconfirmedTransaction + } - case class SuccessfulTransaction(transaction: ErgoTransaction) extends ModificationOutcome + case class FailedTransaction(transaction: UnconfirmedTransaction, error: Throwable) extends InitialTransactionCheckOutcome + + case class SuccessfulTransaction(transaction: UnconfirmedTransaction) extends InitialTransactionCheckOutcome /** * Transaction declined by the mempool (not permanently invalidated, so pool can accept it in future) */ - case class DeclinedTransaction(transactionId: ModifierId) extends ModificationOutcome + case class DeclinedTransaction(transaction: UnconfirmedTransaction) extends InitialTransactionCheckOutcome + + /** + * Transaction which was failed not immediately but after sitting for some time in the mempool or during block + * candidate generation + */ + case class FailedOnRecheckTransaction(id : ModifierId, error: Throwable) extends ModificationOutcome case class RecoverableFailedModification(modifier: BlockSection, error: Throwable) extends ModificationOutcome diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala index 3f72f24964..8b1dbf61b7 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoNodeViewHolder.scala @@ -258,21 +258,24 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti val tx = unconfirmedTx.transaction val (newPool, processingOutcome) = memoryPool().process(unconfirmedTx, minimalState()) processingOutcome match { - case ProcessingOutcome.Accepted => + case acc: ProcessingOutcome.Accepted => log.debug(s"Unconfirmed transaction $tx added to the memory pool") val newVault = vault().scanOffchain(tx) updateNodeView(updatedVault = Some(newVault), updatedMempool = Some(newPool)) - context.system.eventStream.publish(SuccessfulTransaction(tx)) - case ProcessingOutcome.Invalidated(e) => + context.system.eventStream.publish(SuccessfulTransaction(acc.tx)) + case i: ProcessingOutcome.Invalidated => + val e = i.e log.debug(s"Transaction $tx invalidated. Cause: ${e.getMessage}") updateNodeView(updatedMempool = Some(newPool)) - context.system.eventStream.publish(FailedTransaction(tx.id, e, immediateFailure = true)) - case ProcessingOutcome.DoubleSpendingLoser(winnerTxs) => // do nothing + context.system.eventStream.publish(FailedTransaction(unconfirmedTx.withCost(i.cost), e)) + case dbl: ProcessingOutcome.DoubleSpendingLoser => // do nothing + val winnerTxs = dbl.winnerTxIds log.debug(s"Transaction $tx declined, as other transactions $winnerTxs are paying more") - context.system.eventStream.publish(DeclinedTransaction(tx.id)) - case ProcessingOutcome.Declined(e) => // do nothing + context.system.eventStream.publish(DeclinedTransaction(unconfirmedTx.withCost(dbl.cost))) + case dcl: ProcessingOutcome.Declined => // do nothing + val e = dcl.e log.debug(s"Transaction $tx declined, reason: ${e.getMessage}") - context.system.eventStream.publish(DeclinedTransaction(tx.id)) + context.system.eventStream.publish(DeclinedTransaction(unconfirmedTx.withCost(dcl.cost))) } processingOutcome } @@ -349,7 +352,7 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti blocksApplied: Seq[BlockSection], memPool: ErgoMemPool, state: State): ErgoMemPool = { - val rolledBackTxs = blocksRemoved.flatMap(extractTransactions).map(UnconfirmedTransaction.apply) + val rolledBackTxs = blocksRemoved.flatMap(extractTransactions).map(tx => UnconfirmedTransaction(tx, None)) val appliedTxs = blocksApplied.flatMap(extractTransactions) context.system.eventStream.publish(BlockAppliedTransactions(appliedTxs.map(_.id))) memPool.putWithoutCheck(rolledBackTxs) @@ -582,8 +585,8 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti } protected def transactionsProcessing: Receive = { - case TransactionsFromRemote(unconfirmedTx) => - unconfirmedTx.foreach(txModify) + case TransactionFromRemote(unconfirmedTx) => + txModify(unconfirmedTx) case LocallyGeneratedTransaction(unconfirmedTx) => sender() ! txModify(unconfirmedTx) case RecheckedTransactions(unconfirmedTxs) => @@ -594,9 +597,9 @@ abstract class ErgoNodeViewHolder[State <: ErgoState[State]](settings: ErgoSetti case EliminateTransactions(ids) => val updatedPool = memoryPool().filter(unconfirmedTx => !ids.contains(unconfirmedTx.transaction.id)) updateNodeView(updatedMempool = Some(updatedPool)) + val e = new Exception("Became invalid") ids.foreach { id => - val e = new Exception("Became invalid") - context.system.eventStream.publish(FailedTransaction(id, e, immediateFailure = false)) + context.system.eventStream.publish(FailedOnRecheckTransaction(id, e)) } } @@ -662,9 +665,9 @@ object ErgoNodeViewHolder { case class LocallyGeneratedTransaction(tx: UnconfirmedTransaction) /** - * Wrapper for transactions cominng from P2P network + * Wrapper for transaction coming from P2P network */ - case class TransactionsFromRemote(unconfirmedTxs: Iterable[UnconfirmedTransaction]) + case class TransactionFromRemote(unconfirmedTx: UnconfirmedTransaction) /** * Wrapper for transactions which sit in mempool for long enough time, so `CleanWorker` is re-checking their diff --git a/src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala b/src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala index 4448551aca..fc6c5faf3d 100644 --- a/src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala +++ b/src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala @@ -148,7 +148,8 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool, // Check if transaction is double-spending inputs spent in the mempool. // If so, the new transacting is replacing older ones if it has bigger weight (fee/byte) than them on average. // Otherwise, the new transaction being rejected. - private def acceptIfNoDoubleSpend(unconfirmedTransaction: UnconfirmedTransaction): (ErgoMemPool, ProcessingOutcome) = { + private def acceptIfNoDoubleSpend(unconfirmedTransaction: UnconfirmedTransaction, + validationStartTime: Long): (ErgoMemPool, ProcessingOutcome) = { val tx = unconfirmedTransaction.transaction val doubleSpendingWtxs = tx.inputs.flatMap { inp => @@ -164,17 +165,19 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool, val doubleSpendingTxs = doubleSpendingWtxs.map(wtx => pool.orderedTransactions(wtx)).toSeq val p = pool.put(unconfirmedTransaction, feeF).remove(doubleSpendingTxs) val updPool = new ErgoMemPool(p, stats, sortingOption) - updPool -> ProcessingOutcome.Accepted + updPool -> new ProcessingOutcome.Accepted(unconfirmedTransaction, validationStartTime) } else { - this -> ProcessingOutcome.DoubleSpendingLoser(doubleSpendingWtxs.map(_.id)) + this -> new ProcessingOutcome.DoubleSpendingLoser(doubleSpendingWtxs.map(_.id), validationStartTime) } } else { val poolSizeLimit = nodeSettings.mempoolCapacity if (pool.size == poolSizeLimit && weighted(tx, feeF).weight <= pool.orderedTransactions.lastKey.weight) { - this -> ProcessingOutcome.Declined(new Exception("Transaction pays less than any other in the pool being full")) + val exc = new Exception("Transaction pays less than any other in the pool being full") + this -> new ProcessingOutcome.Declined(exc, validationStartTime) } else { - new ErgoMemPool(pool.put(unconfirmedTransaction, feeF), stats, sortingOption) -> ProcessingOutcome.Accepted + val updPool = new ErgoMemPool(pool.put(unconfirmedTransaction, feeF), stats, sortingOption) + updPool -> new ProcessingOutcome.Accepted(unconfirmedTransaction, validationStartTime) } } } @@ -182,9 +185,12 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool, def process(unconfirmedTx: UnconfirmedTransaction, state: ErgoState[_]): (ErgoMemPool, ProcessingOutcome) = { val tx = unconfirmedTx.transaction log.info(s"Processing mempool transaction: $tx") + val validationStartTime = System.currentTimeMillis() + val blacklistedTransactions = nodeSettings.blacklistedTransactions if(blacklistedTransactions.nonEmpty && blacklistedTransactions.contains(tx.id)) { - this.invalidate(unconfirmedTx) -> ProcessingOutcome.Invalidated(new Exception("blacklisted tx")) + val exc = new Exception("blacklisted tx") + this.invalidate(unconfirmedTx) -> new ProcessingOutcome.Invalidated(exc, validationStartTime) } else { val fee = extractFee(tx) val minFee = settings.nodeSettings.minimalFeeAmount @@ -199,33 +205,38 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool, val utxoWithPool = utxo.withUnconfirmedTransactions(getAll) if (tx.inputIds.forall(inputBoxId => utxoWithPool.boxById(inputBoxId).isDefined)) { utxoWithPool.validateWithCost(tx, Some(utxo.stateContext), costLimit, None) match { - case Success(cost) => acceptIfNoDoubleSpend(unconfirmedTx.onRecheck(cost)) - case Failure(ex) => this.invalidate(unconfirmedTx) -> ProcessingOutcome.Invalidated(ex) + case Success(cost) => + acceptIfNoDoubleSpend(unconfirmedTx.withCost(cost), validationStartTime) + case Failure(ex) => + this.invalidate(unconfirmedTx) -> new ProcessingOutcome.Invalidated(ex, validationStartTime) } } else { - this -> ProcessingOutcome.Declined(new Exception("not all utxos in place yet")) + val exc = new Exception("not all utxos in place yet") + this -> new ProcessingOutcome.Declined(exc, validationStartTime) } case validator: TransactionValidation => // transaction validation currently works only for UtxoState, so this branch currently // will not be triggered probably validator.validateWithCost(tx, costLimit) match { - case Success(cost) => acceptIfNoDoubleSpend(unconfirmedTx.onRecheck(cost)) - case Failure(ex) => this.invalidate(unconfirmedTx) -> ProcessingOutcome.Invalidated(ex) + case Success(cost) => + acceptIfNoDoubleSpend(unconfirmedTx.withCost(cost), validationStartTime) + case Failure(ex) => + this.invalidate(unconfirmedTx) -> new ProcessingOutcome.Invalidated(ex, validationStartTime) } case _ => // Accept transaction in case of "digest" state. Transactions are not downloaded in this mode from other // peers though, so such transactions can come from the local wallet only. - acceptIfNoDoubleSpend(unconfirmedTx) + acceptIfNoDoubleSpend(unconfirmedTx, validationStartTime) } } else { - this -> ProcessingOutcome.Declined( - new Exception(s"Pool can not accept transaction ${tx.id}, it is invalidated earlier or the pool is full")) + val exc = new Exception(s"Pool can not accept transaction ${tx.id}, it is invalidated earlier or the pool is full") + this -> new ProcessingOutcome.Declined(exc, validationStartTime) } } else { - this -> ProcessingOutcome.Declined( - new Exception(s"Min fee not met: ${minFee.toDouble / CoinsInOneErgo} ergs required, " + - s"${fee.toDouble / CoinsInOneErgo} ergs given") - ) + val exc = new Exception(s"Min fee not met: ${minFee.toDouble / CoinsInOneErgo} ergs required, " + + s"${fee.toDouble / CoinsInOneErgo} ergs given") + + this -> new ProcessingOutcome.Declined(exc, validationStartTime) } } } @@ -320,15 +331,51 @@ object ErgoMemPool extends ScorexLogging { } } + /** + * Root of possible mempool transaction validation result family + */ + sealed trait ProcessingOutcome { + /** + * Time when transaction validation was started + */ + protected val validationStartTime: Long - sealed trait ProcessingOutcome + /** + * We assume that validation ends when this processing result class is constructed + */ + private val validationEndTime: Long = System.currentTimeMillis() + + /** + * 5.0 JIT costing was designed in a way that 1000 cost units are roughly corresponding to 1 ms of 1 CPU core + * on commodity hardware (of 2021). So if we do not know the exact cost of transaction, we can estimate it by + * tracking validation time and then getting estimated validation cost by multiplying the time (in ms) by 1000 + */ + val costPerMs = 1000 + + /** + * Estimated validation cost, see comment for `costPerMs` + */ + def cost: Int = { + val timeDiff = validationEndTime - validationStartTime + if (timeDiff == 0) { + costPerMs + } else if (timeDiff > 1000000) { + Int.MaxValue // shouldn't be here, so this branch is mostly to have safe .toInt below + } else { + (timeDiff * costPerMs).toInt + } + } + } object ProcessingOutcome { /** * Object signalling that a transaction is accepted to the memory pool */ - case object Accepted extends ProcessingOutcome + class Accepted(val tx: UnconfirmedTransaction, + override protected val validationStartTime: Long) extends ProcessingOutcome { + override val cost: Int = tx.lastCost.getOrElse(super.cost) + } /** * Class signalling that a valid transaction was rejected as it is double-spending inputs of mempool transactions @@ -336,18 +383,21 @@ object ErgoMemPool extends ScorexLogging { * * @param winnerTxIds - identifiers of transactions won in replace-by-fee auction */ - case class DoubleSpendingLoser(winnerTxIds: Set[ModifierId]) extends ProcessingOutcome + class DoubleSpendingLoser(val winnerTxIds: Set[ModifierId], + override protected val validationStartTime: Long) extends ProcessingOutcome /** * Class signalling that a transaction declined from being accepted into the memory pool */ - case class Declined(e: Throwable) extends ProcessingOutcome + class Declined(val e: Throwable, + override protected val validationStartTime: Long) extends ProcessingOutcome /** * Class signalling that a transaction turned out to be invalid when checked in the mempool */ - case class Invalidated(e: Throwable) extends ProcessingOutcome + class Invalidated(val e: Throwable, + override protected val validationStartTime: Long) extends ProcessingOutcome } @@ -363,7 +413,8 @@ object ErgoMemPool extends ScorexLogging { case SortingOption.FeePerByte => log.info("Sorting mempool by fee-per-byte") case SortingOption.FeePerCycle => log.info("Sorting mempool by fee-per-cycle") } - new ErgoMemPool(OrderedTxPool.empty(settings), + new ErgoMemPool( + OrderedTxPool.empty(settings), MemPoolStatistics(System.currentTimeMillis(), 0, System.currentTimeMillis()), sortingOption )(settings) diff --git a/src/test/scala/org/ergoplatform/http/routes/TransactionApiRouteSpec.scala b/src/test/scala/org/ergoplatform/http/routes/TransactionApiRouteSpec.scala index bc2a172312..6d9b9aa354 100644 --- a/src/test/scala/org/ergoplatform/http/routes/TransactionApiRouteSpec.scala +++ b/src/test/scala/org/ergoplatform/http/routes/TransactionApiRouteSpec.scala @@ -42,7 +42,7 @@ class TransactionApiRouteSpec extends AnyFlatSpec val chainedRoute: Route = { //constructing memory pool and node view with the transaction tx included - val mp2 = memPool.put(UnconfirmedTransaction(tx)).get + val mp2 = memPool.put(UnconfirmedTransaction(tx, None)).get class UtxoReadersStub2 extends Actor { def receive: PartialFunction[Any, Unit] = { case GetReaders => sender() ! Readers(history, utxoState, mp2, wallet) diff --git a/src/test/scala/org/ergoplatform/local/MempoolAuditorSpec.scala b/src/test/scala/org/ergoplatform/local/MempoolAuditorSpec.scala index ea19d32b51..32ffa0741a 100644 --- a/src/test/scala/org/ergoplatform/local/MempoolAuditorSpec.scala +++ b/src/test/scala/org/ergoplatform/local/MempoolAuditorSpec.scala @@ -66,13 +66,13 @@ class MempoolAuditorSpec extends AnyFlatSpec with NodeViewTestOps with ErgoTestH val temporarilyValidTx = validTransactionFromBoxes(validTx.outputs, outputsProposition = proveDlogGen.sample.get) subscribeEvents(classOf[FailedTransaction]) - nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(validTx)) + nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(validTx, None)) testProbe.expectMsgClass(cleanupDuration, newTx) - expectMsgType[ProcessingOutcome.Accepted.type] + expectMsgType[ProcessingOutcome.Accepted] - nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(temporarilyValidTx)) + nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(temporarilyValidTx, None)) testProbe.expectMsgClass(cleanupDuration, newTx) - expectMsgType[ProcessingOutcome.Accepted.type] + expectMsgType[ProcessingOutcome.Accepted] getPoolSize shouldBe 2 @@ -102,7 +102,8 @@ class MempoolAuditorSpec extends AnyFlatSpec with NodeViewTestOps with ErgoTestH val us = us0.applyModifier(b1, None)(_ => ()).get val bxs = bh1.boxes.values.toList.filter(_.proposition != genesisEmissionBox.proposition) - val txs = validTransactionsFromBoxes(200000, bxs, new RandomWrapper)._1.map(UnconfirmedTransaction.apply) + val txs = validTransactionsFromBoxes(200000, bxs, new RandomWrapper)._1 + .map(tx => UnconfirmedTransaction(tx, None)) implicit val system = ActorSystem() val probe = TestProbe() diff --git a/src/test/scala/org/ergoplatform/mining/ErgoMinerSpec.scala b/src/test/scala/org/ergoplatform/mining/ErgoMinerSpec.scala index 44f769b23a..07174700c2 100644 --- a/src/test/scala/org/ergoplatform/mining/ErgoMinerSpec.scala +++ b/src/test/scala/org/ergoplatform/mining/ErgoMinerSpec.scala @@ -95,7 +95,7 @@ class ErgoMinerSpec extends AnyFlatSpec with ErgoTestHelpers with ValidBlocksGen val outputs = (0 until 10).map(_ => output) val unsignedTx = new UnsignedErgoTransaction(IndexedSeq(input), IndexedSeq(), outputs) val tx = defaultProver.sign(unsignedTx, IndexedSeq(boxToSpend), IndexedSeq(), r.s.stateContext).get - nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(tx))) + nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(tx), None)) expectNoMessage(1 seconds) testProbe.expectMsgClass(newBlockDelay, newBlockSignal) testProbe.expectMsgClass(newBlockDelay, newBlockSignal) @@ -122,7 +122,7 @@ class ErgoMinerSpec extends AnyFlatSpec with ErgoTestHelpers with ValidBlocksGen txCost shouldBe 431780 // send costly transaction to the mempool - nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(costlyTx))) + nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(costlyTx), None)) testProbe.expectMsgClass(newBlockDelay, newBlockSignal) testProbe.expectMsgClass(newBlockDelay, newBlockSignal) @@ -188,7 +188,7 @@ class ErgoMinerSpec extends AnyFlatSpec with ErgoTestHelpers with ValidBlocksGen ) } - txs.map(UnconfirmedTransaction.apply).foreach(nodeViewHolderRef ! LocallyGeneratedTransaction(_)) + txs.map(tx => UnconfirmedTransaction(tx, None)).foreach(nodeViewHolderRef ! LocallyGeneratedTransaction(_)) if (toSend > toSpend.size) { // wait for the next block @@ -257,7 +257,7 @@ class ErgoMinerSpec extends AnyFlatSpec with ErgoTestHelpers with ValidBlocksGen // As double-spending transactions are filtered out in the mempool, the only way to push them is to order to // include double-spending transaction directly via mandatoryTransactions argument of PrepareCandidate command - nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(tx1))) + nodeViewHolderRef ! LocallyGeneratedTransaction(UnconfirmedTransaction(ErgoTransaction(tx1), None)) testProbe.expectMsgClass(newBlockDelay, newBlockSignal) testProbe.expectNoMessage(200.millis) diff --git a/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolSpec.scala b/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolSpec.scala index 09121b2160..72ae7c1810 100644 --- a/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolSpec.scala +++ b/src/test/scala/org/ergoplatform/nodeView/mempool/ErgoMemPoolSpec.scala @@ -25,8 +25,8 @@ class ErgoMemPoolSpec extends AnyFlatSpec val txs = validTransactionsFromUtxoState(wus) val pool0 = ErgoMemPool.empty(settings) val poolAfter = txs.foldLeft(pool0) { case (pool, tx) => - val (p, outcome) = pool.process(UnconfirmedTransaction(tx), us) - if (outcome != ProcessingOutcome.Accepted) { + val (p, outcome) = pool.process(UnconfirmedTransaction(tx, None), us) + if (!outcome.isInstanceOf[ProcessingOutcome.Accepted]) { throw new Exception("Transaction not accepted") } p @@ -36,7 +36,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec // light mode val poolLight = ErgoMemPool.empty(lightModeSettings) txs.foreach { tx => - poolLight.process(UnconfirmedTransaction(tx), us)._2 shouldBe ProcessingOutcome.Accepted + poolLight.process(UnconfirmedTransaction(tx, None), us)._2.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true } } @@ -61,7 +61,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec )) var poolSize = ErgoMemPool.empty(sortBySizeSettings) - poolSize = poolSize.process(UnconfirmedTransaction(tx), wus)._1 + poolSize = poolSize.process(UnconfirmedTransaction(tx, None), wus)._1 val size = tx.size poolSize.pool.orderedTransactions.firstKey.weight shouldBe OrderedTxPool.weighted(tx, size).weight @@ -71,7 +71,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec )) var poolCost = ErgoMemPool.empty(sortByCostSettings) - poolCost = poolCost.process(UnconfirmedTransaction(tx), wus)._1 + poolCost = poolCost.process(UnconfirmedTransaction(tx, None), wus)._1 val cost = wus.validateWithCost(tx, Int.MaxValue).get poolCost.pool.orderedTransactions.firstKey.weight shouldBe OrderedTxPool.weighted(tx, cost).weight } @@ -83,10 +83,10 @@ class ErgoMemPoolSpec extends AnyFlatSpec val txs = validTransactionsFromUtxoState(wus) var pool = ErgoMemPool.empty(settings) txs.foreach { tx => - pool = pool.putWithoutCheck(Seq(UnconfirmedTransaction(tx))) + pool = pool.putWithoutCheck(Seq(UnconfirmedTransaction(tx, None))) } txs.foreach { tx => - pool.process(UnconfirmedTransaction(tx), us)._2.isInstanceOf[ProcessingOutcome.Declined] shouldBe true + pool.process(UnconfirmedTransaction(tx, None), us)._2.isInstanceOf[ProcessingOutcome.Declined] shouldBe true } } @@ -117,13 +117,13 @@ class ErgoMemPoolSpec extends AnyFlatSpec IndexedSeq(feeOut) ) - val tx1 = UnconfirmedTransaction(ErgoTransaction(tx1Like.inputs, tx1Like.outputCandidates)) - val tx2 = UnconfirmedTransaction(ErgoTransaction(ErgoTransaction(tx2Like.inputs, tx2Like.outputCandidates))) + val tx1 = UnconfirmedTransaction(ErgoTransaction(tx1Like.inputs, tx1Like.outputCandidates), None) + val tx2 = UnconfirmedTransaction(ErgoTransaction(ErgoTransaction(tx2Like.inputs, tx2Like.outputCandidates)), None) val pool0 = ErgoMemPool.empty(settings) val (pool, tx1Outcome) = pool0.process(tx1, us) - tx1Outcome shouldBe ProcessingOutcome.Accepted + tx1Outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true // tx1 and tx2 are spending the same input, and paying the same fee. // So if tx2 is about a bigger or equal size, it should be rejected as it is paying less for a byte. @@ -132,7 +132,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec pool.process(tx2, us)._2.isInstanceOf[ProcessingOutcome.DoubleSpendingLoser] shouldBe true } else { val (updPool, outcome) = pool.process(tx2, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true updPool.size shouldBe 1 updPool.take(1).head.transaction.id shouldBe tx2.transaction.id } @@ -144,7 +144,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val us = createUtxoState(parameters)._1 var pool = ErgoMemPool.empty(settings) forAll(invalidBlockTransactionsGen) { blockTransactions => - val unconfirmedTxs = blockTransactions.txs.map(UnconfirmedTransaction.apply) + val unconfirmedTxs = blockTransactions.txs.map(tx => UnconfirmedTransaction(tx, None)) unconfirmedTxs.foreach(tx => pool = pool.process(tx, us)._1) unconfirmedTxs.foreach(tx => pool.process(tx, us)._2.isInstanceOf[ProcessingOutcome.Declined] shouldBe true) @@ -156,7 +156,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get val txs = validTransactionsFromUtxoState(wus) - val unconfirmedTxs = txs.map(UnconfirmedTransaction.apply) + val unconfirmedTxs = txs.map(tx => UnconfirmedTransaction(tx, None)) val maxSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(minimalFeeAmount = Long.MaxValue)) val pool = ErgoMemPool.empty(maxSettings) @@ -171,7 +171,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val pool2 = ErgoMemPool.empty(minSettings) unconfirmedTxs.foreach { tx => val (_, outcome) = pool2.process(tx, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true } } @@ -180,7 +180,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val pool = ErgoMemPool.empty(settings) forAll(invalidBlockTransactionsGen) { blockTransactions => blockTransactions.txs.forall{tx => - val valRes = pool.process(UnconfirmedTransaction(tx), us)._2 + val valRes = pool.process(UnconfirmedTransaction(tx, None), us)._2 valRes.isInstanceOf[ProcessingOutcome.Invalidated] || valRes.isInstanceOf[ProcessingOutcome.Declined]} shouldBe true } @@ -188,7 +188,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec it should "accept only unique transactions" in { val pool = ErgoMemPool.empty(settings) - val tx = UnconfirmedTransaction(invalidErgoTransactionGen.sample.get) + val tx = UnconfirmedTransaction(invalidErgoTransactionGen.sample.get, None) pool.putWithoutCheck(Seq(tx, tx, tx)).size shouldBe 1 } @@ -202,8 +202,8 @@ class ErgoMemPoolSpec extends AnyFlatSpec acc :+ masterTx.copy(outputCandidates = IndexedSeq( new ErgoBoxCandidate(idx * 10000 + 1, proposition, c.creationHeight, c.additionalTokens, c.additionalRegisters))) } - val lessPrioritizedTxs = txsWithAscendingPriority.init.map(UnconfirmedTransaction.apply) - val mostPrioritizedTx = UnconfirmedTransaction(txsWithAscendingPriority.last) + val lessPrioritizedTxs = txsWithAscendingPriority.init.map(tx => UnconfirmedTransaction(tx, None)) + val mostPrioritizedTx = UnconfirmedTransaction(txsWithAscendingPriority.last, None) pool = pool.putWithoutCheck(lessPrioritizedTxs) pool.size shouldBe 4 @@ -217,7 +217,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val (us, bh) = createUtxoState(parameters) val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get - val txs = validTransactionsFromUtxoState(wus).map(UnconfirmedTransaction.apply) + val txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None)) var pool = ErgoMemPool.empty(settings) txs.foreach { tx => pool = pool.putWithoutCheck(Seq(tx)) @@ -226,9 +226,9 @@ class ErgoMemPoolSpec extends AnyFlatSpec val spendingBox = tx.transaction.outputs.head val unconfirmedTransaction = UnconfirmedTransaction(tx.transaction.copy( inputs = IndexedSeq(new Input(spendingBox.id, emptyProverResult)), - outputCandidates = IndexedSeq(spendingBox))) + outputCandidates = IndexedSeq(spendingBox)), None) val (newPool, outcome) = pool.process(unconfirmedTransaction, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true pool = newPool } } @@ -237,7 +237,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val (us, bh) = createUtxoState(parameters) val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get - var txs = validTransactionsFromUtxoState(wus).map(UnconfirmedTransaction.apply) + var txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None)) val family_depth = 10 val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size)) var pool = ErgoMemPool.empty(limitedPoolSettings) @@ -248,9 +248,9 @@ class ErgoMemPoolSpec extends AnyFlatSpec txs = txs.map(tx => { val spendingBox = tx.transaction.outputs.head val newTx = UnconfirmedTransaction(tx.transaction.copy(inputs = IndexedSeq(new Input(spendingBox.id, emptyProverResult)), - outputCandidates = IndexedSeq(spendingBox))) + outputCandidates = IndexedSeq(spendingBox)), None) val (newPool, outcome) = pool.process(newTx, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true pool = newPool newTx }) @@ -261,7 +261,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val sb = tx.outputs.head val txToDecline = tx.copy(inputs = IndexedSeq(new Input(sb.id, emptyProverResult)), outputCandidates = IndexedSeq(new ErgoBoxCandidate(sb.value, sb.ergoTree, sb.creationHeight, sb.additionalTokens, sb.additionalRegisters))) - val res = pool.process(UnconfirmedTransaction(txToDecline), us)._2 + val res = pool.process(UnconfirmedTransaction(txToDecline, None), us)._2 res.isInstanceOf[ProcessingOutcome.Declined] shouldBe true res.asInstanceOf[ProcessingOutcome.Declined].e.getMessage.contains("pays less") shouldBe true pool.size shouldBe (family_depth + 1) * txs.size @@ -272,7 +272,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val (us, bh) = createUtxoState(parameters) val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get - var txs = validTransactionsFromUtxoState(wus).map(UnconfirmedTransaction.apply) + var txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None)) var allTxs = txs val family_depth = 10 val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size)) @@ -284,9 +284,9 @@ class ErgoMemPoolSpec extends AnyFlatSpec txs = txs.map(tx => { val spendingBox = tx.transaction.outputs.head val newTx = UnconfirmedTransaction(tx.transaction.copy(inputs = IndexedSeq(new Input(spendingBox.id, emptyProverResult)), - outputCandidates = IndexedSeq(spendingBox))) + outputCandidates = IndexedSeq(spendingBox)), None) val (newPool, outcome) = pool.process(newTx, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true pool = newPool allTxs = allTxs :+ newTx newTx @@ -305,7 +305,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val (us, bh) = createUtxoState(parameters) val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get - var txs = validTransactionsFromUtxoState(wus).map(UnconfirmedTransaction.apply) + var txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None)) val family_depth = 10 val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size)) var pool = ErgoMemPool.empty(limitedPoolSettings) @@ -321,9 +321,9 @@ class ErgoMemPoolSpec extends AnyFlatSpec val out1 = new ErgoBoxCandidate(55000, feeProp, sc.creationHeight) val newTx = UnconfirmedTransaction(tx.transaction.copy(inputs = IndexedSeq(new Input(spendingBox.id, emptyProverResult)), - outputCandidates = IndexedSeq(out0, out1))) + outputCandidates = IndexedSeq(out0, out1)), None) val (newPool, outcome) = pool.process(newTx, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true pool = newPool newTx }) @@ -347,7 +347,7 @@ class ErgoMemPoolSpec extends AnyFlatSpec val (us, bh) = createUtxoState(parameters) val genesis = validFullBlock(None, us, bh) val wus = WrappedUtxoState(us, bh, stateConstants, parameters).applyModifier(genesis)(_ => ()).get - var txs = validTransactionsFromUtxoState(wus).map(UnconfirmedTransaction.apply) + var txs = validTransactionsFromUtxoState(wus).map(tx => UnconfirmedTransaction(tx, None)) var allTxs = txs val family_depth = 10 val limitedPoolSettings = settings.copy(nodeSettings = settings.nodeSettings.copy(mempoolCapacity = (family_depth + 1) * txs.size)) @@ -359,9 +359,9 @@ class ErgoMemPoolSpec extends AnyFlatSpec txs = txs.map(tx => { val spendingBox = tx.transaction.outputs.head val newTx = UnconfirmedTransaction(tx.transaction.copy(inputs = IndexedSeq(new Input(spendingBox.id, emptyProverResult)), - outputCandidates = IndexedSeq(spendingBox))) + outputCandidates = IndexedSeq(spendingBox)), None) val (newPool, outcome) = pool.process(newTx, us) - outcome shouldBe ProcessingOutcome.Accepted + outcome.isInstanceOf[ProcessingOutcome.Accepted] shouldBe true pool = newPool allTxs = allTxs :+ newTx newTx diff --git a/src/test/scala/org/ergoplatform/nodeView/viewholder/ErgoNodeViewHolderSpec.scala b/src/test/scala/org/ergoplatform/nodeView/viewholder/ErgoNodeViewHolderSpec.scala index e3989a61bd..e9f8471729 100644 --- a/src/test/scala/org/ergoplatform/nodeView/viewholder/ErgoNodeViewHolderSpec.scala +++ b/src/test/scala/org/ergoplatform/nodeView/viewholder/ErgoNodeViewHolderSpec.scala @@ -145,10 +145,10 @@ class ErgoNodeViewHolderSpec extends ErgoPropertyTest with HistoryTestHelpers wi val boxes = ErgoState.newBoxes(genesis.transactions).find(_.ergoTree == Constants.TrueLeaf) boxes.nonEmpty shouldBe true - val tx = UnconfirmedTransaction(validTransactionFromBoxes(boxes.toIndexedSeq)) + val tx = UnconfirmedTransaction(validTransactionFromBoxes(boxes.toIndexedSeq), None) subscribeEvents(classOf[FailedTransaction]) nodeViewHolderRef ! LocallyGeneratedTransaction(tx) - expectMsg(Accepted) + expectMsgType[Accepted] getPoolSize shouldBe 1 } } diff --git a/src/test/scala/org/ergoplatform/nodeView/wallet/ErgoWalletServiceSpec.scala b/src/test/scala/org/ergoplatform/nodeView/wallet/ErgoWalletServiceSpec.scala index c7b711149e..48f8a5e942 100644 --- a/src/test/scala/org/ergoplatform/nodeView/wallet/ErgoWalletServiceSpec.scala +++ b/src/test/scala/org/ergoplatform/nodeView/wallet/ErgoWalletServiceSpec.scala @@ -177,7 +177,7 @@ class ErgoWalletServiceSpec inputsRaw = encodedBoxes, dataInputsRaw = Seq.empty, sign = true - ).get.asInstanceOf[ErgoTransaction]) + ).get.asInstanceOf[ErgoTransaction], None) // let's create wallet state with an unconfirmed transaction in mempool val wState = initialState(store, versionedStore, Some(new FakeMempool(Seq(unconfirmedTx)))) diff --git a/src/test/scala/org/ergoplatform/sanity/ErgoSanity.scala b/src/test/scala/org/ergoplatform/sanity/ErgoSanity.scala index af4ab63ad2..8d41c0cbd3 100644 --- a/src/test/scala/org/ergoplatform/sanity/ErgoSanity.scala +++ b/src/test/scala/org/ergoplatform/sanity/ErgoSanity.scala @@ -42,7 +42,8 @@ trait ErgoSanity[ST <: ErgoState[ST]] extends HistoryTests //Generators override lazy val transactionGenerator: Gen[ErgoTransaction] = invalidErgoTransactionGen - override lazy val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] = invalidErgoTransactionGen.map(UnconfirmedTransaction.apply) + override lazy val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] = + invalidErgoTransactionGen.map(tx => UnconfirmedTransaction(tx, None)) override lazy val memPoolGenerator: Gen[MPool] = emptyMemPoolGen override def syntacticallyValidModifier(history: HT): Header = { diff --git a/src/test/scala/org/ergoplatform/utils/Stubs.scala b/src/test/scala/org/ergoplatform/utils/Stubs.scala index 22fa994c94..c7d0139081 100644 --- a/src/test/scala/org/ergoplatform/utils/Stubs.scala +++ b/src/test/scala/org/ergoplatform/utils/Stubs.scala @@ -13,8 +13,7 @@ import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.LocallyGe import org.ergoplatform.nodeView.ErgoReadersHolder.{GetDataFromHistory, GetReaders, Readers} import org.ergoplatform.nodeView.history.ErgoHistory import org.ergoplatform.nodeView.mempool.ErgoMemPool -import org.ergoplatform.nodeView.mempool.ErgoMemPool.ProcessingOutcome.{Accepted, Invalidated} -import org.ergoplatform.nodeView.mempool.ErgoMemPool.SortingOption +import org.ergoplatform.nodeView.mempool.ErgoMemPool.{ProcessingOutcome, SortingOption} import org.ergoplatform.nodeView.state.wrapped.WrappedUtxoState import org.ergoplatform.nodeView.state.{DigestState, ErgoStateContext, StateType} import org.ergoplatform.nodeView.wallet.ErgoWalletActor._ @@ -71,7 +70,7 @@ trait Stubs extends ErgoGenerators with ErgoTestHelpers with ChainGenerator with lazy val wallet = new WalletStub val txs: Seq[ErgoTransaction] = validTransactionsFromBoxHolder(boxesHolderGen.sample.get)._1 - val memPool: ErgoMemPool = ErgoMemPool.empty(settings).put(txs.map(UnconfirmedTransaction.apply)).get + val memPool: ErgoMemPool = ErgoMemPool.empty(settings).put(txs.map(tx => UnconfirmedTransaction(tx, None))).get val digestReaders = Readers(history, digestState, memPool, wallet) @@ -120,8 +119,8 @@ trait Stubs extends ErgoGenerators with ErgoTestHelpers with ChainGenerator with class NodeViewStub extends Actor { def receive: Receive = { - case LocallyGeneratedTransaction(_) => - sender() ! Accepted + case LocallyGeneratedTransaction(utx) => + sender() ! new ProcessingOutcome.Accepted(utx, System.currentTimeMillis()) case _ => } } @@ -129,7 +128,7 @@ trait Stubs extends ErgoGenerators with ErgoTestHelpers with ChainGenerator with class FailingNodeViewStub extends Actor { def receive: Receive = { case LocallyGeneratedTransaction(_) => - sender() ! Invalidated(new Error("Transaction invalid")) + sender() ! new ProcessingOutcome.Invalidated(new Error("Transaction invalid"), System.currentTimeMillis()) case _ => } } diff --git a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala index e8e062f96a..0aa376377b 100644 --- a/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala +++ b/src/test/scala/scorex/testkit/properties/NodeViewSynchronizerTests.scala @@ -3,7 +3,7 @@ package scorex.testkit.properties import akka.actor._ import akka.testkit.TestProbe import org.ergoplatform.modifiers.BlockSection -import org.ergoplatform.modifiers.mempool.ErgoTransaction +import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction} import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoSyncInfo, ErgoSyncInfoMessageSpec} import org.ergoplatform.nodeView.mempool.ErgoMemPool import org.scalacheck.Gen @@ -60,7 +60,7 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec property("NodeViewSynchronizer: SuccessfulTransaction") { withFixture { ctx => import ctx._ - node ! SuccessfulTransaction(tx) + node ! SuccessfulTransaction(UnconfirmedTransaction(tx, None)) ncProbe.fishForMessage(3 seconds) { case m => m.isInstanceOf[SendToNetwork] } } } @@ -68,7 +68,7 @@ trait NodeViewSynchronizerTests[ST <: ErgoState[ST]] extends AnyPropSpec property("NodeViewSynchronizer: FailedTransaction") { withFixture { ctx => import ctx._ - node ! FailedTransaction(tx.id, new Exception, immediateFailure = true) + node ! FailedTransaction(UnconfirmedTransaction(tx, None), new Exception) // todo: NVS currently does nothing in this case. Should check banning. } } diff --git a/src/test/scala/scorex/testkit/properties/mempool/MempoolFilterPerformanceTest.scala b/src/test/scala/scorex/testkit/properties/mempool/MempoolFilterPerformanceTest.scala index 9a7df6e178..c7be888dab 100644 --- a/src/test/scala/scorex/testkit/properties/mempool/MempoolFilterPerformanceTest.scala +++ b/src/test/scala/scorex/testkit/properties/mempool/MempoolFilterPerformanceTest.scala @@ -34,7 +34,7 @@ trait MempoolFilterPerformanceTest var m: ErgoMemPool = memPool (0 until 1000) foreach { _ => forAll(transactionGenerator) { tx: ErgoTransaction => - m = m.put(UnconfirmedTransaction(tx)).get + m = m.put(UnconfirmedTransaction(tx, None)).get } } m.size should be > 1000 @@ -45,7 +45,7 @@ trait MempoolFilterPerformanceTest @SuppressWarnings(Array("org.wartremover.warts.OptionPartial")) val m = initializedMempool.get forAll(transactionGenerator) { tx: ErgoTransaction => - val (time, _) = profile(m.filter(Seq(UnconfirmedTransaction(tx)))) + val (time, _) = profile(m.filter(Seq(UnconfirmedTransaction(tx, None)))) assert(time < thresholdSecs) } } @@ -54,7 +54,7 @@ trait MempoolFilterPerformanceTest @SuppressWarnings(Array("org.wartremover.warts.OptionPartial")) var m = initializedMempool.get forAll(transactionGenerator) { tx: ErgoTransaction => - val unconfirmedTx = UnconfirmedTransaction(tx) + val unconfirmedTx = UnconfirmedTransaction(tx, None) m = m.put(unconfirmedTx).get val (time, _) = profile(m.filter(Seq(unconfirmedTx))) assert(time < thresholdSecs) diff --git a/src/test/scala/scorex/testkit/properties/mempool/MempoolRemovalTest.scala b/src/test/scala/scorex/testkit/properties/mempool/MempoolRemovalTest.scala index 796139099a..5daa5c98ee 100644 --- a/src/test/scala/scorex/testkit/properties/mempool/MempoolRemovalTest.scala +++ b/src/test/scala/scorex/testkit/properties/mempool/MempoolRemovalTest.scala @@ -29,7 +29,7 @@ trait MempoolRemovalTest extends AnyPropSpec var m: ErgoMemPool = memPool // var h: ErgoHistory = historyGen.sample.get forAll(transactionGenerator) { tx: ErgoTransaction => - m = m.put(UnconfirmedTransaction(tx)).get + m = m.put(UnconfirmedTransaction(tx, None)).get } // var prevMempoolSize = m.size // val b = modifierWithTransactions(Some(m), None) diff --git a/src/test/scala/scorex/testkit/properties/mempool/MempoolTransactionsTest.scala b/src/test/scala/scorex/testkit/properties/mempool/MempoolTransactionsTest.scala index a4c9b0a608..95df30d0d6 100644 --- a/src/test/scala/scorex/testkit/properties/mempool/MempoolTransactionsTest.scala +++ b/src/test/scala/scorex/testkit/properties/mempool/MempoolTransactionsTest.scala @@ -16,7 +16,7 @@ trait MempoolTransactionsTest val transactionSeqGenerator: Gen[Seq[ErgoTransaction]] = Gen.nonEmptyContainerOf[Seq, ErgoTransaction](transactionGenerator) val unconfirmedTxSeqGenerator: Gen[Seq[UnconfirmedTransaction]] = - transactionSeqGenerator.map(txs => txs.map(UnconfirmedTransaction.apply)) + transactionSeqGenerator.map(txs => txs.map(tx => UnconfirmedTransaction(tx, None))) property("Size of mempool should increase when adding a non-present transaction") { forAll(memPoolGenerator, unconfirmedTxGenerator) { (mp: ErgoMemPool, unconfirmedTx: UnconfirmedTransaction) =>