From 003bb84731f6ea797690d1af4f322a5067f6181b Mon Sep 17 00:00:00 2001 From: Alexander Chepurnoy Date: Mon, 30 Sep 2024 23:57:09 +0300 Subject: [PATCH] getting dispatchers back --- src/main/resources/application.conf | 37 +++++++++++++++++++ .../org/ergoplatform/GlobalConstants.scala | 16 ++++++++ .../nodeView/ErgoReadersHolder.scala | 3 +- .../nodeView/history/extra/ExtraIndexer.scala | 14 ++++--- .../nodeView/wallet/ErgoWalletActor.scala | 1 + 5 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 src/main/scala/org/ergoplatform/GlobalConstants.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 7b1e91eb31..cbc5dd4382 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -590,4 +590,41 @@ critical-dispatcher { fixed-pool-size = 2 } throughput = 1 +} + +# dispatcher for some API-related actors +api-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 1 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 2 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 4 +} + +indexer-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 1 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 1.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 4 + } } \ No newline at end of file diff --git a/src/main/scala/org/ergoplatform/GlobalConstants.scala b/src/main/scala/org/ergoplatform/GlobalConstants.scala new file mode 100644 index 0000000000..2536689a4f --- /dev/null +++ b/src/main/scala/org/ergoplatform/GlobalConstants.scala @@ -0,0 +1,16 @@ +package org.ergoplatform + +/** + * A singleton which holds constants needed around the whole Ergo Platform. + */ +object GlobalConstants { + + /** + * Name of dispatcher for actors processing API requests + * (to avoid clashing between blockchain processing and API actors) + */ + val ApiDispatcher = "api-dispatcher" + + val IndexerDispatcher = "indexer-dispatcher" + +} diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala index 8ff1f1f710..a7df32ee3f 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala @@ -1,6 +1,7 @@ package org.ergoplatform.nodeView import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} +import org.ergoplatform.GlobalConstants import org.ergoplatform.nodeView.ErgoReadersHolder._ import org.ergoplatform.nodeView.history.ErgoHistoryReader import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader @@ -68,7 +69,7 @@ object ErgoReadersHolderRef { def apply(viewHolderRef: ActorRef) (implicit context: ActorRefFactory): ActorRef = { - val props = Props(new ErgoReadersHolder(viewHolderRef)) + val props = Props(new ErgoReadersHolder(viewHolderRef)).withDispatcher(GlobalConstants.ApiDispatcher) context.actorOf(props) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 1a3cd221eb..29636c920a 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView.history.extra import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash} import org.ergoplatform.ErgoBox.TokenId -import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, Pay2SAddress} +import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2SAddress} import org.ergoplatform.modifiers.history.BlockTransactions import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.ErgoTransaction @@ -25,7 +25,7 @@ import spire.syntax.all.cfor import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.collection.concurrent -import scala.concurrent.Future +import scala.concurrent.{ExecutionContextExecutor, Future} import scala.jdk.CollectionConverters._ /** @@ -33,6 +33,8 @@ import scala.jdk.CollectionConverters._ */ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { + private implicit val ec: ExecutionContextExecutor = context.dispatcher + /** * Max buffer size (determined by config) */ @@ -97,7 +99,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { (range._1 until range._2).foreach { blockNum => history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _)) } - }(context.dispatcher) + } } } else { val blockNums = height + 1 to readingUpTo @@ -105,7 +107,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { blockNums.foreach { blockNum => history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _)) } - }(context.dispatcher) + } } } txs @@ -248,7 +250,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { val height = headerOpt.map(_.height).getOrElse(state.indexedHeight) if (btOpt.isEmpty) { - log.warn(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying") + log.error(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying") return state.decrementIndexedHeight.copy(caughtUp = true) } @@ -594,6 +596,6 @@ object ExtraIndexer { def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = { val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder) - system.actorOf(props) + system.actorOf(props.withDispatcher(GlobalConstants.IndexerDispatcher)) } } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index 41e48f9fb8..b64b43522f 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -509,6 +509,7 @@ object ErgoWalletActor extends ScorexLogging { boxSelector: BoxSelector, historyReader: ErgoHistoryReader)(implicit actorSystem: ActorSystem): ActorRef = { val props = Props(classOf[ErgoWalletActor], settings, parameters, service, boxSelector, historyReader) + .withDispatcher(GlobalConstants.ApiDispatcher) val walletActorRef = actorSystem.actorOf(props) CoordinatedShutdown(actorSystem).addActorTerminationTask( CoordinatedShutdown.PhaseBeforeServiceUnbind,