Skip to content

Commit

Permalink
Remove clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
sisuresh committed Jul 2, 2024
1 parent df49917 commit 285e3d3
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 226 deletions.
256 changes: 127 additions & 129 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1482,10 +1482,10 @@ LedgerManagerImpl::prefetchTransactionData(
}
}

ClusterEntryMap
LedgerManagerImpl::collectEntries(AbstractLedgerTxn& ltx, Cluster const& txs)
ThreadEntryMap
LedgerManagerImpl::collectEntries(AbstractLedgerTxn& ltx, Thread const& txs)
{
ClusterEntryMap entryMap;
ThreadEntryMap entryMap;
auto getEntries = [&](xdr::xvector<LedgerKey> const& keys) {
for (auto const& lk : keys)
{
Expand Down Expand Up @@ -1548,31 +1548,30 @@ LedgerManagerImpl::collectInitialTTLEntries(AbstractLedgerTxn& ltx,

for (auto const& thread : stage)
{
for (auto const& cluster : thread)
for (auto const& txBundle : thread)
{
for (auto const& txBundle : cluster)
{
getTTLEntries(
txBundle.tx->sorobanResources().footprint.readOnly);
getTTLEntries(
txBundle.tx->sorobanResources().footprint.readWrite);
}
getTTLEntries(txBundle.tx->sorobanResources().footprint.readOnly);
getTTLEntries(txBundle.tx->sorobanResources().footprint.readWrite);
}
}

return initialTTLs;
}

// TODO: Make applyCluster a non-member function
void
LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config,
SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo,
Hash const& sorobanBasePrngSeed,
uint32_t ledgerSeq, uint32_t ledgerVersion,
Cluster const& txs)
LedgerManagerImpl::applyThread(ThreadEntryMap& entryMap, Thread const& thread,
Config const& config,
SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo,
Hash const& sorobanBasePrngSeed,
uint32_t ledgerSeq, uint32_t ledgerVersion)
{
for (auto const& txBundle : txs)
// TTL extensions can't be observable between transactions, so we track them
// separately and update the entry map at the end of the thread.

UnorderedMap<LedgerKey, TTLEntry> ttlExtensions;

for (auto const& txBundle : thread)
{
releaseAssertOrThrow(txBundle.resPayload);
auto res = txBundle.tx->parallelApply(
Expand All @@ -1590,17 +1589,40 @@ LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config,
auto it = entryMap.find(lk);
releaseAssertOrThrow(it != entryMap.end());

// TODO: Remove this
if (lk.type() == TTL && updatedLe && it->second.first)
if (lk.type() == TTL && it->second.first && updatedLe)
{
releaseAssertOrThrow(
updatedLe->data.ttl().liveUntilLedgerSeq >
it->second.first->data.ttl().liveUntilLedgerSeq);
// If this is a TTL extension, then record it separately for
// now.
auto ttlIt = ttlExtensions.find(lk);
if (ttlIt != ttlExtensions.end())
{
ttlIt->second.liveUntilLedgerSeq =
std::max(ttlIt->second.liveUntilLedgerSeq,
updatedLe->data.ttl().liveUntilLedgerSeq);
}
else
{
releaseAssertOrThrow(
updatedLe->data.ttl().liveUntilLedgerSeq >
it->second.first->data.ttl().liveUntilLedgerSeq);
ttlExtensions.emplace(lk, updatedLe->data.ttl());
}
}
else
{
// If this is a TTL entry, it is either being created or
// deleted, so it should be observable.

// A entry deletion will be marked by a nullopt le.
// Set the dirty bit so it'll be written to ltx later.
it->second = {updatedLe, true};
// TTL entry is being deleted, so extensions can be wiped.
if (lk.type() == TTL && !updatedLe)
{
ttlExtensions.erase(lk);
}

// A entry deletion will be marked by a nullopt le.
// Set the dirty bit so it'll be written to ltx later.
it->second = {updatedLe, true};
}
}
}
else
Expand All @@ -1611,27 +1633,17 @@ LedgerManagerImpl::applyCluster(ClusterEntryMap& entryMap, Config const& config,
txBundle.mOpMetrics = res.mOpMetrics;
txBundle.mDelta = res.mDelta;
}
}

// Each cluster has its own entry map, so TTL updates will not be observable
// between clusters
void
LedgerManagerImpl::applyThread(std::vector<ClusterEntryMap>& entryMapByCluster,
std::vector<Cluster> const& clusters,
Config const& config,
SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo,
Hash const& sorobanBasePrngSeed,
uint32_t ledgerSeq, uint32_t ledgerVersion)
{
releaseAssertOrThrow(entryMapByCluster.size() == clusters.size());
for (size_t i = 0; i < clusters.size(); ++i)
for (auto const& ttlExtension : ttlExtensions)
{
auto const& cluster = clusters.at(i);
auto& entryMap = entryMapByCluster.at(i);
auto it = entryMap.find(ttlExtension.first);
releaseAssertOrThrow(it != entryMap.end());

applyCluster(entryMap, config, sorobanConfig, ledgerInfo,
sorobanBasePrngSeed, ledgerSeq, ledgerVersion, cluster);
LedgerEntry updatedTtl;
updatedTtl.data.type(TTL);
updatedTtl.data.ttl() = ttlExtension.second;

it->second = {updatedTtl, true};
}
}

Expand Down Expand Up @@ -1689,13 +1701,10 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx,
{
for (auto const& thread : stage)
{
for (auto const& cluster : thread)
for (auto const& txBundle : thread)
{
for (auto const& txBundle : cluster)
{
txBundle.tx->preParallelApply(app, ltx, txBundle.meta,
txBundle.resPayload, true);
}
txBundle.tx->preParallelApply(app, ltx, txBundle.meta,
txBundle.resPayload, true);
}
}

Expand All @@ -1708,20 +1717,16 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx,

auto const& ledgerInfo = getLedgerInfo(ltx, app, sorobanConfig);

std::vector<std::vector<ClusterEntryMap>> entryMapsByThread;
std::vector<ThreadEntryMap> entryMapsByThread;
for (auto const& thread : stage)
{
auto& clusterEntryMaps = entryMapsByThread.emplace_back();
for (auto const& cluster : thread)
{
clusterEntryMaps.emplace_back(collectEntries(ltx, cluster));
}
entryMapsByThread.emplace_back(collectEntries(ltx, thread));
}

std::vector<std::thread> threads;
for (size_t i = 0; i < stage.size(); ++i)
{
auto& entryMapByCluster = entryMapsByThread.at(i);
auto& entryMapByThread = entryMapsByThread.at(i);

auto const& thread = stage.at(i);

Expand All @@ -1730,7 +1735,7 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx,
// TODO: entryMap should be moved in.
// TODO: Use thread pool
threads.push_back(std::thread(
&LedgerManagerImpl::applyThread, this, std::ref(entryMapByCluster),
&LedgerManagerImpl::applyThread, this, std::ref(entryMapByThread),
std::ref(thread), config, sorobanConfig, std::ref(ledgerInfo),
sorobanBasePrngSeed, ledgerSeq, ledgerVersion));
}
Expand All @@ -1745,101 +1750,94 @@ LedgerManagerImpl::applySorobanStage(Application& app, AbstractLedgerTxn& ltx,

for (auto const& thread : stage)
{
for (auto const& cluster : thread)
for (auto const& txBundle : thread)
{
for (auto const& txBundle : cluster)
// First check the invariants
if (txBundle.resPayload->isSuccess())
{
// First check the invariants
if (txBundle.resPayload->isSuccess())
try
{
try
{
// Soroban transactions don't have access to the ledger
// header, so they can't modify it. Pass in the current
// header as both current and previous.
txBundle.mDelta->header.current =
ltxInner.loadHeader().current();
txBundle.mDelta->header.previous =
ltxInner.loadHeader().current();
app.getInvariantManager().checkOnOperationApply(
txBundle.tx->getRawOperations().at(0),
txBundle.resPayload->getOpResultAt(0),
*txBundle.mDelta);
}
catch (InvariantDoesNotHold& e)
{
printErrorAndAbort(
"Invariant failure while applying operations: ",
e.what());
}
// Soroban transactions don't have access to the ledger
// header, so they can't modify it. Pass in the current
// header as both current and previous.
txBundle.mDelta->header.current =
ltxInner.loadHeader().current();
txBundle.mDelta->header.previous =
ltxInner.loadHeader().current();
app.getInvariantManager().checkOnOperationApply(
txBundle.tx->getRawOperations().at(0),
txBundle.resPayload->getOpResultAt(0),
*txBundle.mDelta);
}
catch (InvariantDoesNotHold& e)
{
printErrorAndAbort(
"Invariant failure while applying operations: ",
e.what());
}
}

txBundle.tx->processPostApply(mApp, ltxInner, txBundle.meta,
txBundle.resPayload);
txBundle.tx->processPostApply(mApp, ltxInner, txBundle.meta,
txBundle.resPayload);

releaseAssertOrThrow(txBundle.mOpMetrics);
txBundle.mOpMetrics->updateSorobanMetrics(mSorobanMetrics);
releaseAssertOrThrow(txBundle.mOpMetrics);
txBundle.mOpMetrics->updateSorobanMetrics(mSorobanMetrics);

// We only increase the internal-error metric count if the
// ledger is a newer version.
if (txBundle.resPayload->getResultCode() == txINTERNAL_ERROR &&
ledgerVersion >=
config
.LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT)
{
auto& internalErrorCounter = app.getMetrics().NewCounter(
{"ledger", "transaction", "internal-error"});
internalErrorCounter.inc();
}
// We only increase the internal-error metric count if the
// ledger is a newer version.
if (txBundle.resPayload->getResultCode() == txINTERNAL_ERROR &&
ledgerVersion >=
config.LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT)
{
auto& internalErrorCounter = app.getMetrics().NewCounter(
{"ledger", "transaction", "internal-error"});
internalErrorCounter.inc();
}
}
}

// TODO: Look into adding invariants checking for conflicting writes between
// clusters
for (auto const& entryMapsByCluster : entryMapsByThread)
for (auto const& threadEntryMap : entryMapsByThread)
{
for (auto const& clusterEntryMap : entryMapsByCluster)
for (auto const& entry : threadEntryMap)
{
for (auto const& entry : clusterEntryMap)
// Only update if dirty bit is set
if (!entry.second.second)
{
// Only update if dirty bit is set
if (!entry.second.second)
{
continue;
}
continue;
}

if (entry.second.first)
if (entry.second.first)
{
auto const& updatedTTL = *entry.second.first;
auto ltxe = ltxInner.load(entry.first);
if (ltxe)
{
auto const& updatedTTL = *entry.second.first;
auto ltxe = ltxInner.load(entry.first);
if (ltxe)
if (ltxe.current().data.type() == TTL)
{
if (ltxe.current().data.type() == TTL)
// Only update TTL if we're increasing it
auto currLiveUntil =
ltxe.current().data.ttl().liveUntilLedgerSeq;
if (currLiveUntil >=
updatedTTL.data.ttl().liveUntilLedgerSeq)
{
// Only update TTL if we're increasing it
auto currLiveUntil =
ltxe.current().data.ttl().liveUntilLedgerSeq;
if (currLiveUntil >=
updatedTTL.data.ttl().liveUntilLedgerSeq)
{
continue;
}
continue;
}
ltxe.current() = updatedTTL;
}
else
{
ltxInner.create(updatedTTL);
}
ltxe.current() = updatedTTL;
}
else
{
auto ltxe = ltxInner.load(entry.first);
if (ltxe)
{
ltxInner.erase(entry.first);
}
ltxInner.create(updatedTTL);
}
}
else
{
auto ltxe = ltxInner.load(entry.first);
if (ltxe)
{
ltxInner.erase(entry.first);
}
}
}
Expand Down
13 changes: 3 additions & 10 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,17 @@ class LedgerManagerImpl : public LedgerManager
AbstractLedgerTxn& ltx, TransactionResultSet& txResultSet,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta);

ClusterEntryMap collectEntries(AbstractLedgerTxn& ltx, Cluster const& txs);
ThreadEntryMap collectEntries(AbstractLedgerTxn& ltx, Thread const& txs);

TTLs collectInitialTTLEntries(AbstractLedgerTxn& ltx, Stage const& stage);

void applyThread(std::vector<ClusterEntryMap>& entryMapByCluster,
std::vector<Cluster> const& clusters, Config const& config,
void applyThread(ThreadEntryMap& entryMapByCluster, Thread const& thread,
Config const& config,
SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo,
Hash const& sorobanBasePrngSeed, uint32_t ledgerSeq,
uint32_t ledgerVersion);

// TODO: Make these three methods const
void applyCluster(ClusterEntryMap& entryMap, Config const& config,
SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo,
Hash const& sorobanBasePrngSeed, uint32_t ledgerSeq,
uint32_t ledgerVersion, Cluster const& txs);

void applySorobanStage(Application& app, AbstractLedgerTxn& ltx,
Stage const& stage, Hash const& sorobanBasePrngSeed);

Expand Down
2 changes: 1 addition & 1 deletion src/transactions/FeeBumpTransactionFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ FeeBumpTransactionFrame::preParallelApply(Application& app,

ParallelOpReturnVal
FeeBumpTransactionFrame::parallelApply(
ClusterEntryMap const& entryMap, // Must not be shared between threads!,
ThreadEntryMap const& entryMap, // Must not be shared between threads!,
Config const& config, SorobanNetworkConfig const& sorobanConfig,
CxxLedgerInfo const& ledgerInfo, MutableTxResultPtr txResult,
Hash const& sorobanBasePrngSeed, TransactionMetaFrame& meta,
Expand Down
Loading

0 comments on commit 285e3d3

Please sign in to comment.