Skip to content

Commit

Permalink
feat: spawn managed nushell expressions to act as generators (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead authored Aug 23, 2024
1 parent 7d6dc63 commit b7bead9
Show file tree
Hide file tree
Showing 15 changed files with 410 additions and 236 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
Status: WIP [██████████...... 50%]
```

> "You don't so much run it, as poke _at_ it."
## Overview / Sketch

An event stream store for personal, local-first use. Kinda like the
Expand All @@ -17,6 +15,8 @@ An event stream store for personal, local-first use. Kinda like the

![screenshot](./docs/screenshot.png)

> "You don't so much run it, as poke _at_ it."
Built with:

- [fjall](https://github.com/fjall-rs/fjall): for indexing and metadata
Expand All @@ -28,16 +28,24 @@ Built with:

## Built-in Topics

- `stream.cross.start` - emitted when the server mounts the stream to expose an
- `xs.start`: emitted when the server mounts the stream to expose an
API

- `stream.cross.pulse` - a heartbeat event you can configure to be emitted every
- `xs.pulse`: (synthetic) a heartbeat event you can configure to be emitted every
N seconds when in follow mode

- `stream.cross.threshold` - a synthetic event that marks the boundary between
- `xs.threshold`: (synthetic) marks the boundary between
replaying events and events that are newly arriving in real-time via a live
subscription

- `xs.generator.spawn`
- meta:: topic: string, duplex: bool
- `xs.generator.terminate`

- `xs.handler.spawn`
- meta:: run-from: start, tail, id?
- `xs.handler.terminate`

## Local socket HTTP API

WIP, thoughts:
Expand Down
7 changes: 7 additions & 0 deletions check.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

cargo fmt --check
cargo clippy
cargo t
31 changes: 0 additions & 31 deletions p.nu

This file was deleted.

23 changes: 15 additions & 8 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;

use crate::nu::{value_to_json, Engine};
use nu_protocol::Span;

use crate::nu;
use crate::store::{ReadOptions, Store};

type BoxError = Box<dyn std::error::Error + Send + Sync>;
Expand Down Expand Up @@ -50,6 +52,7 @@ fn match_route(method: &Method, path: &str) -> Routes {
}
Routes::NotFound
}

(&Method::GET, p) if p.starts_with("/kv/") => {
if let Some(key) = p.strip_prefix("/kv/") {
if !key.is_empty() {
Expand All @@ -58,6 +61,7 @@ fn match_route(method: &Method, path: &str) -> Routes {
}
Routes::NotFound
}

(&Method::PUT, p) if p.starts_with("/kv/") => {
if let Some(key) = p.strip_prefix("/kv/") {
if !key.is_empty() {
Expand All @@ -66,6 +70,7 @@ fn match_route(method: &Method, path: &str) -> Routes {
}
Routes::NotFound
}

(&Method::GET, p) if p.starts_with("/cas/") => {
if let Some(hash) = p.strip_prefix("/cas/") {
if let Ok(integrity) = ssri::Integrity::from_str(hash) {
Expand Down Expand Up @@ -94,7 +99,7 @@ fn match_route(method: &Method, path: &str) -> Routes {

async fn handle(
mut store: Store,
engine: Engine,
engine: nu::Engine,
req: Request<hyper::body::Incoming>,
) -> HTTPResult {
let method = req.method();
Expand Down Expand Up @@ -211,17 +216,19 @@ async fn handle_stream_append(

async fn handle_pipe_post(
store: &mut Store,
engine: Engine,
engine: nu::Engine,
id: Scru128Id,
body: hyper::body::Incoming,
) -> HTTPResult {
let bytes = body.collect().await?.to_bytes();
let closure_snippet = std::str::from_utf8(&bytes)?;
let closure = engine.parse_closure(closure_snippet)?;
let expression = std::str::from_utf8(&bytes)?.to_string();

if let Some(frame) = store.get(&id) {
let value = closure.run(frame).await?;
let json = value_to_json(&value);
let input = nu::frame_to_pipeline(&frame);
let value = engine
.eval(input, expression)
.and_then(|pipeline_data| pipeline_data.into_value(Span::unknown()))?;
let json = nu::value_to_json(&value);
let bytes = serde_json::to_vec(&json)?;

Ok(Response::builder()
Expand All @@ -234,7 +241,7 @@ async fn handle_pipe_post(

pub async fn serve(
store: Store,
engine: Engine,
engine: nu::Engine,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tracing::info!("starting api: {:?}", &store.path);
let listener = UnixListener::bind(store.path.join("sock")).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ pub mod listener;
pub mod nu;
pub mod spawn;
pub mod store;
pub mod tasks;
pub mod thread_pool;
pub mod trace;
44 changes: 10 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::time::Duration;
use clap::Parser;

use xs::nu;
use xs::store::{FollowOption, ReadOptions, Store};
use xs::store::Store;
use xs::thread_pool::ThreadPool;

#[derive(Parser, Debug)]
#[clap(version)]
Expand All @@ -16,11 +17,6 @@ struct Args {
#[clap(long, value_parser, value_name = "LISTEN_ADDR")]
http: Option<String>,

/// A Nushell closure which will be called for every item added to the stream (temporary, you'll be
/// able add arbitrary closures at runtime in the future)
#[clap(long, value_parser, value_name = "CLOSURE")]
closure: Option<String>,

/// Enable discord websocket (temporary, you'll be able spawn arbitrary CLI commands at runtime
/// in the future)
#[clap(long)]
Expand All @@ -33,41 +29,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

let args = Args::parse();
let store = Store::spawn(args.path);
let engine = nu::Engine::new(store.clone(), 10)?;
let pool = ThreadPool::new(10);
let engine = nu::Engine::new(store.clone())?;

if let Some(addr) = args.http {
{
let store = store.clone();
let engine = engine.clone();
tokio::spawn(async move {
let _ = xs::http::serve(store, &addr).await;
let _ = xs::tasks::serve(store, engine).await;
});
}

if let Some(closure_snippet) = args.closure {
let engine = engine.clone();
if let Some(addr) = args.http {
let store = store.clone();
let closure = engine.parse_closure(&closure_snippet)?;

tokio::spawn(async move {
let mut rx = store
.read(ReadOptions {
follow: FollowOption::On,
tail: false,
last_id: None,
})
.await;

while let Some(frame) = rx.recv().await {
let result = closure.run(frame).await;
match result {
Ok(value) => {
// Handle the result, e.g., log it
tracing::info!(output = ?value);
}
Err(err) => {
tracing::error!("Error running closure: {:?}", err);
}
}
}
let _ = xs::http::serve(store, &addr).await;
});
}

Expand All @@ -84,7 +60,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// TODO: graceful shutdown
xs::api::serve(store, engine.clone()).await?;
engine.wait_for_completion().await;
pool.wait_for_completion();

Ok(())
}
Loading

0 comments on commit b7bead9

Please sign in to comment.