Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: miss tx #108

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions hildr-node/src/main/java/io/optimism/derive/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
Expand All @@ -24,12 +27,16 @@
*/
public class State {

private static final Logger LOGGER = LoggerFactory.getLogger(State.class);

private final TreeMap<String, L1Info> l1Info;

private final TreeMap<BigInteger, String> l1Hashes;

private final TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs;

private final Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher;

private BlockInfo safeHead;

private Epoch safeEpoch;
Expand All @@ -44,6 +51,7 @@ public class State {
* @param l1Info the L1 info
* @param l1Hashes the L1 hashes
* @param l2Refs the L2 block info references
* @param l2Fetcher the L2 block info fetcher
* @param safeHead the safe head
* @param safeEpoch the safe epoch
* @param currentEpochNum the current epoch num
Expand All @@ -53,13 +61,15 @@ public State(
TreeMap<String, L1Info> l1Info,
TreeMap<BigInteger, String> l1Hashes,
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo safeHead,
Epoch safeEpoch,
BigInteger currentEpochNum,
Config config) {
this.l1Info = l1Info;
this.l1Hashes = l1Hashes;
this.l2Refs = l2Refs;
this.l2Fetcher = l2Fetcher;
this.safeHead = safeHead;
this.safeEpoch = safeEpoch;
this.currentEpochNum = currentEpochNum;
Expand All @@ -70,18 +80,27 @@ public State(
* Create state.
*
* @param l2Refs the L2 block info references
* @param l2Fetcher the L2 block info fetcher
* @param finalizedHead the finalized head
* @param finalizedEpoch the finalized epoch
* @param config the config
* @return the state
*/
public static State create(
TreeMap<BigInteger, Tuple2<BlockInfo, Epoch>> l2Refs,
Function<BigInteger, Tuple2<BlockInfo, Epoch>> l2Fetcher,
BlockInfo finalizedHead,
Epoch finalizedEpoch,
Config config) {
return new State(
new TreeMap<>(), new TreeMap<>(), l2Refs, finalizedHead, finalizedEpoch, BigInteger.ZERO, config);
new TreeMap<>(),
new TreeMap<>(),
l2Refs,
l2Fetcher,
finalizedHead,
finalizedEpoch,
BigInteger.ZERO,
config);
}

/**
Expand Down Expand Up @@ -119,7 +138,14 @@ public Tuple2<BlockInfo, Epoch> l2Info(BigInteger timestamp) {
.subtract(config.chainConfig().l2Genesis().timestamp())
.divide(config.chainConfig().blockTime())
.add(config.chainConfig().l2Genesis().number());
return this.l2Refs.get(blockNum);
var cache = l2Refs.get(blockNum);
if (cache != null) {
return cache;
}

var res = l2Fetcher.apply(blockNum);
this.l2Refs.put(res.component1().number(), res);
return res;
}

/**
Expand Down Expand Up @@ -173,6 +199,7 @@ public void updateL1Info(L1Info l1Info) {
* @param safeEpoch the safe epoch
*/
public void purge(BlockInfo safeHead, Epoch safeEpoch) {
LOGGER.info("purge state: safeHead.number={}, safeEpoch. ={}", safeHead.number(), safeEpoch.hash());
this.safeHead = safeHead;
this.safeEpoch = safeEpoch;
this.l1Info.clear();
Expand Down
59 changes: 39 additions & 20 deletions hildr-node/src/main/java/io/optimism/derive/stages/Batches.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public Batches(TreeMap<BigInteger, Batch> batches, I channelIterator, AtomicRefe
public void purge() {
this.channelIterator.purge();
this.batches.clear();
if (!this.nextSingularBatches.isEmpty()) {
LOGGER.warn("batches has element but will be discarded");
}
this.nextSingularBatches.clear();
}

Expand All @@ -86,8 +89,13 @@ public Batch next() {
}
Channel channel = this.channelIterator.next();
if (channel != null) {
decodeBatches(this.config.chainConfig(), channel)
.forEach(batch -> this.batches.put(batch.batch().getTimestamp(), batch));
decodeBatches(this.config.chainConfig(), channel).forEach(batch -> {
Batch prev = this.batches.put(batch.batch().getTimestamp(), batch);
if (prev != null) {
LOGGER.warn(
"batch was replaced: timestamp={}", batch.batch().getTimestamp());
}
});
}

Batch derivedBatch = null;
Expand Down Expand Up @@ -121,24 +129,28 @@ public Batch next() {
this.nextSingularBatches.addAll(singularBatches);
return this.nextSingularBatches.poll();
}
}

State state = this.state.get();

BigInteger currentL1Block = state.getCurrentEpochNum();
BlockInfo safeHead = state.getSafeHead();
Epoch epoch = state.getSafeEpoch();
Epoch nextEpoch = state.epoch(epoch.number().add(BigInteger.ONE));
BigInteger seqWindowSize = this.config.chainConfig().seqWindowSize();

if (nextEpoch != null) {
if (currentL1Block.compareTo(epoch.number().add(seqWindowSize)) > 0) {
BigInteger nextTimestamp =
safeHead.timestamp().add(this.config.chainConfig().blockTime());
Epoch epochRes = nextTimestamp.compareTo(nextEpoch.timestamp()) < 0 ? epoch : nextEpoch;
var singularBatch = new SingularBatch(
safeHead.parentHash(), epochRes.number(), epochRes.hash(), nextTimestamp, Lists.newArrayList());
batch = new Batch(singularBatch, currentL1Block);
} else {
State state = this.state.get();

BigInteger currentL1Block = state.getCurrentEpochNum();
BlockInfo safeHead = state.getSafeHead();
Epoch epoch = state.getSafeEpoch();
Epoch nextEpoch = state.epoch(epoch.number().add(BigInteger.ONE));
BigInteger seqWindowSize = this.config.chainConfig().seqWindowSize();

if (nextEpoch != null) {
if (currentL1Block.compareTo(epoch.number().add(seqWindowSize)) > 0) {
BigInteger nextTimestamp =
safeHead.timestamp().add(this.config.chainConfig().blockTime());
Epoch epochRes = nextTimestamp.compareTo(nextEpoch.timestamp()) < 0 ? epoch : nextEpoch;
var singularBatch = new SingularBatch(
safeHead.parentHash(),
epochRes.number(),
epochRes.hash(),
nextTimestamp,
Lists.newArrayList());
batch = new Batch(singularBatch, currentL1Block);
}
}
}
return batch;
Expand All @@ -147,6 +159,7 @@ public Batch next() {
/**
* Decode batches list.
*
* @param chainConfig the chain config
* @param channel the channel
* @return the list
*/
Expand Down Expand Up @@ -485,6 +498,12 @@ private List<SingularBatch> toSingularBatches(final SpanBatch batch, final State
List<SingularBatch> singularBatches = new ArrayList<>();
for (SpanBatchElement element : batch.getBatches()) {
if (element.timestamp().compareTo(state.getSafeHead().timestamp()) <= 0) {
if (!element.transactions().isEmpty()) {
LOGGER.warn(
"past span batch element: timestamp{{}} <= safeHead.timestamp={{}}",
element.timestamp(),
state.getSafeHead().timestamp());
}
continue;
}
SingularBatch singularBatch = new SingularBatch();
Expand Down
27 changes: 24 additions & 3 deletions hildr-node/src/main/java/io/optimism/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.utils.Numeric;

/**
Expand Down Expand Up @@ -166,7 +168,7 @@ public EngineDriver<E> getEngineDriver() {
*/
public static Driver<EngineApi> from(Config config, CountDownLatch latch)
throws InterruptedException, ExecutionException {
Web3j provider = Web3jProvider.createClient(config.l2RpcUrl());
final Web3j provider = Web3jProvider.createClient(config.l2RpcUrl());

EthBlock finalizedBlock;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Expand Down Expand Up @@ -207,8 +209,27 @@ public static Driver<EngineApi> from(Config config, CountDownLatch latch)
config);

var l2Refs = io.optimism.derive.State.initL2Refs(finalizedHead.number(), config.chainConfig(), provider);
AtomicReference<io.optimism.derive.State> state =
new AtomicReference<>(io.optimism.derive.State.create(l2Refs, finalizedHead, finalizedEpoch, config));
var l2Fetcher = (Function<BigInteger, Tuple2<BlockInfo, Epoch>>) blockNum -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<EthBlock> blockTask = scope.fork(TracerTaskWrapper.wrap(
() -> provider.ethGetBlockByNumber(DefaultBlockParameter.valueOf(blockNum), true)
.send()));
scope.join();
scope.throwIfFailed();

var block = blockTask.get();
if (block == null) {
return null;
}
final HeadInfo l2BlockInfo = HeadInfo.from(block.getBlock());
return new Tuple2<>(l2BlockInfo.l2BlockInfo(), l2BlockInfo.l1Epoch());
} catch (Exception e) {
LOGGER.error("failed to fetch L2 block", e);
return null;
}
};
AtomicReference<io.optimism.derive.State> state = new AtomicReference<>(
io.optimism.derive.State.create(l2Refs, l2Fetcher, finalizedHead, finalizedEpoch, config));

EngineDriver<EngineApi> engineDriver = new EngineDriver<>(finalizedHead, finalizedEpoch, provider, config);

Expand Down
Loading