Skip to content

Commit

Permalink
v0.1.74
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Apr 18, 2024
1 parent dfc1255 commit 53838dc
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 101 deletions.
2 changes: 1 addition & 1 deletion chaindexing/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chaindexing"
version = "0.1.72"
version = "0.1.74"
edition = "2021"
description = "Index any EVM chain and query with SQL"
license = "MIT OR Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions chaindexing/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn start<S: Send + Sync + Clone + Debug + 'static>(config: &Config<S>)
let config = config.clone();

node_task
.add_subtask(&tokio::spawn({
.add_subtask(tokio::spawn({
let node_task = node_task.clone();

// MultiChainStates are indexed in an order-agnostic fashion, so no need for txn client
Expand All @@ -41,7 +41,7 @@ pub async fn start<S: Send + Sync + Clone + Debug + 'static>(config: &Config<S>)

node_task
.clone()
.add_subtask(&tokio::spawn(async move {
.add_subtask(tokio::spawn(async move {
let mut interval =
interval(Duration::from_millis(config.handler_rate_ms));

Expand Down
2 changes: 1 addition & 1 deletion chaindexing/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn start<S: Sync + Send + Clone + 'static>(config: &Config<S>) -> Node
let config = config.clone();

node_task
.add_subtask(&tokio::spawn(async move {
.add_subtask(tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(config.ingestion_rate_ms));
let mut last_pruned_at_per_chain_id = HashMap::new();

Expand Down
101 changes: 4 additions & 97 deletions chaindexing/src/nodes/node_task.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone, PartialEq, Debug)]
struct NodeSubTask(*const tokio::task::JoinHandle<()>);

unsafe impl Send for NodeSubTask {}

#[derive(Clone, Debug)]
pub struct NodeTask {
subtasks: Arc<Mutex<Vec<NodeSubTask>>>,
subtasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}

impl Default for NodeTask {
Expand All @@ -23,102 +18,14 @@ impl NodeTask {
subtasks: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn add_subtask(&self, task: &tokio::task::JoinHandle<()>) {
pub async fn add_subtask(&self, task: tokio::task::JoinHandle<()>) {
let mut subtasks = self.subtasks.lock().await;
subtasks.push(NodeSubTask(task));
subtasks.push(task);
}
pub async fn stop(&self) {
let subtasks = self.subtasks.lock().await;
for subtask in subtasks.iter() {
if let Some(subtask) = unsafe { (subtask).0.as_ref() } {
subtask.abort();
}
}
}
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn adds_a_subtask() {
let node_task = NodeTask::new();
let subtask = tokio::spawn(async {});

node_task.add_subtask(&subtask).await;

let subtask = NodeSubTask(&subtask);

let added_subtasks = node_task.subtasks.lock().await;

assert_eq!(subtask, *added_subtasks.first().unwrap());
}

#[tokio::test]
async fn adds_multiple_flattened_subtasks() {
let node_task = NodeTask::new();
let subtasks = [
tokio::spawn(async {}),
tokio::spawn(async {}),
tokio::spawn(async {}),
];

for subtask in subtasks.iter() {
node_task.add_subtask(subtask).await;
}

let subtasks: Vec<_> = subtasks.iter().map(|t| NodeSubTask(t)).collect();

let added_subtasks = node_task.subtasks.lock().await;

for (index, subtask) in subtasks.iter().enumerate() {
assert_eq!(subtask, added_subtasks.get(index).unwrap());
}
}

#[tokio::test]
async fn adds_multiple_nested_subtasks() {
let node_task = NodeTask::new();

let subtask = tokio::spawn({
let node_task = node_task.clone();

async move {
let subtask = tokio::spawn({
let node_task = node_task.clone();

async move {
let subtask = tokio::spawn({
let node_task = node_task.clone();

async move {
let subtask = tokio::spawn(async move {});
node_task.add_subtask(&subtask).await;
assert_is_added(&subtask, &node_task).await;
}
});
node_task.add_subtask(&subtask).await;
assert_is_added(&subtask, &node_task).await;
}
});

node_task.add_subtask(&subtask).await;

assert_is_added(&subtask, &node_task).await;
}
});

node_task.add_subtask(&subtask).await;
assert_is_added(&subtask, &node_task).await;

async fn assert_is_added(subtask: &tokio::task::JoinHandle<()>, node_task: &NodeTask) {
let subtask = NodeSubTask(subtask);

let added_subtasks = node_task.subtasks.lock().await;

assert!(added_subtasks.iter().any(|added_subtask| *added_subtask == subtask));
subtask.abort();
}
}
}

0 comments on commit 53838dc

Please sign in to comment.