Skip to content

Commit

Permalink
Parallelize RestoreFootprintOpFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
sisuresh committed Jul 5, 2024
1 parent 9e5f2b2 commit 57d458d
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 32 deletions.
15 changes: 12 additions & 3 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,10 @@ LedgerManagerImpl::applyThread(ThreadEntryMap& entryMap, Thread const& thread,
auto it = entryMap.find(lk);
releaseAssertOrThrow(it != entryMap.end());

if (lk.type() == TTL && it->second.first && updatedLe)
auto opType = txBundle.tx->getRawOperations().at(0).body.type();

if (opType != RESTORE_FOOTPRINT && lk.type() == TTL &&
it->second.first && updatedLe)
{
// If this is a TTL extension, then record it separately for
// now.
Expand All @@ -1612,15 +1615,21 @@ LedgerManagerImpl::applyThread(ThreadEntryMap& entryMap, Thread const& thread,
}
else
{
// If this is a TTL entry, it is either being created or
// deleted, so it should be observable.
// If this is a TTL entry, it is either being created,
// deleted, or restored, so it should be observable.

// TTL entry is being deleted, so extensions can be wiped.
if (lk.type() == TTL && !updatedLe)
{
ttlExtensions.erase(lk);
}

// If this is a create or restore, the entry is in the
// readWrite set, so it shouldn't be possible for an
// extension to be accepted prior to the entry being
// accessible.
releaseAssertOrThrow(ttlExtensions.count(lk) == 0);

// A entry deletion will be marked by a nullopt le.
// Set the dirty bit so it'll be written to ltx later.
it->second = {updatedLe, true};
Expand Down
42 changes: 18 additions & 24 deletions src/transactions/ExtendFootprintTTLOpFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ExtendFootprintTTLOpFrame::doApplyParallel(
SorobanTxData& sorobanData, uint32_t ledgerSeq,
uint32_t ledgerVersion) const
{
ZoneNamedN(applyZone, "ExtendFootprintTTLOpFrame apply", true);
ZoneNamedN(applyZone, "ExtendFootprintTTLOpFrame doApplyParallel", true);

ExtendFootprintTTLMetrics metrics(sorobanMetrics);
auto timeScope = metrics.getExecTimer();
Expand All @@ -82,29 +82,25 @@ ExtendFootprintTTLOpFrame::doApplyParallel(
for (auto const& lk : footprint.readOnly)
{
auto ttlKey = getTTLKey(lk);
{
// Initially load without record since we may not need to modify
// entry
auto ttlIter = entryMap.find(ttlKey);
auto ttlIter = entryMap.find(ttlKey);

// auto ttlConstLtxe = ltx.loadWithoutRecord(ttlKey);
if (ttlIter == entryMap.end() || !ttlIter->second.first ||
!isLive(*ttlIter->second.first, ledgerSeq))
{
// Skip archived entries, as those must be restored.
//
// Also skip the missing entries. Since this happens at apply
// time and we refund the unspent fees, it is more beneficial
// to extend as many entries as possible.
continue;
}
// auto ttlConstLtxe = ltx.loadWithoutRecord(ttlKey);
if (ttlIter == entryMap.end() || !ttlIter->second.first ||
!isLive(*ttlIter->second.first, ledgerSeq))
{
// Skip archived entries, as those must be restored.
//
// Also skip the missing entries. Since this happens at apply
// time and we refund the unspent fees, it is more beneficial
// to extend as many entries as possible.
continue;
}

auto currLiveUntilLedgerSeq =
ttlIter->second.first->data.ttl().liveUntilLedgerSeq;
if (currLiveUntilLedgerSeq >= newLiveUntilLedgerSeq)
{
continue;
}
auto currLiveUntilLedgerSeq =
ttlIter->second.first->data.ttl().liveUntilLedgerSeq;
if (currLiveUntilLedgerSeq >= newLiveUntilLedgerSeq)
{
continue;
}

auto entryIter = entryMap.find(lk);
Expand Down Expand Up @@ -138,8 +134,6 @@ ExtendFootprintTTLOpFrame::doApplyParallel(
return {false, {}};
}

// We already checked that the TTLEntry exists in the logic above
auto ttlIter = entryMap.find(ttlKey);
auto ttlLe = *ttlIter->second.first;

rustEntryRentChanges.emplace_back();
Expand Down
2 changes: 1 addition & 1 deletion src/transactions/InvokeHostFunctionOpFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ InvokeHostFunctionOpFrame::doApplyParallel(
SorobanTxData& sorobanData, uint32_t ledgerSeq,
uint32_t ledgerVersion) const
{
ZoneNamedN(applyZone, "InvokeHostFunctionOpFrame apply", true);
ZoneNamedN(applyZone, "InvokeHostFunctionOpFrame doApplyParallel", true);

std::vector<LedgerEntryChange> changes;

Expand Down
113 changes: 113 additions & 0 deletions src/transactions/RestoreFootprintOpFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,119 @@ RestoreFootprintOpFrame::doApply(AbstractLedgerTxn& ltx,
throw std::runtime_error("RestoreFootprintOpFrame::doApply needs Config");
}

ParallelOpReturnVal
RestoreFootprintOpFrame::doApplyParallel(
ThreadEntryMap const& entryMap, // Must not be shared between threads
Config const& appConfig, SorobanNetworkConfig const& sorobanConfig,
Hash const& sorobanBasePrngSeed, CxxLedgerInfo const& ledgerInfo,
SorobanMetrics& sorobanMetrics, OperationResult& res,
SorobanTxData& sorobanData, uint32_t ledgerSeq,
uint32_t ledgerVersion) const
{
ZoneNamedN(applyZone, "RestoreFootprintOpFrame doApplyParallel", true);

RestoreFootprintMetrics metrics(sorobanMetrics);
auto timeScope = metrics.getExecTimer();

auto const& resources = mParentTx.sorobanResources();
auto const& footprint = resources.footprint;

// Keep track of LedgerEntry updates we need to make
ModifiedEntryMap opEntryMap;

auto const& archivalSettings = sorobanConfig.stateArchivalSettings();
rust::Vec<CxxLedgerEntryRentChange> rustEntryRentChanges;
// Extend the TTL on the restored entry to minimum TTL, including
// the current ledger.
uint32_t restoredLiveUntilLedger =
ledgerSeq + archivalSettings.minPersistentTTL - 1;
rustEntryRentChanges.reserve(footprint.readWrite.size());
for (auto const& lk : footprint.readWrite)
{
auto ttlKey = getTTLKey(lk);

auto ttlIter = entryMap.find(ttlKey);

// Skip entry if the TTLEntry is missing or if it's already live.
if (ttlIter == entryMap.end() || !ttlIter->second.first ||
isLive(*ttlIter->second.first, ledgerSeq))
{
continue;
}

// We must load the ContractCode/ContractData entry for fee purposes, as
// restore is considered a write
auto entryIter = entryMap.find(lk);

// We checked for TTLEntry existence above
releaseAssertOrThrow(entryIter != entryMap.end() &&
entryIter->second.first);

auto const& entryLe = *entryIter->second.first;

uint32_t entrySize = static_cast<uint32>(xdr::xdr_size(entryLe));
metrics.mLedgerReadByte += entrySize;
if (resources.readBytes < metrics.mLedgerReadByte)
{
sorobanData.pushApplyTimeDiagnosticError(
appConfig, SCE_BUDGET, SCEC_EXCEEDED_LIMIT,
"operation byte-read resources exceeds amount specified",
{makeU64SCVal(metrics.mLedgerReadByte),
makeU64SCVal(resources.readBytes)});
innerResult(res).code(RESTORE_FOOTPRINT_RESOURCE_LIMIT_EXCEEDED);
return {false, {}};
}

// To maintain consistency with InvokeHostFunction, TTLEntry
// writes come out of refundable fee, so only add entrySize
metrics.mLedgerWriteByte += entrySize;
if (!validateContractLedgerEntry(lk, entrySize, sorobanConfig,
appConfig, mParentTx, sorobanData))
{
innerResult(res).code(RESTORE_FOOTPRINT_RESOURCE_LIMIT_EXCEEDED);
return {false, {}};
}

if (resources.writeBytes < metrics.mLedgerWriteByte)
{
sorobanData.pushApplyTimeDiagnosticError(
appConfig, SCE_BUDGET, SCEC_EXCEEDED_LIMIT,
"operation byte-write resources exceeds amount specified",
{makeU64SCVal(metrics.mLedgerWriteByte),
makeU64SCVal(resources.writeBytes)});
innerResult(res).code(RESTORE_FOOTPRINT_RESOURCE_LIMIT_EXCEEDED);
return {false, {}};
}

rustEntryRentChanges.emplace_back();
auto& rustChange = rustEntryRentChanges.back();
rustChange.is_persistent = true;
// Treat the entry as if it hasn't existed before restoration
// for the rent fee purposes.
rustChange.old_size_bytes = 0;
rustChange.old_live_until_ledger = 0;
rustChange.new_size_bytes = entrySize;
rustChange.new_live_until_ledger = restoredLiveUntilLedger;

auto ttlLe = *ttlIter->second.first;
ttlLe.data.ttl().liveUntilLedgerSeq = restoredLiveUntilLedger;

opEntryMap.emplace(ttlKey, ttlLe);
}
int64_t rentFee = rust_bridge::compute_rent_fee(
appConfig.CURRENT_LEDGER_PROTOCOL_VERSION, ledgerVersion,
rustEntryRentChanges, sorobanConfig.rustBridgeRentFeeConfiguration(),
ledgerSeq);
if (!sorobanData.consumeRefundableSorobanResources(
0, rentFee, ledgerVersion, sorobanConfig, appConfig, mParentTx))
{
innerResult(res).code(RESTORE_FOOTPRINT_INSUFFICIENT_REFUNDABLE_FEE);
return {false, {}};
}
innerResult(res).code(RESTORE_FOOTPRINT_SUCCESS);
return {true, opEntryMap};
}

bool
RestoreFootprintOpFrame::doApplyForSoroban(Application& app,
AbstractLedgerTxn& ltx,
Expand Down
8 changes: 8 additions & 0 deletions src/transactions/RestoreFootprintOpFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class RestoreFootprintOpFrame : public OperationFrame
bool doCheckValid(uint32_t ledgerVersion,
OperationResult& res) const override;

ParallelOpReturnVal doApplyParallel(
ThreadEntryMap const& entryMap, // Must not be shared between threads!
Config const& appConfig, SorobanNetworkConfig const& sorobanConfig,
Hash const& sorobanBasePrngSeed, CxxLedgerInfo const& ledgerInfo,
SorobanMetrics& sorobanMetrics, OperationResult& res,
SorobanTxData& sorobanData, uint32_t ledgerSeq,
uint32_t ledgerVersion) const override;

void
insertLedgerKeysToPrefetch(UnorderedSet<LedgerKey>& keys) const override;

Expand Down
79 changes: 75 additions & 4 deletions src/transactions/test/InvokeHostFunctionTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4453,12 +4453,12 @@ TEST_CASE("parallel ttl", "[tx][soroban][parallelapply]")
.liveUntilLedgerSeq == test.getLedgerSeq() + 4000);
};

SECTION("parallel extensions - ExtendFootprint op has highest extension")
SECTION("Parallel extensions - ExtendFootprint op has highest extension")
{
parallelTtlExtensions(false);
}

SECTION("parallel extensions - InvokeHostFunctionOp has highest extension")
SECTION("Parallel extensions - InvokeHostFunctionOp has highest extension")
{
parallelTtlExtensions(true);
}
Expand Down Expand Up @@ -4558,10 +4558,81 @@ TEST_CASE("parallel ttl", "[tx][soroban][parallelapply]")
.liveUntilLedgerSeq == test.getLedgerSeq() + 2000);
}

SECTION("Contract and ExtendFootprintOp")
SECTION("Restore and extend")
{
}
// Advance ledger until the contract expires
for (uint32_t i =
test.getApp().getLedgerManager().getLastClosedLedgerNum();
i <= expectedPersistentLiveUntilLedger + 1; ++i)
{
closeLedgerOn(test.getApp(), i, 2, 1, 2016);
}

auto ledgerSeq = test.getLedgerSeq();
auto const& contractKeys = client.getContract().getKeys();
REQUIRE(!test.isEntryLive(contractKeys[0], ledgerSeq));
REQUIRE(!test.isEntryLive(contractKeys[1], ledgerSeq));

SorobanResources restoreResources;
restoreResources.footprint.readWrite = contractKeys;
restoreResources.readBytes = 9'000;
restoreResources.writeBytes = 9'000;

auto const resourceFee = 300'000 + 40'000 * contractKeys.size();
auto tx1 =
test.createRestoreTx(restoreResources, 1'000, resourceFee, &a1);

SorobanResources extendResources;
extendResources.footprint.readOnly = contractKeys;
extendResources.readBytes = 9'000;
auto tx2 = test.createExtendOpTx(extendResources, 10'000, 30'000,
500'000, &a2);

auto i3 = client.getContract().prepareInvocation(
"put_persistent", {makeSymbolSCVal("key2"), makeU64SCVal(100)},
client.writeKeySpec("key2", ContractDataDurability::PERSISTENT));
auto tx3 = i3.withExactNonRefundableResourceFee().createTx(&a3);

auto i4 = client.getContract().prepareInvocation(
"extend_persistent",
{makeSymbolSCVal("key2"), makeU32SCVal(5000), makeU32SCVal(5000)},
client.readKeySpec("key2", ContractDataDurability::PERSISTENT));
auto tx4 = i4.withExactNonRefundableResourceFee().createTx(&a4);

LedgerTxn ltx(app.getLedgerTxnRoot());

TransactionMetaFrame tm(ltx.loadHeader().current().ledgerVersion);

std::vector<Stage> stages;
auto& stage = stages.emplace_back();

stage.resize(1);

// First thread
auto& thread1 = stage[0];

thread1.emplace_back(tx1, tx1->createSuccessResult(), tm);
thread1.emplace_back(tx2, tx2->createSuccessResult(), tm);
thread1.emplace_back(tx3, tx3->createSuccessResult(), tm);
thread1.emplace_back(tx4, tx4->createSuccessResult(), tm);

{
auto lmImpl = dynamic_cast<LedgerManagerImpl*>(&lm);
lmImpl->applySorobanStages(app, ltx, stages, Hash{});
ltx.commit();
}

REQUIRE(tx1->getResultCode() == txSUCCESS);
REQUIRE(tx2->getResultCode() == txSUCCESS);
REQUIRE(tx3->getResultCode() == txSUCCESS);
REQUIRE(tx4->getResultCode() == txSUCCESS);

REQUIRE(test.getTTL(contractKeys[0]) == ledgerSeq + 10'000);
REQUIRE(test.getTTL(contractKeys[1]) == ledgerSeq + 10'000);

REQUIRE(client.getTTL("key2", ContractDataDurability::PERSISTENT) ==
test.getLedgerSeq() + 5000);
}
// TODO: Add deletion test.
}

Expand Down

0 comments on commit 57d458d

Please sign in to comment.