Skip to content

Commit

Permalink
make an append command available to the nu snippet
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Aug 14, 2024
1 parent f3125bb commit 7c644e7
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 48 deletions.
113 changes: 112 additions & 1 deletion src/nu/engine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_std::io::WriteExt;
use futures::io::AsyncReadExt;

use nu_cli::{add_cli_context, gather_parent_env_vars};
Expand All @@ -11,6 +12,7 @@ use nu_protocol::engine::{Command, EngineState, Stack, StateWorkingSet};
use nu_protocol::{Category, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value};

use crate::error::Error;
use crate::nu::util;
use crate::store::Store;

#[derive(Clone)]
Expand Down Expand Up @@ -87,10 +89,119 @@ impl Command for CasCommand {
}
}

#[derive(Clone)]
struct AppendCommand {
store: Store,
}

impl AppendCommand {
fn new(store: Store) -> Self {
Self { store }
}
}

impl Command for AppendCommand {
fn name(&self) -> &str {
".append"
}

fn signature(&self) -> Signature {
Signature::build(".append")
// TODO output type should be Record
.input_output_types(vec![(Type::Any, Type::Any)])
.required("topic", SyntaxShape::String, "this clip's topic")
.named(
"meta",
SyntaxShape::Record(vec![]),
"arbitrary metadata",
None,
)
.category(Category::Experimental)
}

fn usage(&self) -> &str {
"writes its input to the CAS and then appends a clip with a hash of this content to the
given topic on the stream"
}

fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;

let mut store = self.store.clone();

let topic: String = call.req(engine_state, stack, 0)?;
let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
let meta = meta.map(|meta| util::value_to_json(&meta));

// Create a Tokio runtime for blocking async operations
let rt = tokio::runtime::Runtime::new()
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

let frame = rt.block_on(async {
let mut writer = store
.cas_writer()
.await
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

let hash = match input {
PipelineData::Value(value, _) => match value {
Value::Nothing { .. } => Ok(None),
Value::String { val, .. } => {
// Write the string data
writer
.write_all(val.as_bytes())
.await
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

// Commit the writer and return the hash
let hash = writer
.commit()
.await
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

Ok(Some(hash))
}
_ => Err(ShellError::PipelineMismatch {
exp_input_type: "string or nothing".into(),
dst_span: span,
src_span: value.span(),
}),
},
PipelineData::ListStream(_stream, ..) => {
// Handle the ListStream case (for now, we'll just panic)
panic!("ListStream handling is not yet implemented");
}
PipelineData::ByteStream(_stream, ..) => {
// Handle the ByteStream case (for now, we'll just panic)
panic!("ByteStream handling is not yet implemented");
}
PipelineData::Empty => Ok(None),
}?;

eprintln!("meta: {:?}", meta);

let frame = store.append(topic.as_str(), hash, meta).await;
Ok::<_, ShellError>(frame)
})?;

Ok(PipelineData::Value(
util::frame_to_value(&frame, span),
None,
))
}
}

fn add_custom_commands(store: Store, mut engine_state: EngineState) -> EngineState {
let delta = {
let mut working_set = StateWorkingSet::new(&engine_state);
working_set.add_decl(Box::new(CasCommand::new(store)));
working_set.add_decl(Box::new(CasCommand::new(store.clone())));
working_set.add_decl(Box::new(AppendCommand::new(store)));
working_set.render()
};

Expand Down
1 change: 1 addition & 0 deletions src/nu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
mod engine;
mod run;
mod thread_pool;
mod util;

use crate::error::Error;
use crate::store::{FollowOption, ReadOptions, Store};
Expand Down
50 changes: 3 additions & 47 deletions src/nu/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,12 @@ use std::sync::Arc;
use nu_engine::get_eval_block_with_early_return;
use nu_protocol::engine::Closure;
use nu_protocol::engine::{EngineState, Stack};
use nu_protocol::{PipelineData, Record, ShellError, Span, Value};
use nu_protocol::{PipelineData, ShellError, Span, Value};

use crate::nu::thread_pool;
use crate::nu::util;
use crate::store::Frame;

fn frame_to_value(frame: &Frame, span: Span) -> Value {
let mut record = Record::new();

record.push("id", Value::string(frame.id.to_string(), span));
record.push("topic", Value::string(frame.topic.clone(), span));

if let Some(hash) = &frame.hash {
record.push("hash", Value::string(hash.to_string(), span));
}

if let Some(meta) = &frame.meta {
record.push("meta", json_to_value(meta, span));
}

Value::record(record, span)
}

fn json_to_value(json: &serde_json::Value, span: Span) -> Value {
match json {
serde_json::Value::Null => Value::nothing(span),
serde_json::Value::Bool(b) => Value::bool(*b, span),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::int(i, span)
} else if let Some(f) = n.as_f64() {
Value::float(f, span)
} else {
Value::string(n.to_string(), span)
}
}
serde_json::Value::String(s) => Value::string(s, span),
serde_json::Value::Array(arr) => {
let values: Vec<Value> = arr.iter().map(|v| json_to_value(v, span)).collect();
Value::list(values, span)
}
serde_json::Value::Object(obj) => {
let mut record = Record::new();
for (k, v) in obj {
record.push(k, json_to_value(v, span));
}
Value::record(record, span)
}
}
}

