Skip to content

Commit

Permalink
Merge pull request #1819 from ergoplatform/p2p-opts
Browse files Browse the repository at this point in the history
Mempool backpressure
  • Loading branch information
kushti authored Sep 1, 2022
2 parents 5c88fb7 + 7779a1e commit 91a6a9a
Show file tree
Hide file tree
Showing 23 changed files with 366 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object ErgoMemPoolBenchmark

private def bench(txsInIncomeOrder: Seq[ErgoTransaction]): Unit = {
var pool = ErgoMemPool.empty(settings)
txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx)).get)
txsInIncomeOrder.foreach(tx => pool = pool.put(UnconfirmedTransaction(tx, None)).get)
}

performance of "ErgoMemPool awaiting" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ class MempoolPerformanceBench extends AnyPropSpec
override val memPool: ErgoMemPool = ErgoMemPool.empty(settings)
override val memPoolGenerator: Gen[ErgoMemPool] = emptyMemPoolGen
override val transactionGenerator: Gen[ErgoTransaction] = invalidErgoTransactionGen
override val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] = invalidErgoTransactionGen.map(UnconfirmedTransaction.apply)
override val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] =
invalidErgoTransactionGen.map(tx => UnconfirmedTransaction(tx, None))
}
2 changes: 1 addition & 1 deletion src/main/resources/mainnet.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ergo {
blacklistedTransactions = []

# maximum cost of transaction for it to be propagated
maxTransactionCost = 5000000
maxTransactionCost = 4900000
}
}

Expand Down
22 changes: 1 addition & 21 deletions src/main/scala/org/ergoplatform/http/api/ApiCodecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.ergoplatform.{ErgoBox, ErgoLikeContext, ErgoLikeTransaction, JsonCode
import org.ergoplatform.http.api.ApiEncoderOption.Detalization
import org.ergoplatform.ErgoBox.RegisterId
import org.ergoplatform.mining.{groupElemFromBytes, groupElemToBytes}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction, UnsignedErgoTransaction}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnsignedErgoTransaction}
import org.ergoplatform.nodeView.history.ErgoHistory.Difficulty
import org.ergoplatform.settings.ErgoAlgos
import org.ergoplatform.nodeView.wallet.persistence.WalletDigest
Expand Down Expand Up @@ -195,26 +195,6 @@ trait ApiCodecs extends JsonCodecs {
} yield ErgoTransaction(ergoLikeTx)
}

// We are not using this encoder for now, but may use in future
implicit val unconfirmedTxEncoder: Encoder[UnconfirmedTransaction] = { unconfirmedTx =>
Json.obj(
"transaction" -> transactionEncoder(unconfirmedTx.transaction),
"lastCost" -> unconfirmedTx.lastCost.asJson,
"createdTime" -> unconfirmedTx.createdTime.asJson,
"lastCheckedTime" -> unconfirmedTx.lastCheckedTime.asJson
)
}

// We are not using this decoder for now, but may use in future
implicit val unconfirmedTxDecoder: Decoder[UnconfirmedTransaction] = { cursor =>
for {
tx <- transactionDecoder(cursor)
lastCost <- cursor.downField("lastCost").as[Option[Int]]
createdTime <- cursor.downField("createdTime").as[Long]
lastCheckedTime <- cursor.downField("lastCheckedTime").as[Long]
} yield UnconfirmedTransaction(tx, lastCost, createdTime, lastCheckedTime, Some(tx.bytes))
}



