From 285e3d3bea29c9579930e1a222b23edb821a9ccb Mon Sep 17 00:00:00 2001 From: Siddharth Suresh Date: Mon, 1 Jul 2024 16:14:16 -0700 Subject: [PATCH] Remove clusters --- src/ledger/LedgerManagerImpl.cpp | 256 ++++++++-------- src/ledger/LedgerManagerImpl.h | 13 +- src/transactions/FeeBumpTransactionFrame.cpp | 2 +- src/transactions/FeeBumpTransactionFrame.h | 2 +- .../InvokeHostFunctionOpFrame.cpp | 2 +- src/transactions/InvokeHostFunctionOpFrame.h | 2 +- src/transactions/OperationFrame.cpp | 4 +- src/transactions/OperationFrame.h | 4 +- src/transactions/TransactionFrame.cpp | 2 +- src/transactions/TransactionFrame.h | 2 +- src/transactions/TransactionFrameBase.h | 7 +- .../test/InvokeHostFunctionTests.cpp | 279 +++++++++++++----- .../test/TransactionTestFrame.cpp | 2 +- src/transactions/test/TransactionTestFrame.h | 2 +- 14 files changed, 353 insertions(+), 226 deletions(-) diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index 561c60c064..4642a781f8 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -1482,10 +1482,10 @@ LedgerManagerImpl::prefetchTransactionData( } } -ClusterEntryMap -LedgerManagerImpl::collectEntries(AbstractLedgerTxn& ltx, Cluster const& txs) +ThreadEntryMap +LedgerManagerImpl::collectEntries(AbstractLedgerTxn& ltx, Thread const& txs) { - ClusterEntryMap entryMap; + ThreadEntryMap entryMap; auto getEntries = [&](xdr::xvector const& keys) { for (auto const& lk : keys) { @@ -1548,31 +1548,30 @@ LedgerManagerImpl::collectInitialTTLEntries(AbstractLedgerTxn& ltx, for (auto const& thread : stage) { - for (auto const& cluster : thread) + for (auto const& txBundle : thread) { - for (auto const& txBundle : cluster) - { - getTTLEntries( - txBundle.tx->sorobanResources().footprint.readOnly); - getTTLEntries( - txBundle.tx->sorobanResources().footprint.readWrite); - } + getTTLEntries(txBundle.tx->sorobanResources().footprint.readOnly); + getTTLEntries(txBundle.tx->sorobanResources().footprint.readWrite); } } return initialTTLs; } -// TODO: Make applyCluster a non-member function void -LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config, - SorobanNetworkConfig const& sorobanConfig, - CxxLedgerInfo const& ledgerInfo, - Hash const& sorobanBasePrngSeed, - uint32_t ledgerSeq, uint32_t ledgerVersion, - Cluster const& txs) +LedgerManagerImpl::applyThread(ThreadEntryMap& entryMap, Thread const& thread, + Config const& config, + SorobanNetworkConfig const& sorobanConfig, + CxxLedgerInfo const& ledgerInfo, + Hash const& sorobanBasePrngSeed, + uint32_t ledgerSeq, uint32_t ledgerVersion) { - for (auto const& txBundle : txs) + // TTL extensions can't be observable between transactions, so we track them + // separately and update the entry map at the end of the thread. + + UnorderedMap ttlExtensions; + + for (auto const& txBundle : thread) { releaseAssertOrThrow(txBundle.resPayload); auto res = txBundle.tx->parallelApply( @@ -1590,17 +1589,40 @@ LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config, auto it = entryMap.find(lk); releaseAssertOrThrow(it != entryMap.end()); - // TODO: Remove this - if (lk.type() == TTL && updatedLe && it->second.first) + if (lk.type() == TTL && it->second.first && updatedLe) { - releaseAssertOrThrow( - updatedLe->data.ttl().liveUntilLedgerSeq > - it->second.first->data.ttl().liveUntilLedgerSeq); + // If this is a TTL extension, then record it separately for + // now. + auto ttlIt = ttlExtensions.find(lk); + if (ttlIt != ttlExtensions.end()) + { + ttlIt->second.liveUntilLedgerSeq = + std::max(ttlIt->second.liveUntilLedgerSeq, + updatedLe->data.ttl().liveUntilLedgerSeq); + } + else + { + releaseAssertOrThrow( + updatedLe->data.ttl().liveUntilLedgerSeq > + it->second.first->data.ttl().liveUntilLedgerSeq); + ttlExtensions.emplace(lk, updatedLe->data.ttl()); + } } + else + { + // If this is a TTL entry, it is either being created or + // deleted, so it should be observable. - // A entry deletion will be marked by a nullopt le. - // Set the dirty bit so it'll be written to ltx later. - it->second = {updatedLe, true}; + // TTL entry is being deleted, so extensions can be wiped. + if (lk.type() == TTL && !updatedLe) + { + ttlExtensions.erase(lk); + } + + // A entry deletion will be marked by a nullopt le. + // Set the dirty bit so it'll be written to ltx later. + it->second = {updatedLe, true}; + } } } else @@ -1611,27 +1633,17 @@ LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config, txBundle.mOpMetrics = res.mOpMetrics; txBundle.mDelta = res.mDelta; } -} -// Each cluster has its own entry map, so TTL updates will not be observable -// between clusters -void -LedgerManagerImpl::applyThread(std::vector& entryMapByCluster, - std::vector const& clusters, - Config const& config, - SorobanNetworkConfig const& sorobanConfig, - CxxLedgerInfo const& ledgerInfo, - Hash const& sorobanBasePrngSeed, - uint32_t ledgerSeq, uint32_t ledgerVersion) -{ - releaseAssertOrThrow(entryMapByCluster.size() == clusters.size()); - for (size_t i = 0; i < clusters.size(); ++i) + for (auto const& ttlExtension : ttlExtensions) { - auto const& cluster = clusters.at(i); - auto& entryMap = entryMapByCluster.at(i); + auto it = entryMap.find(ttlExtension.first); + releaseAssertOrThrow(it != entryMap.end()); - applyCluster(entryMap, config, sorobanConfig, ledgerInfo, - sorobanBasePrngSeed, ledgerSeq, ledgerVersion, cluster); + LedgerEntry updatedTtl; + updatedTtl.data.type(TTL); + updatedTtl.data.ttl() = ttlExtension.second; + + it->second = {updatedTtl, true}; } } @@ -1689,13 +1701,10 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx, { for (auto const& thread : stage) { - for (auto const& cluster : thread) + for (auto const& txBundle : thread) { - for (auto const& txBundle : cluster) - { - txBundle.tx->preParallelApply(app, ltx, txBundle.meta, - txBundle.resPayload, true); - } + txBundle.tx->preParallelApply(app, ltx, txBundle.meta, + txBundle.resPayload, true); } } @@ -1708,20 +1717,16 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx, auto const& ledgerInfo = getLedgerInfo(ltx, app, sorobanConfig); - std::vector> entryMapsByThread; + std::vector entryMapsByThread; for (auto const& thread : stage) { - auto& clusterEntryMaps = entryMapsByThread.emplace_back(); - for (auto const& cluster : thread) - { - clusterEntryMaps.emplace_back(collectEntries(ltx, cluster)); - } + entryMapsByThread.emplace_back(collectEntries(ltx, thread)); } std::vector threads; for (size_t i = 0; i < stage.size(); ++i) { - auto& entryMapByCluster = entryMapsByThread.at(i); + auto& entryMapByThread = entryMapsByThread.at(i); auto const& thread = stage.at(i); @@ -1730,7 +1735,7 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx, // TODO: entryMap should be moved in. // TODO: Use thread pool threads.push_back(std::thread( - &LedgerManagerImpl::applyThread, this, std::ref(entryMapByCluster), + &LedgerManagerImpl::applyThread, this, std::ref(entryMapByThread), std::ref(thread), config, sorobanConfig, std::ref(ledgerInfo), sorobanBasePrngSeed, ledgerSeq, ledgerVersion)); } @@ -1745,101 +1750,94 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx, for (auto const& thread : stage) { - for (auto const& cluster : thread) + for (auto const& txBundle : thread) { - for (auto const& txBundle : cluster) + // First check the invariants + if (txBundle.resPayload->isSuccess()) { - // First check the invariants - if (txBundle.resPayload->isSuccess()) + try { - try - { - // Soroban transactions don't have access to the ledger - // header, so they can't modify it. Pass in the current - // header as both current and previous. - txBundle.mDelta->header.current = - ltxInner.loadHeader().current(); - txBundle.mDelta->header.previous = - ltxInner.loadHeader().current(); - app.getInvariantManager().checkOnOperationApply( - txBundle.tx->getRawOperations().at(0), - txBundle.resPayload->getOpResultAt(0), - *txBundle.mDelta); - } - catch (InvariantDoesNotHold& e) - { - printErrorAndAbort( - "Invariant failure while applying operations: ", - e.what()); - } + // Soroban transactions don't have access to the ledger + // header, so they can't modify it. Pass in the current + // header as both current and previous. + txBundle.mDelta->header.current = + ltxInner.loadHeader().current(); + txBundle.mDelta->header.previous = + ltxInner.loadHeader().current(); + app.getInvariantManager().checkOnOperationApply( + txBundle.tx->getRawOperations().at(0), + txBundle.resPayload->getOpResultAt(0), + *txBundle.mDelta); + } + catch (InvariantDoesNotHold& e) + { + printErrorAndAbort( + "Invariant failure while applying operations: ", + e.what()); } + } - txBundle.tx->processPostApply(mApp, ltxInner, txBundle.meta, - txBundle.resPayload); + txBundle.tx->processPostApply(mApp, ltxInner, txBundle.meta, + txBundle.resPayload); - releaseAssertOrThrow(txBundle.mOpMetrics); - txBundle.mOpMetrics->updateSorobanMetrics(mSorobanMetrics); + releaseAssertOrThrow(txBundle.mOpMetrics); + txBundle.mOpMetrics->updateSorobanMetrics(mSorobanMetrics); - // We only increase the internal-error metric count if the - // ledger is a newer version. - if (txBundle.resPayload->getResultCode() == txINTERNAL_ERROR && - ledgerVersion >= - config - .LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT) - { - auto& internalErrorCounter = app.getMetrics().NewCounter( - {"ledger", "transaction", "internal-error"}); - internalErrorCounter.inc(); - } + // We only increase the internal-error metric count if the + // ledger is a newer version. + if (txBundle.resPayload->getResultCode() == txINTERNAL_ERROR && + ledgerVersion >= + config.LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT) + { + auto& internalErrorCounter = app.getMetrics().NewCounter( + {"ledger", "transaction", "internal-error"}); + internalErrorCounter.inc(); } } } // TODO: Look into adding invariants checking for conflicting writes between // clusters - for (auto const& entryMapsByCluster : entryMapsByThread) + for (auto const& threadEntryMap : entryMapsByThread) { - for (auto const& clusterEntryMap : entryMapsByCluster) + for (auto const& entry : threadEntryMap) { - for (auto const& entry : clusterEntryMap) + // Only update if dirty bit is set + if (!entry.second.second) { - // Only update if dirty bit is set - if (!entry.second.second) - { - continue; - } + continue; + } - if (entry.second.first) + if (entry.second.first) + { + auto const& updatedTTL = *entry.second.first; + auto ltxe = ltxInner.load(entry.first); + if (ltxe) { - auto const& updatedTTL = *entry.second.first; - auto ltxe = ltxInner.load(entry.first); - if (ltxe) + if (ltxe.current().data.type() == TTL) { - if (ltxe.current().data.type() == TTL) + // Only update TTL if we're increasing it + auto currLiveUntil = + ltxe.current().data.ttl().liveUntilLedgerSeq; + if (currLiveUntil >= + updatedTTL.data.ttl().liveUntilLedgerSeq) { - // Only update TTL if we're increasing it - auto currLiveUntil = - ltxe.current().data.ttl().liveUntilLedgerSeq; - if (currLiveUntil >= - updatedTTL.data.ttl().liveUntilLedgerSeq) - { - continue; - } + continue; } - ltxe.current() = updatedTTL; - } - else - { - ltxInner.create(updatedTTL); } + ltxe.current() = updatedTTL; } else { - auto ltxe = ltxInner.load(entry.first); - if (ltxe) - { - ltxInner.erase(entry.first); - } + ltxInner.create(updatedTTL); + } + } + else + { + auto ltxe = ltxInner.load(entry.first); + if (ltxe) + { + ltxInner.erase(entry.first); } } } diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h index b389bec77f..508aa84866 100644 --- a/src/ledger/LedgerManagerImpl.h +++ b/src/ledger/LedgerManagerImpl.h @@ -91,24 +91,17 @@ class LedgerManagerImpl : public LedgerManager AbstractLedgerTxn& ltx, TransactionResultSet& txResultSet, std::unique_ptr const& ledgerCloseMeta); - ClusterEntryMap collectEntries(AbstractLedgerTxn& ltx, Cluster const& txs); + ThreadEntryMap collectEntries(AbstractLedgerTxn& ltx, Thread const& txs); TTLs collectInitialTTLEntries(AbstractLedgerTxn& ltx, Stage const& stage); - void applyThread(std::vector& entryMapByCluster, - std::vector const& clusters, Config const& config, + void applyThread(ThreadEntryMap& entryMapByCluster, Thread const& thread, + Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, Hash const& sorobanBasePrngSeed, uint32_t ledgerSeq, uint32_t ledgerVersion); - // TODO: Make these three methods const - void applyCluster(ClusterEntryMap& entryMap, Config const& config, - SorobanNetworkConfig const& sorobanConfig, - CxxLedgerInfo const& ledgerInfo, - Hash const& sorobanBasePrngSeed, uint32_t ledgerSeq, - uint32_t ledgerVersion, Cluster const& txs); - void applySorobanStage(Application& app, AbstractLedgerTxn& ltx, Stage const& stage, Hash const& sorobanBasePrngSeed); diff --git a/src/transactions/FeeBumpTransactionFrame.cpp b/src/transactions/FeeBumpTransactionFrame.cpp index b382221207..cd1623d421 100644 --- a/src/transactions/FeeBumpTransactionFrame.cpp +++ b/src/transactions/FeeBumpTransactionFrame.cpp @@ -103,7 +103,7 @@ FeeBumpTransactionFrame::preParallelApply(Application& app, ParallelOpReturnVal FeeBumpTransactionFrame::parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr txResult, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/FeeBumpTransactionFrame.h b/src/transactions/FeeBumpTransactionFrame.h index cb07ebb2dd..82a92d7530 100644 --- a/src/transactions/FeeBumpTransactionFrame.h +++ b/src/transactions/FeeBumpTransactionFrame.h @@ -74,7 +74,7 @@ class FeeBumpTransactionFrame : public TransactionFrameBase bool chargeFee) const override; ParallelOpReturnVal parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr resPayload, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/InvokeHostFunctionOpFrame.cpp b/src/transactions/InvokeHostFunctionOpFrame.cpp index 2897845d77..e8780e2030 100644 --- a/src/transactions/InvokeHostFunctionOpFrame.cpp +++ b/src/transactions/InvokeHostFunctionOpFrame.cpp @@ -476,7 +476,7 @@ InvokeHostFunctionOpFrame::maybePopulateDiagnosticEvents( ParallelOpReturnVal InvokeHostFunctionOpFrame::doApplyParallel( - ClusterEntryMap const& entryMap, // Must not be shared between threads + ThreadEntryMap const& entryMap, // Must not be shared between threads Config const& appConfig, SorobanNetworkConfig const& sorobanConfig, Hash const& sorobanBasePrngSeed, CxxLedgerInfo const& ledgerInfo, OperationResult& res, SorobanTxData& sorobanData, uint32_t ledgerSeq, diff --git a/src/transactions/InvokeHostFunctionOpFrame.h b/src/transactions/InvokeHostFunctionOpFrame.h index a390895d4a..5e8229dfee 100644 --- a/src/transactions/InvokeHostFunctionOpFrame.h +++ b/src/transactions/InvokeHostFunctionOpFrame.h @@ -55,7 +55,7 @@ class InvokeHostFunctionOpFrame : public OperationFrame OperationResult& res) const override; ParallelOpReturnVal doApplyParallel( - ClusterEntryMap const& entryMap, // Must not be shared between threads! + ThreadEntryMap const& entryMap, // Must not be shared between threads! Config const& appConfig, SorobanNetworkConfig const& sorobanConfig, Hash const& sorobanBasePrngSeed, CxxLedgerInfo const& ledgerInfo, OperationResult& res, SorobanTxData& sorobanData, uint32_t ledgerSeq, diff --git a/src/transactions/OperationFrame.cpp b/src/transactions/OperationFrame.cpp index 559c1e5635..6d06ef5080 100644 --- a/src/transactions/OperationFrame.cpp +++ b/src/transactions/OperationFrame.cpp @@ -163,7 +163,7 @@ OperationFrame::apply(Application& app, SignatureChecker& signatureChecker, } ParallelOpReturnVal -OperationFrame::applyParallel(ClusterEntryMap const& entryMap, +OperationFrame::applyParallel(ThreadEntryMap const& entryMap, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, @@ -181,7 +181,7 @@ OperationFrame::applyParallel(ClusterEntryMap const& entryMap, } ParallelOpReturnVal -OperationFrame::doApplyParallel(ClusterEntryMap const& entryMap, +OperationFrame::doApplyParallel(ThreadEntryMap const& entryMap, Config const& appConfig, SorobanNetworkConfig const& sorobanConfig, Hash const& sorobanBasePrngSeed, diff --git a/src/transactions/OperationFrame.h b/src/transactions/OperationFrame.h index c6f02eec1c..fb0133d88b 100644 --- a/src/transactions/OperationFrame.h +++ b/src/transactions/OperationFrame.h @@ -58,7 +58,7 @@ class OperationFrame OperationResult& res) const = 0; virtual ParallelOpReturnVal doApplyParallel( - ClusterEntryMap const& entryMap, // Must not be shared between threads! + ThreadEntryMap const& entryMap, // Must not be shared between threads! Config const& appConfig, SorobanNetworkConfig const& sorobanConfig, Hash const& sorobanBasePrngSeed, CxxLedgerInfo const& ledgerInfo, OperationResult& res, SorobanTxData& sorobanData, uint32_t ledgerSeq, @@ -103,7 +103,7 @@ class OperationFrame std::shared_ptr sorobanData) const; ParallelOpReturnVal applyParallel( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, OperationResult& res, SorobanTxData& sorobanData, Hash const& sorobanBasePrngSeed, diff --git a/src/transactions/TransactionFrame.cpp b/src/transactions/TransactionFrame.cpp index 9e3acd1218..a6014ba59f 100644 --- a/src/transactions/TransactionFrame.cpp +++ b/src/transactions/TransactionFrame.cpp @@ -1512,7 +1512,7 @@ TransactionFrame::apply(Application& app, AbstractLedgerTxn& ltx, ParallelOpReturnVal TransactionFrame::parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr txResult, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/TransactionFrame.h b/src/transactions/TransactionFrame.h index 77d6bb23e1..e87a6bb131 100644 --- a/src/transactions/TransactionFrame.h +++ b/src/transactions/TransactionFrame.h @@ -240,7 +240,7 @@ class TransactionFrame : public TransactionFrameBase bool chargeFee) const override; ParallelOpReturnVal parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr resPayload, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/TransactionFrameBase.h b/src/transactions/TransactionFrameBase.h index 8ad6f51de6..b5a629c5b4 100644 --- a/src/transactions/TransactionFrameBase.h +++ b/src/transactions/TransactionFrameBase.h @@ -36,7 +36,7 @@ using TransactionFrameBaseConstPtr = std::shared_ptr; using ModifiedEntryMap = UnorderedMap>; -using ClusterEntryMap = +using ThreadEntryMap = UnorderedMap, bool /*dirty*/>>; @@ -74,8 +74,7 @@ class TxBundle mutable std::shared_ptr mDelta; }; -typedef std::vector Cluster; -typedef std::vector Thread; +typedef std::vector Thread; typedef std::vector Stage; typedef UnorderedMap TTLs; @@ -96,7 +95,7 @@ class TransactionFrameBase bool chargeFee) const = 0; virtual ParallelOpReturnVal parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr resPayload, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/test/InvokeHostFunctionTests.cpp b/src/transactions/test/InvokeHostFunctionTests.cpp index 3f8035e969..0fd822f073 100644 --- a/src/transactions/test/InvokeHostFunctionTests.cpp +++ b/src/transactions/test/InvokeHostFunctionTests.cpp @@ -4296,7 +4296,7 @@ TEST_CASE("Vm instantiation tightening", "[tx][soroban]") } } -TEST_CASE("parallel ttl", "[tx][soroban]") +TEST_CASE("parallel ttl", "[tx][soroban][parallelapply]") { auto modifyCfg = [](SorobanNetworkConfig& cfg) { // Increase write fee so the fee will be greater than 1 @@ -4336,84 +4336,219 @@ TEST_CASE("parallel ttl", "[tx][soroban]") REQUIRE(client.getTTL("key1", ContractDataDurability::PERSISTENT) == expectedPersistentLiveUntilLedger); - auto i1 = client.getContract().prepareInvocation( - "extend_persistent", - {makeSymbolSCVal("key1"), makeU32SCVal(100'000), makeU32SCVal(100'000)}, - client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); - auto tx1 = i1.withExactNonRefundableResourceFee().createTx(&a1); + SECTION("parallel extensions") + { + auto i1 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key1"), makeU32SCVal(100'000), + makeU32SCVal(100'000)}, + client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); + auto tx1 = i1.withExactNonRefundableResourceFee().createTx(&a1); + + auto i2 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key1"), makeU32SCVal(2000), makeU32SCVal(2000)}, + client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); + auto tx2 = i2.withExactNonRefundableResourceFee().createTx(&a2); + + auto i3 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key1"), makeU32SCVal(3000), makeU32SCVal(3000)}, + client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); + auto tx3 = i3.withExactNonRefundableResourceFee().createTx(&a3); + + auto i4 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key1"), makeU32SCVal(4000), makeU32SCVal(4000)}, + client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); + auto tx4 = i4.withExactNonRefundableResourceFee().createTx(&a4); - auto i2 = client.getContract().prepareInvocation( - "extend_persistent", - {makeSymbolSCVal("key1"), makeU32SCVal(2000), makeU32SCVal(2000)}, - client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); - auto tx2 = i2.withExactNonRefundableResourceFee().createTx(&a2); + LedgerTxn ltx(app.getLedgerTxnRoot()); - auto i3 = client.getContract().prepareInvocation( - "extend_persistent", - {makeSymbolSCVal("key1"), makeU32SCVal(3000), makeU32SCVal(3000)}, - client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); - auto tx3 = i3.withExactNonRefundableResourceFee().createTx(&a3); + TransactionMetaFrame tm(ltx.loadHeader().current().ledgerVersion); - auto i4 = client.getContract().prepareInvocation( - "extend_persistent", - {makeSymbolSCVal("key1"), makeU32SCVal(3000), makeU32SCVal(3000)}, - client.readKeySpec("key1", ContractDataDurability::PERSISTENT)); - auto tx4 = i4.withExactNonRefundableResourceFee().createTx(&a4); + std::vector stages; + auto& stage = stages.emplace_back(); - LedgerTxn ltx(app.getLedgerTxnRoot()); + stage.resize(2); - TransactionMetaFrame tm(ltx.loadHeader().current().ledgerVersion); + // First thread + auto& thread1 = stage[0]; + thread1.emplace_back(tx1, tx1->createSuccessResult(), tm); + thread1.emplace_back(tx4, tx4->createSuccessResult(), tm); - std::vector stages; - auto& stage = stages.emplace_back(); + // Second thread + auto& thread2 = stage[1]; + thread2.emplace_back(tx2, tx2->createSuccessResult(), tm); + thread2.emplace_back(tx3, tx3->createSuccessResult(), tm); - // Put first two clusters into same thread, and third cluster into a - // different thread - stage.resize(2); + { + auto lmImpl = dynamic_cast(&lm); + lmImpl->applySorobanStages(app, ltx, stages, Hash{}); + ltx.commit(); + } - // First thread - auto& thread1 = stage[0]; - thread1.resize(3); - auto& cluster1 = thread1[0]; - cluster1.emplace_back(tx1, tx1->createSuccessResult(), tm); + REQUIRE(tx1->getResultCode() == txSUCCESS); + REQUIRE(tx2->getResultCode() == txSUCCESS); + REQUIRE(tx3->getResultCode() == txSUCCESS); + REQUIRE(tx4->getResultCode() == txSUCCESS); + + // FeeCharged is initialized to 0 in this test which is incorrect. + // That's why these values are negative after the refund. + // REQUIRE(tx1->getResult().feeCharged == -38999); + // REQUIRE(tx2->getResult().feeCharged == -39899); + // REQUIRE(tx3->getResult().feeCharged == -39799); + // REQUIRE(tx4->getResult().feeCharged == -39699); + + REQUIRE(client.getTTL("key1", ContractDataDurability::PERSISTENT) == + test.getLedgerSeq() + 100'000); + + auto const& extensionMetaChangesTx1 = + thread1[0].meta.getXDR().v3().operations.front().changes; + auto const& extensionMetaChangesTx2 = + thread2[0].meta.getXDR().v3().operations.front().changes; + auto const& extensionMetaChangesTx3 = + thread2[1].meta.getXDR().v3().operations.front().changes; + auto const& extensionMetaChangesTx4 = + thread1[1].meta.getXDR().v3().operations.front().changes; + + // Note that even though both transactions are in the same thread, they + // did not observe the other transactions bump, and instead bumped from + // the initial ttl. + REQUIRE(extensionMetaChangesTx1.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + REQUIRE(extensionMetaChangesTx2.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + REQUIRE(extensionMetaChangesTx3.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + REQUIRE(extensionMetaChangesTx4.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + + REQUIRE(extensionMetaChangesTx1.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 100'000); + REQUIRE(extensionMetaChangesTx2.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 2000); + REQUIRE(extensionMetaChangesTx3.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 3000); + REQUIRE(extensionMetaChangesTx4.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 4000); + } - auto& cluster2 = thread1[1]; - cluster2.emplace_back(tx2, tx2->createSuccessResult(), tm); + SECTION("Creation and extension") + { + auto i1 = client.getContract().prepareInvocation( + "put_persistent", {makeSymbolSCVal("key2"), makeU64SCVal(100)}, + client.writeKeySpec("key2", ContractDataDurability::PERSISTENT)); + auto tx1 = i1.withExactNonRefundableResourceFee().createTx(&a1); + + auto i2 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key2"), makeU32SCVal(5000), makeU32SCVal(5000)}, + client.readKeySpec("key2", ContractDataDurability::PERSISTENT)); + auto tx2 = i2.withExactNonRefundableResourceFee().createTx(&a2); + + auto i3 = client.getContract().prepareInvocation( + "extend_persistent", + {makeSymbolSCVal("key2"), makeU32SCVal(2000), makeU32SCVal(2000)}, + client.readKeySpec("key2", ContractDataDurability::PERSISTENT)); + auto tx3 = i3.withExactNonRefundableResourceFee().createTx(&a3); + + auto i4 = client.getContract().prepareInvocation( + "put_persistent", {makeSymbolSCVal("key3"), makeU64SCVal(200)}, + client.writeKeySpec("key3", ContractDataDurability::PERSISTENT)); + auto tx4 = i4.withExactNonRefundableResourceFee().createTx(&a4); - auto& cluster4 = thread1[2]; - cluster4.emplace_back(tx4, tx4->createSuccessResult(), tm); + LedgerTxn ltx(app.getLedgerTxnRoot()); - // Second thread - auto& cluster3 = stage[1].emplace_back(); - cluster3.emplace_back(tx3, tx3->createSuccessResult(), tm); + TransactionMetaFrame tm(ltx.loadHeader().current().ledgerVersion); - { - auto lmImpl = dynamic_cast(&lm); - lmImpl->applySorobanStages(app, ltx, stages, Hash{}); - ltx.commit(); - } + std::vector stages; + auto& stage = stages.emplace_back(); - REQUIRE(tx1->getResultCode() == txSUCCESS); - REQUIRE(tx2->getResultCode() == txSUCCESS); - REQUIRE(tx3->getResultCode() == txSUCCESS); - REQUIRE(tx4->getResultCode() == txSUCCESS); + stage.resize(2); - // FeeCharged is initialized to 0 in this test which is incorrect. That's - // why these values are negative after the refund. They also shouldn't all - // be the same. - REQUIRE(tx1->getResult().feeCharged == -39999); - REQUIRE(tx2->getResult().feeCharged == -39999); - REQUIRE(tx3->getResult().feeCharged == -39999); - REQUIRE(tx4->getResult().feeCharged == -39999); + // First thread + auto& thread1 = stage[0]; + thread1.emplace_back(tx1, tx1->createSuccessResult(), tm); + thread1.emplace_back(tx2, tx2->createSuccessResult(), tm); + thread1.emplace_back(tx3, tx3->createSuccessResult(), tm); - REQUIRE(client.getTTL("key1", ContractDataDurability::PERSISTENT) == - test.getLedgerSeq() + 100'000); + // Second thread + auto& thread2 = stage[1]; + thread2.emplace_back(tx4, tx4->createSuccessResult(), tm); + + { + auto lmImpl = dynamic_cast(&lm); + lmImpl->applySorobanStages(app, ltx, stages, Hash{}); + ltx.commit(); + } + + REQUIRE(tx1->getResultCode() == txSUCCESS); + REQUIRE(tx2->getResultCode() == txSUCCESS); + REQUIRE(tx3->getResultCode() == txSUCCESS); + REQUIRE(tx4->getResultCode() == txSUCCESS); + + // TODO: Check fee charged! + // FeeCharged is initialized to 0 in this test which is incorrect. + // That's why these values are negative after the refund. + /* REQUIRE(tx1->getResult().feeCharged == -38999); + REQUIRE(tx2->getResult().feeCharged == -39899); + REQUIRE(tx3->getResult().feeCharged == -39799); + REQUIRE(tx4->getResult().feeCharged == -39699); */ + + REQUIRE(client.getTTL("key2", ContractDataDurability::PERSISTENT) == + test.getLedgerSeq() + 5000); - std::cout << xdrToCerealString(cluster1.front().meta.getXDR(), "meta1") - << std::endl; + REQUIRE(client.getTTL("key3", ContractDataDurability::PERSISTENT) == + expectedPersistentLiveUntilLedger); + + auto const& extensionMetaChangesTx2 = + thread1[1].meta.getXDR().v3().operations.front().changes; + auto const& extensionMetaChangesTx3 = + thread1[2].meta.getXDR().v3().operations.front().changes; + + // Note that even though both transactions are in the same thread, they + // did not observe the other transactions bump, and instead bumped from + // the initial ttl. + REQUIRE(extensionMetaChangesTx2.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + REQUIRE(extensionMetaChangesTx3.at(0) + .state() + .data.ttl() + .liveUntilLedgerSeq == expectedPersistentLiveUntilLedger); + + REQUIRE(extensionMetaChangesTx2.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 5000); + REQUIRE(extensionMetaChangesTx3.at(1) + .updated() + .data.ttl() + .liveUntilLedgerSeq == test.getLedgerSeq() + 2000); + } + + // TODO: Add deletion test. } -TEST_CASE("parallel", "[tx][soroban]") +TEST_CASE("parallel", "[tx][soroban][parallelapply]") { auto cfg = getTestConfig(); SorobanTest test(cfg); @@ -4495,18 +4630,18 @@ TEST_CASE("parallel", "[tx][soroban]") auto& stage = stages.emplace_back(); stage.resize(3); - auto& cluster1 = stage[0].emplace_back(); - cluster1.emplace_back(tx1, tx1->createSuccessResult(), tm); - cluster1.emplace_back(tx3, tx3->createSuccessResult(), tm); + auto& thread1 = stage[0]; + thread1.emplace_back(tx1, tx1->createSuccessResult(), tm); + thread1.emplace_back(tx3, tx3->createSuccessResult(), tm); - auto& cluster2 = stage[1].emplace_back(); - cluster2.emplace_back(tx2, tx2->createSuccessResult(), tm); - cluster2.emplace_back(tx7, tx7->createSuccessResult(), tm); + auto& thread2 = stage[1]; + thread2.emplace_back(tx2, tx2->createSuccessResult(), tm); + thread2.emplace_back(tx7, tx7->createSuccessResult(), tm); - auto& cluster3 = stage[2].emplace_back(); - cluster3.emplace_back(tx4, tx4->createSuccessResult(), tm); - cluster3.emplace_back(transferTx1, transferTx1->createSuccessResult(), tm); - cluster3.emplace_back(transferTx2, transferTx2->createSuccessResult(), tm); + auto& thread3 = stage[2]; + thread3.emplace_back(tx4, tx4->createSuccessResult(), tm); + thread3.emplace_back(transferTx1, transferTx1->createSuccessResult(), tm); + thread3.emplace_back(transferTx2, transferTx2->createSuccessResult(), tm); auto timerBefore = hostFnExecTimer.count(); { @@ -4544,6 +4679,8 @@ TEST_CASE("parallel", "[tx][soroban]") .invokeHostFunctionResult() .code() == INVOKE_HOST_FUNCTION_INSUFFICIENT_REFUNDABLE_FEE); + // TODO: Check fee charged! + REQUIRE(a5.getTrustlineBalance(idr) == 150); REQUIRE(a6.getTrustlineBalance(idr) == 75); } \ No newline at end of file diff --git a/src/transactions/test/TransactionTestFrame.cpp b/src/transactions/test/TransactionTestFrame.cpp index ca6013c639..079d519efa 100644 --- a/src/transactions/test/TransactionTestFrame.cpp +++ b/src/transactions/test/TransactionTestFrame.cpp @@ -287,7 +287,7 @@ TransactionTestFrame::preParallelApply(Application& app, AbstractLedgerTxn& ltx, ParallelOpReturnVal TransactionTestFrame::parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr resPayload, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta, diff --git a/src/transactions/test/TransactionTestFrame.h b/src/transactions/test/TransactionTestFrame.h index 3539db6c8d..e5937f9773 100644 --- a/src/transactions/test/TransactionTestFrame.h +++ b/src/transactions/test/TransactionTestFrame.h @@ -119,7 +119,7 @@ class TransactionTestFrame : public TransactionFrameBase bool chargeFee) const override; ParallelOpReturnVal parallelApply( - ClusterEntryMap const& entryMap, // Must not be shared between threads!, + ThreadEntryMap const& entryMap, // Must not be shared between threads!, Config const& config, SorobanNetworkConfig const& sorobanConfig, CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr resPayload, Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta,