Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save and restore bundle pool #157

Merged
merged 1 commit into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ private void performSharedStartTasksOnce(final ServiceManager serviceManager) {

bundlePoolService =
new LineaLimitedBundlePool(
transactionSelectorConfiguration().maxBundlePoolSizeBytes(), besuEvents);
besuConfiguration.getDataPath(),
transactionSelectorConfiguration().maxBundlePoolSizeBytes(),
besuEvents);
bundlePoolService.loadFromDisk();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
*/
package net.consensys.linea.rpc.services;

import static java.util.stream.Collectors.joining;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Pattern;

import lombok.Getter;
import lombok.experimental.Accessors;
import org.apache.tuweni.bytes.Bytes;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.plugin.services.BesuService;

public interface BundlePoolService extends BesuService {
Expand All @@ -30,6 +36,9 @@ public interface BundlePoolService extends BesuService {
@Accessors(fluent = true)
@Getter
class TransactionBundle {
private static final String FIELD_SEPARATOR = "|";
private static final String ITEM_SEPARATOR = ",";
private static final String LINE_TERMINATOR = "$";
private final Hash bundleIdentifier;
private final List<PendingBundleTx> pendingTransactions;
private final Long blockNumber;
Expand All @@ -52,6 +61,74 @@ public TransactionBundle(
this.revertingTxHashes = revertingTxHashes;
}

public String serializeForDisk() {
// version=1 | blockNumber | bundleIdentifier | minTimestamp | maxTimestamp |
// revertingTxHashes, | txs, $
return new StringBuilder("1")
.append(FIELD_SEPARATOR)
.append(blockNumber)
.append(FIELD_SEPARATOR)
.append(bundleIdentifier.toHexString())
.append(FIELD_SEPARATOR)
.append(minTimestamp.map(l -> l + FIELD_SEPARATOR).orElse(FIELD_SEPARATOR))
.append(maxTimestamp.map(l -> l + FIELD_SEPARATOR).orElse(FIELD_SEPARATOR))
.append(
revertingTxHashes
.map(l -> l.stream().map(Hash::toHexString).collect(joining(ITEM_SEPARATOR)))
.orElse(FIELD_SEPARATOR))
.append(
pendingTransactions.stream()
.map(PendingBundleTx::serializeForDisk)
.collect(joining(ITEM_SEPARATOR)))
.append(LINE_TERMINATOR)
.toString();
}

public static TransactionBundle restoreFromSerialized(final String str) {
if (!str.endsWith(LINE_TERMINATOR)) {
throw new IllegalArgumentException(
"Unterminated bundle serialization, missing terminal " + LINE_TERMINATOR);
}

final var parts =
str.substring(0, str.length() - LINE_TERMINATOR.length())
.split(Pattern.quote(FIELD_SEPARATOR));
if (!parts[0].equals("1")) {
throw new IllegalArgumentException("Unsupported bundle serialization version " + parts[0]);
}
if (parts.length != 7) {
throw new IllegalArgumentException(
"Invalid bundle serialization, expected 7 fields but got " + parts.length);
}

final var blockNumber = Long.parseLong(parts[1]);
final var bundleIdentifier = Hash.fromHexString(parts[2]);
final Optional<Long> minTimestamp =
parts[3].isEmpty() ? Optional.empty() : Optional.of(Long.parseLong(parts[3]));
final Optional<Long> maxTimestamp =
parts[4].isEmpty() ? Optional.empty() : Optional.of(Long.parseLong(parts[4]));
final Optional<List<Hash>> revertingTxHashes =
parts[5].isEmpty()
? Optional.empty()
: Optional.of(
Arrays.stream(parts[5].split(Pattern.quote(ITEM_SEPARATOR)))
.map(Hash::fromHexString)
.toList());
final var transactions =
Arrays.stream(parts[6].split(Pattern.quote(ITEM_SEPARATOR)))
.map(Bytes::fromBase64String)
.map(Transaction::readFrom)
.toList();

return new TransactionBundle(
bundleIdentifier,
transactions,
blockNumber,
minTimestamp,
maxTimestamp,
revertingTxHashes);
}

/** A pending transaction contained in a bundle. */
public class PendingBundleTx
extends org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction.Local {
Expand All @@ -72,6 +149,12 @@ public boolean isBundleStart() {
public String toTraceLog() {
return "Bundle tx: " + super.toTraceLog();
}

String serializeForDisk() {
final var rlpOutput = new BytesValueRLPOutput();
getTransaction().writeTo(rlpOutput);
return rlpOutput.encoded().toBase64String();
}
}
}

Expand Down Expand Up @@ -146,4 +229,8 @@ public String toTraceLog() {
* @return the number of bundles in the pool
*/
long size();

void saveToDisk();

void loadFromDisk();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@

package net.consensys.linea.rpc.services;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -44,8 +51,10 @@
@AutoService(BesuService.class)
@Slf4j
public class LineaLimitedBundlePool implements BundlePoolService, BesuEvents.BlockAddedListener {
public static final String BUNDLE_SAVE_FILENAME = "bundles.dump";
private final Cache<Hash, TransactionBundle> cache;
private final Map<Long, List<TransactionBundle>> blockIndex;
private final Path saveFilePath;
private final AtomicLong maxBlockHeight = new AtomicLong(0L);

/**
Expand All @@ -55,7 +64,9 @@ public class LineaLimitedBundlePool implements BundlePoolService, BesuEvents.Blo
* @param maxSizeInBytes The maximum size in bytes of the pool objects.
*/
@VisibleForTesting
public LineaLimitedBundlePool(long maxSizeInBytes, BesuEvents eventService) {
public LineaLimitedBundlePool(
final Path dataDir, final long maxSizeInBytes, final BesuEvents eventService) {
this.saveFilePath = dataDir.resolve(BUNDLE_SAVE_FILENAME);
this.cache =
Caffeine.newBuilder()
.maximumWeight(maxSizeInBytes) // Maximum size in bytes
Expand Down Expand Up @@ -185,6 +196,63 @@ public long size() {
return cache.estimatedSize();
}

@Override
public void saveToDisk() {
log.info("Saving bundles to {}", saveFilePath);
try (final BufferedWriter bw =
Files.newBufferedWriter(saveFilePath, StandardCharsets.US_ASCII)) {
final var savedCount =
blockIndex.values().stream()
.flatMap(List::stream)
.sorted(Comparator.comparing(TransactionBundle::blockNumber))
.map(TransactionBundle::serializeForDisk)
.peek(
str -> {
try {
bw.write(str);
bw.newLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.count();
log.info("Saved {} bundles to {}", savedCount, saveFilePath);
} catch (final Throwable ioe) {
log.error("Error while saving bundles to {}", saveFilePath, ioe);
}
}

@Override
public void loadFromDisk() {
if (saveFilePath.toFile().exists()) {
log.info("Loading bundles from {}", saveFilePath);
final var loadedCount = new AtomicLong(0L);
try (final BufferedReader br =
Files.newBufferedReader(saveFilePath, StandardCharsets.US_ASCII)) {
br.lines()
.parallel()
.forEach(
line -> {
try {
final var bundle = TransactionBundle.restoreFromSerialized(line);
this.putOrReplace(bundle.bundleIdentifier(), bundle);
loadedCount.incrementAndGet();
} catch (final Exception e) {
log.warn("Error while loading bundle from serialized format {}", line, e);
}
});
log.info("Loaded {} bundles from {}", loadedCount.get(), saveFilePath);
} catch (final Throwable t) {
log.error(
"Error while reading bundles from {}, partially loaded {} bundles",
saveFilePath,
loadedCount.get(),
t);
}
saveFilePath.toFile().delete();
}
}

/**
* Adds a TransactionBundle to the block index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ public void doStart() {
lineaSendBundleMethod.init(bundlePoolService);
lineaCancelBundleMethod.init(bundlePoolService);
}

@Override
public void stop() {

super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -37,9 +38,10 @@
import org.hyperledger.besu.plugin.services.rpc.PluginRpcRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class LineaSendBundleTest {

@TempDir Path dataDir;
private LineaSendBundle lineaSendBundle;
private BesuEvents mockEvents;
private LineaLimitedBundlePool bundlePool;
Expand All @@ -59,7 +61,7 @@ class LineaSendBundleTest {
@BeforeEach
void setup() {
mockEvents = mock(BesuEvents.class);
bundlePool = spy(new LineaLimitedBundlePool(4096L, mockEvents));
bundlePool = spy(new LineaLimitedBundlePool(dataDir, 4096L, mockEvents));
lineaSendBundle = new LineaSendBundle().init(bundlePool);
}

Expand Down
Loading
Loading