implicit val sigmaBooleanEncoder: Encoder[SigmaBoolean] = {
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/org/ergoplatform/http/api/ErgoBaseApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ trait ErgoBaseApiRoute extends ApiRoute with ApiCodecs {
(nodeViewActorRef ? LocallyGeneratedTransaction(unconfirmedTx))
.mapTo[ProcessingOutcome]
.flatMap {
case Accepted => Future.successful(unconfirmedTx.transaction.id)
case DoubleSpendingLoser(_) => Future.failed(new IllegalArgumentException("Double spending attempt"))
case Declined(ex) => Future.failed(ex)
case Invalidated(ex) => Future.failed(ex)
case _: Accepted => Future.successful(unconfirmedTx.transaction.id)
case _: DoubleSpendingLoser => Future.failed(new IllegalArgumentException("Double spending attempt"))
case d: Declined => Future.failed(d.e)
case i: Invalidated => Future.failed(i.e)
}
completeOrRecoverWith(resultFuture) { ex =>
ApiError.BadRequest(ex.getMessage)
Expand All @@ -78,9 +78,9 @@ trait ErgoBaseApiRoute extends ApiRoute with ApiCodecs {
val maxTxCost = ergoSettings.nodeSettings.maxTransactionCost
utxo.withMempool(mp)
.validateWithCost(tx, maxTxCost)
.map(cost => UnconfirmedTransaction(tx, Some(cost), now, now, bytes))
.map(cost => UnconfirmedTransaction(tx, Some(cost), now, now, bytes, source = None))
case _ =>
tx.statelessValidity().map(_ => UnconfirmedTransaction(tx, None, now, now, bytes))
tx.statelessValidity().map(_ => UnconfirmedTransaction(tx, None, now, now, bytes, source = None))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ case class WalletApiRoute(readersHolder: ActorRef,
requests,
inputsRaw,
dataInputsRaw,
tx => Future(Success(UnconfirmedTransaction(tx))),
tx => Future(Success(UnconfirmedTransaction(tx, source = None))),
utx => ApiResponse(utx.transaction)
)
}
Expand Down
17 changes: 10 additions & 7 deletions src/main/scala/org/ergoplatform/local/CleanupWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scorex.core.transaction.state.TransactionValidation
import scorex.util.{ModifierId, ScorexLogging}

import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.{Failure, Success}

/**
Expand Down Expand Up @@ -82,26 +83,28 @@ class CleanupWorker(nodeViewHolderRef: ActorRef,
//internal loop function validating transactions, returns validated and invalidated transaction ids
@tailrec
def validationLoop(txs: Seq[UnconfirmedTransaction],
validated: Seq[UnconfirmedTransaction],
invalidated: Seq[ModifierId],
costAcc: Long): (Seq[UnconfirmedTransaction], Seq[ModifierId]) = {
validated: mutable.ArrayBuilder[UnconfirmedTransaction],
invalidated: mutable.ArrayBuilder[ModifierId],
costAcc: Long
): (mutable.ArrayBuilder[UnconfirmedTransaction], mutable.ArrayBuilder[ModifierId]) = {
txs match {
case head :: tail if costAcc < CostLimit =>
state.validateWithCost(head.transaction, nodeSettings.maxTransactionCost) match {
case Success(txCost) =>
val updTx = head.onRecheck(txCost)
validationLoop(tail, validated :+ updTx, invalidated, txCost + costAcc)
val updTx = head.withCost(txCost)
validationLoop(tail, validated += updTx, invalidated, txCost + costAcc)
case Failure(e) =>
val txId = head.id
log.info(s"Transaction $txId invalidated: ${e.getMessage}")
validationLoop(tail, validated, invalidated :+ txId, head.lastCost.getOrElse(0) + costAcc) //add old cost
validationLoop(tail, validated, invalidated += txId, head.lastCost.getOrElse(0) + costAcc) //add old cost
}
case _ =>
validated -> invalidated
}
}

validationLoop(txsToValidate, Seq.empty, Seq.empty, 0L)
val res = validationLoop(txsToValidate, mutable.ArrayBuilder.make(), mutable.ArrayBuilder.make(), 0L)
wrapRefArray(res._1.result()) -> wrapRefArray(res._2.result())
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
package org.ergoplatform.modifiers.mempool

import scorex.core.network.ConnectedPeer
import scorex.util.{ModifierId, ScorexLogging}


/**
* Wrapper for unconfirmed transaction and corresponding data
*
* @param transaction - unconfirmed transaction
* @param lastCost - validation cost during last check
* @param createdTime - when transaction entered the pool
* @param lastCheckedTime - when last validity check was done
* @param transactionBytes - transaction bytes, to avoid serializations when we send it over the wire
* @param source - peer which delivered the transaction (None if transaction submitted via API)
*/
case class UnconfirmedTransaction(transaction: ErgoTransaction,
lastCost: Option[Int],
createdTime: Long,
lastCheckedTime: Long,
transactionBytes: Option[Array[Byte]])
transactionBytes: Option[Array[Byte]],
source: Option[ConnectedPeer])
extends ScorexLogging {

def id: ModifierId = transaction.id

/**
* Updates cost and last checked time of unconfirmed transaction
*/
def onRecheck(cost: Int): UnconfirmedTransaction = {
def withCost(cost: Int): UnconfirmedTransaction = {
copy(lastCost = Some(cost), lastCheckedTime = System.currentTimeMillis())
}

}

object UnconfirmedTransaction {

def apply(tx: ErgoTransaction): UnconfirmedTransaction = {
def apply(tx: ErgoTransaction, source: Option[ConnectedPeer]): UnconfirmedTransaction = {
val now = System.currentTimeMillis()
UnconfirmedTransaction(tx, None, now, now, Some(tx.bytes))
UnconfirmedTransaction(tx, None, now, now, Some(tx.bytes), source)
}

def apply(tx: ErgoTransaction, txBytes: Array[Byte]): UnconfirmedTransaction = {
def apply(tx: ErgoTransaction, txBytes: Array[Byte], source: Option[ConnectedPeer]): UnconfirmedTransaction = {
val now = System.currentTimeMillis()
UnconfirmedTransaction(tx, None, now, now, Some(txBytes))
UnconfirmedTransaction(tx, None, now, now, Some(txBytes), source)
}

}
Expand Down
Loading

0 comments on commit 91a6a9a

Please sign in to comment.