Commit

fix: CI
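
Pass the node id into build_config so RaftConfig.id is set explicitly, derive
storage paths from the node id instead of a hardcoded 1, handle the Results
returned by leave/quit/campaign/demote/transfer_leader/add_peers/join_cluster
instead of discarding them, clean up ./logs before each harness test, and give
data replication more time to settle.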
jopemachine committed Sep 30, 2024
1 parent dc89235 commit f4a8439
Showing 9 changed files with 70 additions and 38 deletions.
9 changes: 5 additions & 4 deletions binding/python/examples/main.py
@@ -37,8 +37,9 @@ def load_peers() -> Peers:
     )
 
 
-def build_config(initial_peers: Peers) -> Config:
+def build_config(node_id: int, initial_peers: Peers) -> Config:
     raft_cfg = RaftConfig(
+        id=node_id,
         election_tick=10,
         heartbeat_tick=3,
     )
@@ -106,12 +107,12 @@ async def main():
 
     initial_peers = load_peers()
 
-    cfg = build_config(initial_peers)
+    node_id = initial_peers.get_node_id_by_addr(raft_addr)
+
+    cfg = build_config(node_id, initial_peers)
     logger = Logger(setup_logger())
     store = HashStore()
 
-    node_id = initial_peers.get_node_id_by_addr(raft_addr)
-
     tasks = []
     raft = Raft.bootstrap(node_id, raft_addr, store, cfg, logger)
     tasks.append(raft.run())
5 changes: 3 additions & 2 deletions binding/python/tests/harness/raft_server.py
@@ -6,8 +6,9 @@
 RAFTS: dict[int, Raft] = {}
 
 
-def build_config(initial_peers: Peers) -> Config:
+def build_config(node_id: int, initial_peers: Peers) -> Config:
     raft_cfg = RaftConfig(
+        id=node_id,
         election_tick=10,
         heartbeat_tick=3,
     )
@@ -23,7 +24,7 @@ def build_config(initial_peers: Peers) -> Config:
 
 async def run_raft(node_id: int, initial_peers: Peers):
     peer = initial_peers.get(node_id)
-    cfg = build_config(initial_peers)
+    cfg = build_config(node_id, initial_peers)
 
     store = HashStore()
     logger = Slogger.default()
8 changes: 4 additions & 4 deletions examples/memstore/src/web_server_api.rs
@@ -40,7 +40,7 @@ async fn leader_id(data: web::Data<(HashStore, Raft)>) -> impl Responder {
 #[get("/leave")]
 async fn leave(data: web::Data<(HashStore, Raft)>) -> impl Responder {
     let raft = data.clone();
-    raft.1.leave().await;
+    raft.1.leave().await.unwrap();
     "OK".to_string()
 }
 
@@ -107,21 +107,21 @@ async fn transfer_leader(
 ) -> impl Responder {
     let raft = data.clone();
     let node_id: u64 = path.into_inner();
-    raft.1.transfer_leader(node_id).await;
+    raft.1.transfer_leader(node_id).await.unwrap();
     "OK".to_string()
 }
 
 #[get("/campaign")]
 async fn campaign(data: web::Data<(HashStore, Raft)>) -> impl Responder {
     let raft = data.clone();
-    raft.1.campaign().await;
+    raft.1.campaign().await.unwrap();
     "OK".to_string()
 }
 
 #[get("/demote/{term}/{leader_id}")]
 async fn demote(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, u64)>) -> impl Responder {
     let raft = data.clone();
     let (term, leader_id_) = path.into_inner();
-    raft.1.demote(term, leader_id_).await;
+    raft.1.demote(term, leader_id_).await.unwrap();
     "OK".to_string()
 }
3 changes: 2 additions & 1 deletion harness/src/config.rs
@@ -1,7 +1,8 @@
 use raftify::{Config, RaftConfig};
 
-pub fn build_config() -> Config {
+pub fn build_config(node_id: u64) -> Config {
     let raft_config = RaftConfig {
+        id: node_id,
         election_tick: 10,
         heartbeat_tick: 3,
         omit_heartbeat_log: true,
29 changes: 19 additions & 10 deletions harness/src/raft.rs
@@ -51,7 +51,7 @@ fn run_raft(
     should_be_leader: bool,
 ) -> Result<JoinHandle<Result<()>>> {
     let peer = peers.get(node_id).unwrap();
-    let mut cfg = build_config();
+    let mut cfg = build_config(*node_id);
     cfg.initial_peers = if should_be_leader {
         None
     } else {
@@ -60,7 +60,7 @@
 
     let store = HashStore::new();
     let logger = build_logger();
-    let storage_pth = get_storage_path(cfg.log_dir.as_str(), 1);
+    let storage_pth = get_storage_path(cfg.log_dir.as_str(), *node_id);
     ensure_directory_exist(storage_pth.as_str())?;
 
     let storage = HeedStorage::create(
@@ -141,9 +141,9 @@ pub async fn spawn_extra_node(
         slog: build_logger(),
     });
 
-    let cfg = build_config();
+    let cfg = build_config(node_id);
     let store = HashStore::new();
-    let storage_pth = get_storage_path(cfg.log_dir.as_str(), 1);
+    let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
     ensure_directory_exist(storage_pth.as_str())?;
 
     let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?;
@@ -172,11 +172,11 @@ pub async fn spawn_and_join_extra_node(
         .unwrap();
 
     let node_id = join_ticket.reserved_id;
-    let mut cfg = build_config();
+    let mut cfg = build_config(node_id);
     cfg.initial_peers = Some(join_ticket.peers.clone().into());
     let store = HashStore::new();
 
-    let storage_pth = get_storage_path(cfg.log_dir.as_str(), 1);
+    let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
     ensure_directory_exist(storage_pth.as_str())?;
 
     let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?;
@@ -189,8 +189,12 @@
 
     let raft_handle = tokio::spawn(raft.clone().run());
 
-    raft.add_peers(join_ticket.peers.clone()).await;
-    raft.join_cluster(vec![join_ticket]).await;
+    raft.add_peers(join_ticket.peers.clone())
+        .await
+        .expect("Failed to add peers");
+    raft.join_cluster(vec![join_ticket])
+        .await
+        .expect("Failed to join cluster");
 
     Ok(raft_handle)
 }
@@ -202,9 +206,14 @@ pub async fn join_nodes(rafts: Vec<&Raft>, raft_addrs: Vec<&str>, peer_addr: &st
             .await
             .unwrap();
 
-        raft.add_peers(join_ticket.peers.clone()).await;
+        raft.add_peers(join_ticket.peers.clone())
+            .await
+            .expect("Failed to add peers");
         tickets.push(join_ticket);
     }
 
-    rafts[0].join_cluster(tickets).await;
+    rafts[0]
+        .join_cluster(tickets)
+        .await
+        .expect("Failed to join cluster");
 }
10 changes: 10 additions & 0 deletions harness/src/utils.rs
@@ -175,6 +175,16 @@ pub fn kill_process_using_port(port: u16) {
     }
 }
 
+pub fn cleanup_storage(log_dir: &str) {
+    let storage_pth = Path::new(log_dir);
+
+    if fs::metadata(storage_pth).is_ok() {
+        fs::remove_dir_all(storage_pth).expect("Failed to remove storage directory");
+    }
+
+    fs::create_dir_all(storage_pth).expect("Failed to create storage directory");
+}
+
 pub fn kill_previous_raft_processes() {
     RAFT_PORTS.iter().for_each(|port| {
         kill_process_using_port(*port);
11 changes: 8 additions & 3 deletions harness/tests/bootstrap.rs
@@ -4,11 +4,15 @@ use tokio::time::sleep;
 use harness::{
     constant::{ONE_NODE_EXAMPLE, RAFT_ADDRS, THREE_NODE_EXAMPLE},
     raft::{build_raft_cluster, spawn_and_join_extra_node, wait_until_rafts_ready, Raft},
-    utils::{kill_previous_raft_processes, load_peers, wait_for_until_cluster_size_increase},
+    utils::{
+        cleanup_storage, kill_previous_raft_processes, load_peers,
+        wait_for_until_cluster_size_increase,
+    },
 };
 
 #[tokio::test]
 pub async fn test_static_bootstrap() {
+    cleanup_storage("./logs");
     kill_previous_raft_processes();
     let (tx_raft, rx_raft) = mpsc::channel::<(u64, Raft)>();
 
@@ -23,14 +27,15 @@ pub async fn test_static_bootstrap() {
     wait_for_until_cluster_size_increase(raft_1.clone(), 3).await;
 
     for (_, raft) in rafts.iter_mut() {
-        raft.quit().await;
+        raft.quit().await.expect("Failed to quit raft node");
     }
 
     sleep(Duration::from_secs(1)).await;
 }
 
 #[tokio::test]
 pub async fn test_dynamic_bootstrap() {
+    cleanup_storage("./logs");
     kill_previous_raft_processes();
     let (tx_raft, rx_raft) = mpsc::channel::<(u64, Raft)>();
 
@@ -64,7 +69,7 @@
     wait_for_until_cluster_size_increase(raft_1.clone(), 3).await;
 
     for (_, raft) in rafts.iter_mut() {
-        raft.quit().await;
+        raft.quit().await.expect("Failed to quit raft node");
     }
 }

15 changes: 9 additions & 6 deletions harness/tests/data_replication.rs
@@ -6,11 +6,15 @@ use harness::{
     constant::{RAFT_ADDRS, THREE_NODE_EXAMPLE},
     raft::{build_raft_cluster, spawn_and_join_extra_node, wait_until_rafts_ready, Raft},
     state_machine::LogEntry,
-    utils::{kill_previous_raft_processes, load_peers, wait_for_until_cluster_size_increase},
+    utils::{
+        cleanup_storage, kill_previous_raft_processes, load_peers,
+        wait_for_until_cluster_size_increase,
+    },
 };
 
 #[tokio::test]
 pub async fn test_data_replication() {
+    cleanup_storage("./logs");
     kill_previous_raft_processes();
 
     let peers = load_peers(THREE_NODE_EXAMPLE).await.unwrap();
@@ -33,7 +37,7 @@ pub async fn test_data_replication() {
 
     raft_1.propose(entry).await.unwrap();
 
-    sleep(Duration::from_secs(1)).await;
+    sleep(Duration::from_secs(3)).await;
 
     // Data should be replicated to all nodes.
     for (_, raft) in rafts.iter_mut() {
@@ -62,7 +66,7 @@
     let store = raft_4.state_machine().await.unwrap();
     let store_lk = store.0.read().unwrap();
 
-    // Data should be replicated to new joined node.
+    // Data should be replicated to new member.
     assert_eq!(store_lk.get(&1).unwrap(), "test");
     std::mem::drop(store_lk);
 
@@ -77,15 +81,14 @@
 
     raft_1.propose(new_entry).await.unwrap();
 
-    // New entry data should be replicated to all nodes including new joined node.
+    // New entry data should be replicated to all nodes including new member.
     for (_, raft) in rafts.iter() {
-        // stop
         let store = raft.state_machine().await.unwrap();
         let store_lk = store.0.read().unwrap();
         assert_eq!(store_lk.get(&2).unwrap(), "test2");
     }
 
     for (_, raft) in rafts.iter_mut() {
-        raft.quit().await;
+        raft.quit().await.expect("Failed to quit the raft node");
     }
 }
18 changes: 10 additions & 8 deletions harness/tests/leader_election.rs
@@ -5,13 +5,14 @@ use harness::{
     constant::{FIVE_NODE_EXAMPLE, THREE_NODE_EXAMPLE},
     raft::{build_raft_cluster, wait_until_rafts_ready, Raft},
     utils::{
-        kill_previous_raft_processes, load_peers, wait_for_until_cluster_size_decrease,
-        wait_for_until_cluster_size_increase,
+        cleanup_storage, kill_previous_raft_processes, load_peers,
+        wait_for_until_cluster_size_decrease, wait_for_until_cluster_size_increase,
     },
 };
 
 #[tokio::test]
 pub async fn test_leader_election_in_three_node_example() {
+    cleanup_storage("./logs");
     kill_previous_raft_processes();
 
     let (tx_raft, rx_raft) = mpsc::channel::<(u64, Raft)>();
@@ -28,7 +29,7 @@ pub async fn test_leader_election_in_three_node_example() {
 
     sleep(Duration::from_secs(1)).await;
 
-    raft_1.leave().await;
+    raft_1.leave().await.expect("Failed to leave");
 
     sleep(Duration::from_secs(2)).await;
 
@@ -52,15 +53,16 @@
         leader_id
     );
 
-    raft_2.quit().await;
+    raft_2.quit().await.expect("Failed to quit");
     let raft_3 = rafts.get_mut(&3).unwrap();
-    raft_3.quit().await;
+    raft_3.quit().await.expect("Failed to quit");
 }
 
 // TODO: Fix this test.
 #[tokio::test]
 #[ignore]
 pub async fn test_leader_election_in_five_node_example() {
+    cleanup_storage("./logs");
     kill_previous_raft_processes();
 
     let (tx_raft, rx_raft) = mpsc::channel::<(u64, Raft)>();
@@ -77,7 +79,7 @@
 
     sleep(Duration::from_secs(1)).await;
 
-    raft_1.leave().await;
+    raft_1.leave().await.expect("Failed to leave");
 
     let raft_2 = rafts.get_mut(&2).unwrap();
 
@@ -94,7 +96,7 @@
     );
 
     let leader_raft = rafts.get_mut(&leader_id).unwrap();
-    leader_raft.leave().await;
+    leader_raft.leave().await.expect("Failed to leave");
 
     let mut remaining_nodes = vec![2, 3, 4, 5];
     if let Some(pos) = remaining_nodes.iter().position(|&x| x == leader_id) {
@@ -115,6 +117,6 @@
 
     for id in remaining_nodes {
         let raft = rafts.get_mut(&id).unwrap();
-        raft.quit().await;
+        raft.quit().await.expect("Failed to quit the raft node");
     }
 }
