Skip to content

Commit

Permalink
feat: Full node mode (#13)
Browse files Browse the repository at this point in the history
# What ❔

Allows running a node without the consensus module. Refactors the
`executor` crate so that it can be used as a library, and moves its
binary to the `tools` crate.

## Why ❔

We need to be able to use `executor` as a library from the consensus
code, including when running an external node.
  • Loading branch information
slowli authored Oct 30, 2023
1 parent b4d85d8 commit 0217158
Show file tree
Hide file tree
Showing 46 changed files with 1,138 additions and 645 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/load_testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
- name: Build executor binary
working-directory: node
run: |
build_output=$(cargo build --release -p executor --bin executor --message-format=json) || exit 1
build_output=$(cargo build --release -p tools --bin executor --message-format=json) || exit 1
echo "$build_output" | jq -r 'select(.executable != null) | .executable' \
| while read binary; do
cp "$binary" artifacts/binaries/
Expand Down
6 changes: 0 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@
# will have compiled files and executables
target/

# These are backup files generated by rustfmt
/*.rs.bk

# Debug logs
logs/

# Config files
config/

# Local load test leftovers
.terraform
.ssh
Expand Down
2 changes: 1 addition & 1 deletion docker/compose.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ WORKDIR /usr/src/myapp/artifacts/$node/
RUN mkdir /usr/src/myapp/artifacts/$node/logs/

# You can ignore this command. In docker-compose.yml file we have specified the different command
CMD ["./executor", "0"]
CMD ["./executor", "0"]
2 changes: 1 addition & 1 deletion docker/localenv.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN cargo run -p tools --bin localnet_config -- --nodes=$nodes

# Build binary file in release mode and create a main release binary
WORKDIR /usr/src/myapp/node
RUN cargo build -p executor --release
RUN cargo build -p tools --bin executor --release

# Create the artifacts directory
WORKDIR /usr/src/myapp/node/
Expand Down
58 changes: 11 additions & 47 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ time = "0.3.23"
tokio = { version = "1.28.1", features = ["full"] }
tracing = { version = "0.1.37", features = ["attributes"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" }
vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" }
vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" }
vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" }

# Note that "bench" profile inherits from "release" profile and
# "test" profile inherits from "dev" profile.
Expand Down
5 changes: 2 additions & 3 deletions node/actors/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use anyhow::Context as _;
use concurrency::ctx;
use inner::ConsensusInner;
use roles::validator;
use std::sync::Arc;
use storage::ReplicaStateStore;
use storage::FallbackReplicaStateStore;
use tracing::{info, instrument};
use utils::pipe::ActorPipe;

Expand Down Expand Up @@ -54,7 +53,7 @@ impl Consensus {
pipe: ActorPipe<InputMessage, OutputMessage>,
secret_key: validator::SecretKey,
validator_set: validator::ValidatorSet,
storage: Arc<dyn ReplicaStateStore>,
storage: FallbackReplicaStateStore,
) -> anyhow::Result<Self> {
Ok(Consensus {
inner: ConsensusInner {
Expand Down
59 changes: 21 additions & 38 deletions node/actors/consensus/src/replica/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ use crate::{metrics, ConsensusInner};
use anyhow::Context as _;
use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
use roles::validator;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use storage::{ReplicaStateStore, StorageError};
use std::collections::{BTreeMap, HashMap};
use storage::{FallbackReplicaStateStore, StorageError};
use tracing::instrument;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
Expand All @@ -27,47 +24,33 @@ pub(crate) struct StateMachine {
/// The deadline to receive an input message.
pub(crate) timeout_deadline: time::Deadline,
/// A reference to the storage module. We use it to backup the replica state.
pub(crate) storage: Arc<dyn ReplicaStateStore>,
pub(crate) storage: FallbackReplicaStateStore,
}

impl StateMachine {
/// Creates a new StateMachine struct. We try to recover a past state from the storage module,
/// otherwise we initialize the state machine with whatever head block we have.
pub(crate) async fn new(
ctx: &ctx::Ctx,
storage: Arc<dyn ReplicaStateStore>,
storage: FallbackReplicaStateStore,
) -> anyhow::Result<Self> {
Ok(match storage.replica_state(ctx).await? {
Some(backup) => {
let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
for p in backup.proposals {
block_proposal_cache
.entry(p.number)
.or_default()
.insert(p.payload.hash(), p.payload);
}
Self {
view: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
high_qc: backup.high_qc,
block_proposal_cache,
timeout_deadline: time::Deadline::Infinite,
storage,
}
}
None => {
let head = storage.head_block(ctx).await?;
Self {
view: head.justification.message.view,
phase: validator::Phase::Prepare,
high_vote: head.justification.message,
high_qc: head.justification,
block_proposal_cache: BTreeMap::new(),
timeout_deadline: time::Deadline::Infinite,
storage,
}
}
let backup = storage.replica_state(ctx).await?;
let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
for proposal in backup.proposals {
block_proposal_cache
.entry(proposal.number)
.or_default()
.insert(proposal.payload.hash(), proposal.payload);
}

Ok(Self {
view: backup.view,
phase: backup.phase,
high_vote: backup.high_vote,
high_qc: backup.high_qc,
block_proposal_cache,
timeout_deadline: time::Deadline::Infinite,
storage,
})
}

Expand Down
4 changes: 2 additions & 2 deletions node/actors/consensus/src/testonly/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use concurrency::ctx;
use roles::validator;
use std::sync::Arc;
use storage::RocksdbStorage;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tempfile::tempdir;
use utils::pipe::{self, DispatcherPipe};

Expand All @@ -33,7 +33,7 @@ pub async fn make_consensus(
consensus_pipe,
key.clone(),
validator_set.clone(),
Arc::new(storage),
FallbackReplicaStateStore::from_store(Arc::new(storage)),
);
let consensus = consensus
.await
Expand Down
2 changes: 1 addition & 1 deletion node/actors/consensus/src/testonly/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Node {
network_pipe: DispatcherPipe<network::io::InputMessage, network::io::OutputMessage>,
metrics: channel::UnboundedSender<Metrics>,
) -> anyhow::Result<()> {
let key = self.net.state().cfg().consensus.key.public();
let key = self.net.consensus_config().key.public();
let rng = &mut ctx.rng();
let mut net_recv = network_pipe.recv;
let net_send = network_pipe.send;
Expand Down
27 changes: 14 additions & 13 deletions node/actors/consensus/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use storage::RocksdbStorage;
use storage::{FallbackReplicaStateStore, RocksdbStorage};
use tracing::Instrument as _;
use utils::pipe;

Expand Down Expand Up @@ -44,8 +44,8 @@ impl Test {
// Get only the honest replicas.
let honest: HashSet<_> = nodes
.iter()
.filter(|n| n.behavior == Behavior::Honest)
.map(|n| n.net.state().cfg().consensus.key.public())
.filter(|node| node.behavior == Behavior::Honest)
.map(|node| node.net.consensus_config().key.public())
.collect();
assert!(!honest.is_empty());

Expand Down Expand Up @@ -83,17 +83,18 @@ async fn run_nodes(
) -> anyhow::Result<()> {
let keys: Vec<_> = nodes
.iter()
.map(|r| r.net.state().cfg().consensus.key.clone())
.map(|node| node.net.consensus_config().key.clone())
.collect();
let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![]));
let network_ready = signal::Once::new();
let mut network_pipes = HashMap::new();
let mut network_send = HashMap::new();
let mut network_recv = HashMap::new();
scope::run!(ctx, |ctx, s| async {
for (i, n) in nodes.iter().enumerate() {
let validator_key = n.net.state().cfg().consensus.key.clone();
let validator_set = n.net.state().cfg().consensus.validators.clone();
for (i, node) in nodes.iter().enumerate() {
let consensus_config = node.net.consensus_config();
let validator_key = consensus_config.key.clone();
let validator_set = node.net.to_config().validators;

let (consensus_actor_pipe, consensus_pipe) = pipe::new();
let (network_actor_pipe, network_pipe) = pipe::new();
Expand All @@ -105,12 +106,12 @@ async fn run_nodes(
RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage"))
.await
.context("RocksdbStorage")?;
let storage = Arc::new(storage);
let storage = FallbackReplicaStateStore::from_store(Arc::new(storage));

let consensus = Consensus::new(
ctx,
consensus_actor_pipe,
n.net.state().cfg().consensus.key.clone(),
node.net.consensus_config().key.clone(),
validator_set,
storage,
)
Expand All @@ -120,7 +121,7 @@ async fn run_nodes(
scope::run!(ctx, |ctx, s| async {
network_ready.recv(ctx).await?;
s.spawn_blocking(|| consensus.run(ctx).context("consensus.run()"));
n.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone())
node.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone())
.await
.context("executor.run()")
})
Expand All @@ -131,10 +132,10 @@ async fn run_nodes(
}
match network {
Network::Real => {
for (i, n) in nodes.iter().enumerate() {
let state = n.net.state().clone();
for (i, node) in nodes.iter().enumerate() {
let state = node.net.state().clone();
let pipe = network_pipes
.remove(&state.cfg().consensus.key.public())
.remove(&node.net.consensus_config().key.public())
.unwrap();
s.spawn(
async {
Expand Down
Loading

0 comments on commit 0217158

Please sign in to comment.