refactor(nu): streamline Engine interface and improve module organization
cablehead committed Aug 14, 2024
1 parent d494751 commit f44258d
Showing 3 changed files with 51 additions and 39 deletions.
src/main.rs (2 changes: 1 addition, 1 deletion)
@@ -57,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.await;

while let Some(frame) = rx.recv().await {
let result = engine.run_closure(&closure, frame).await;
let result = engine.run_closure(closure.clone(), frame).await;
match result {
Ok(value) => {
// Handle the result, e.g., log it
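A note on this call site: `run_closure` now takes the `Closure` by value, so the receive loop clones it once per frame instead of passing a reference. The sketch below shows the same by-value pattern in isolation; `Job`, `Frame`, and `run` are illustrative stand-ins for the repo's types, and a tokio dependency with default runtime features is assumed.

use tokio::sync::mpsc;

// Illustrative stand-ins for the repo's `Closure` and `Frame` types.
#[derive(Clone)]
struct Job(String);
struct Frame(u64);

// Takes `Job` by value, mirroring the new `run_closure(closure: Closure, frame: Frame)`.
async fn run(job: Job, frame: Frame) {
    println!("{} handled frame {}", job.0, frame.0);
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Frame>(8);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(Frame(i)).await.unwrap();
        }
    });

    let job = Job("handler".into());
    while let Some(frame) = rx.recv().await {
        // Each call consumes an owned copy, hence the per-frame clone.
        run(job.clone(), frame).await;
    }
}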
src/nu/engine.rs (74 changes: 44 additions, 30 deletions)
@@ -11,46 +11,67 @@ use nu_protocol::debugger::WithoutDebug;
use nu_protocol::engine::{Closure, EngineState, Stack, StateWorkingSet};
use nu_protocol::{PipelineData, ShellError, Span, Value};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};

#[derive(Clone)]
pub struct ThreadPool {
_workers: Vec<tokio::task::JoinHandle<()>>,
sender: crossbeam_channel::Sender<Box<dyn FnOnce() + Send + 'static>>,
tx: crossbeam_channel::Sender<Box<dyn FnOnce() + Send + 'static>>,
active_count: Arc<AtomicUsize>,
completion_pair: Arc<(Mutex<()>, Condvar)>,
}

impl ThreadPool {
pub fn new(size: usize) -> Self {
let (sender, receiver) =
crossbeam_channel::unbounded::<Box<dyn FnOnce() + Send + 'static>>();
let receiver = Arc::new(receiver);

let _workers = (0..size)
.map(|_| {
let receiver = receiver.clone();
tokio::spawn(async move {
while let Ok(job) = receiver.recv() {
job();
let (tx, rx) = crossbeam_channel::bounded::<Box<dyn FnOnce() + Send + 'static>>(0);
let active_count = Arc::new(AtomicUsize::new(0));
let completion_pair = Arc::new((Mutex::new(()), Condvar::new()));

for _ in 0..size {
let rx = rx.clone();
let active_count = active_count.clone();
let completion_pair = completion_pair.clone();

std::thread::spawn(move || {
while let Ok(job) = rx.recv() {
active_count.fetch_add(1, Ordering::SeqCst);
job();
if active_count.fetch_sub(1, Ordering::SeqCst) == 1 {
let (lock, cvar) = &*completion_pair;
let guard = lock.lock().unwrap();
cvar.notify_all();
drop(guard);
}
})
})
.collect();
}
});
}

ThreadPool { _workers, sender }
ThreadPool {
tx,
active_count,
completion_pair,
}
}

pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.sender.send(Box::new(f)).unwrap();
self.tx.send(Box::new(f)).unwrap();
}

pub fn wait_for_completion(&self) {
let (lock, cvar) = &*self.completion_pair;
let mut guard = lock.lock().unwrap();
while self.active_count.load(Ordering::SeqCst) > 0 {
guard = cvar.wait(guard).unwrap();
}
}
}

#[derive(Clone)]
pub struct Engine {
engine_state: EngineState,
pool: Arc<ThreadPool>,
active_count: Arc<AtomicUsize>,
pool: ThreadPool,
}

impl Engine {
@@ -65,8 +86,7 @@ impl Engine {

Ok(Self {
engine_state,
pool: Arc::new(ThreadPool::new(thread_count)),
active_count: Arc::new(AtomicUsize::new(0)),
pool: ThreadPool::new(thread_count),
})
}

@@ -82,12 +102,9 @@ impl Engine {
result.into_value(Span::unknown())?.into_closure()
}

pub async fn run_closure(&self, closure: &Closure, frame: Frame) -> Result<Value, Error> {
self.active_count.fetch_add(1, Ordering::SeqCst);
pub async fn run_closure(&self, closure: Closure, frame: Frame) -> Result<Value, Error> {
let engine_state = self.engine_state.clone();
let closure = closure.clone();
let pool = self.pool.clone();
let active_count = self.active_count.clone();

let (tx, rx) = tokio::sync::oneshot::channel();

@@ -97,17 +114,14 @@ impl Engine {
Ok(pipeline_data) => pipeline_data.into_value(Span::unknown()),
Err(err) => Err(err),
};
active_count.fetch_sub(1, Ordering::SeqCst);
let _ = tx.send(result);
});

rx.await.unwrap().map_err(Error::from)
}

pub async fn wait_for_completion(&self) {
while self.active_count.load(Ordering::SeqCst) > 0 {
tokio::task::yield_now().await;
}
self.pool.wait_for_completion()
}
}
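The larger change in this file swaps the tokio-task workers for OS threads and adds completion tracking: an atomic in-flight counter plus a Mutex/Condvar pair that `wait_for_completion` blocks on. Below is a rough, self-contained sketch of that idea. It assumes the crossbeam-channel crate (already used by the diff), and it counts jobs at submission time rather than at dequeue, a small variation on the committed code rather than a copy of it.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

// Sketch of a completion-aware pool; `Pool`, `execute`, and
// `wait_for_completion` echo the names in the diff.
#[derive(Clone)]
struct Pool {
    tx: crossbeam_channel::Sender<Box<dyn FnOnce() + Send + 'static>>,
    in_flight: Arc<AtomicUsize>,
    done: Arc<(Mutex<()>, Condvar)>,
}

impl Pool {
    fn new(size: usize) -> Self {
        let (tx, rx) = crossbeam_channel::bounded::<Box<dyn FnOnce() + Send + 'static>>(0);
        let in_flight = Arc::new(AtomicUsize::new(0));
        let done = Arc::new((Mutex::new(()), Condvar::new()));

        for _ in 0..size {
            let rx = rx.clone();
            let in_flight = in_flight.clone();
            let done = done.clone();
            std::thread::spawn(move || {
                while let Ok(job) = rx.recv() {
                    job();
                    // The thread that finishes the last outstanding job wakes any waiter.
                    if in_flight.fetch_sub(1, Ordering::SeqCst) == 1 {
                        let (lock, cvar) = &*done;
                        let _guard = lock.lock().unwrap();
                        cvar.notify_all();
                    }
                }
            });
        }

        Pool { tx, in_flight, done }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        // Counting at submission means a queued-but-unstarted job still
        // registers as outstanding work.
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        self.tx.send(Box::new(f)).unwrap();
    }

    fn wait_for_completion(&self) {
        let (lock, cvar) = &*self.done;
        let mut guard = lock.lock().unwrap();
        while self.in_flight.load(Ordering::SeqCst) > 0 {
            guard = cvar.wait(guard).unwrap();
        }
    }
}

fn main() {
    let pool = Pool::new(4);
    for i in 0..8 {
        pool.execute(move || println!("job {i}"));
    }
    pool.wait_for_completion();
}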

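`run_closure` itself keeps the same shape after the refactor: the blocking evaluation runs off the async runtime, and a `tokio::sync::oneshot` channel carries the result back to the awaiting caller. A minimal sketch of that bridge follows, with `std::thread::spawn` standing in for the pool and a dummy computation standing in for the closure evaluation; a tokio dependency is assumed.

use tokio::sync::oneshot;

// Run blocking work off the runtime and await its result over a oneshot channel.
async fn run_blocking_job(input: u64) -> Result<u64, String> {
    let (tx, rx) = oneshot::channel();

    std::thread::spawn(move || {
        // Stand-in for evaluating the closure against a frame.
        let result: Result<u64, String> = Ok(input * 2);
        // The receiver may have been dropped; ignore a failed send, as the diff does.
        let _ = tx.send(result);
    });

    // Resolves once the worker sends; panics only if the sender is dropped first.
    rx.await.expect("worker dropped the sender without sending")
}

#[tokio::main]
async fn main() {
    let doubled = run_blocking_job(21).await.unwrap();
    println!("{doubled}");
}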
src/store.rs (14 changes: 6 additions, 8 deletions)
@@ -6,8 +6,6 @@ use scru128::Scru128Id;

use serde::{Deserialize, Deserializer, Serialize};

use tokio::sync::mpsc;

use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
@@ -26,7 +24,7 @@ pub struct Store {
_keyspace: Keyspace,
pub partition: PartitionHandle,
pub kv: PartitionHandle,
commands_tx: mpsc::Sender<Command>,
commands_tx: tokio::sync::mpsc::Sender<Command>,
}

#[derive(Default, PartialEq, Clone, Debug)]
@@ -89,7 +87,7 @@ impl ReadOptions {

#[derive(Debug)]
enum Command {
Read(mpsc::Sender<Frame>, ReadOptions),
Read(tokio::sync::mpsc::Sender<Frame>, ReadOptions),
Append(Frame),
}

@@ -106,7 +104,7 @@ impl Store {
.open_partition("kv", PartitionCreateOptions::default())
.unwrap();

let (tx, mut rx) = mpsc::channel::<Command>(32);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Command>(32);

let store = Store {
path,
@@ -119,7 +117,7 @@
{
let store = store.clone();
std::thread::spawn(move || {
let mut subscribers: Vec<mpsc::Sender<Frame>> = Vec::new();
let mut subscribers: Vec<tokio::sync::mpsc::Sender<Frame>> = Vec::new();
'outer: while let Some(command) = rx.blocking_recv() {
match command {
Command::Read(tx, options) => {
@@ -189,8 +187,8 @@ impl Store {
store
}

pub async fn read(&self, options: ReadOptions) -> mpsc::Receiver<Frame> {
let (tx, rx) = mpsc::channel::<Frame>(100);
pub async fn read(&self, options: ReadOptions) -> tokio::sync::mpsc::Receiver<Frame> {
let (tx, rx) = tokio::sync::mpsc::channel::<Frame>(100);
self.commands_tx
.send(Command::Read(tx.clone(), options.clone()))
.await
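The store.rs change is cosmetic: the `use tokio::sync::mpsc` import is dropped in favour of fully qualified paths. The code it touches follows a command/reply pattern worth a quick illustration: async callers send a `Command` that carries a reply sender, and a dedicated OS thread drains commands with `blocking_recv` and answers with `blocking_send`. The sketch below uses hypothetical `Frame` and `Command` stand-ins rather than the store's real types, and assumes a tokio dependency.

#[derive(Debug)]
struct Frame(u64);

#[derive(Debug)]
enum Command {
    // The reply channel travels with the request, as in the store's `Command::Read`.
    Read(tokio::sync::mpsc::Sender<Frame>),
}

#[tokio::main]
async fn main() {
    let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel::<Command>(32);

    // Dedicated OS thread, mirroring the store's `std::thread::spawn` loop.
    std::thread::spawn(move || {
        while let Some(command) = cmd_rx.blocking_recv() {
            match command {
                Command::Read(tx) => {
                    for i in 0..3 {
                        // blocking_send is the sync-side counterpart of `send(...).await`.
                        let _ = tx.blocking_send(Frame(i));
                    }
                }
            }
        }
    });

    // Async caller: issue a read and stream the replies.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Frame>(100);
    cmd_tx.send(Command::Read(tx)).await.unwrap();
    drop(cmd_tx); // let the worker thread's loop end after this one command
    while let Some(frame) = rx.recv().await {
        println!("got frame {}", frame.0);
    }
}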