pub fn line(
frame: Frame,
engine_state: &EngineState,
Expand All @@ -64,7 +20,7 @@ pub fn line(
pool.execute(move || {
tracing::debug!(id = frame.id.to_string(), topic = frame.topic, "");

let input = PipelineData::Value(frame_to_value(&frame, Span::unknown()), None);
let input = PipelineData::Value(util::frame_to_value(&frame, Span::unknown()), None);
match eval_closure(&engine_state, &closure, input) {
Ok(pipeline_data) => match pipeline_data.into_value(Span::unknown()) {
Ok(value) => match value {
Expand Down
75 changes: 75 additions & 0 deletions src/nu/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use nu_protocol::{Record, Span, Value};

use crate::store::Frame;

pub fn frame_to_value(frame: &Frame, span: Span) -> Value {
let mut record = Record::new();

record.push("id", Value::string(frame.id.to_string(), span));
record.push("topic", Value::string(frame.topic.clone(), span));

if let Some(hash) = &frame.hash {
record.push("hash", Value::string(hash.to_string(), span));
}

if let Some(meta) = &frame.meta {
record.push("meta", json_to_value(meta, span));
}

Value::record(record, span)
}

pub fn json_to_value(json: &serde_json::Value, span: Span) -> Value {
match json {
serde_json::Value::Null => Value::nothing(span),
serde_json::Value::Bool(b) => Value::bool(*b, span),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::int(i, span)
} else if let Some(f) = n.as_f64() {
Value::float(f, span)
} else {
Value::string(n.to_string(), span)
}
}
serde_json::Value::String(s) => Value::string(s, span),
serde_json::Value::Array(arr) => {
let values: Vec<Value> = arr.iter().map(|v| json_to_value(v, span)).collect();
Value::list(values, span)
}
serde_json::Value::Object(obj) => {
let mut record = Record::new();
for (k, v) in obj {
record.push(k, json_to_value(v, span));
}
Value::record(record, span)
}
}
}

pub fn value_to_json(value: &Value) -> serde_json::Value {
match value {
Value::Nothing { .. } => serde_json::Value::Null,
Value::Bool { val, .. } => serde_json::Value::Bool(*val),
Value::Int { val, .. } => serde_json::Value::Number((*val).into()),
Value::Float { val, .. } => {
match serde_json::Number::from_f64(*val) {
Some(n) => serde_json::Value::Number(n),
None => serde_json::Value::Null, // or handle this case as appropriate
}
}
Value::String { val, .. } => serde_json::Value::String(val.clone()),
Value::List { vals, .. } => {
serde_json::Value::Array(vals.iter().map(value_to_json).collect())
}
Value::Record { val, .. } => {
let mut map = serde_json::Map::new();
for (k, v) in val.iter() {
map.insert(k.clone(), value_to_json(v));
}
serde_json::Value::Object(map)
}
// Handle other variants as needed
_ => serde_json::Value::Null, // Default case for unhandled variants
}
}

0 comments on commit 7c644e7

Please sign in to comment.