Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Jul 24, 2024
1 parent 37c71e7 commit d954d59
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 26 deletions.
11 changes: 0 additions & 11 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ fn match_route(path: &str) -> Routes {
"/" => Routes::Root,
p if p.starts_with("/cas/") => {
if let Some(hash) = p.strip_prefix("/cas/") {
eprintln!("hash: '{}'", &hash);
if let Ok(integrity) = ssri::Integrity::from_str(hash) {
return Routes::CasGet(integrity);
}
Expand All @@ -65,7 +64,6 @@ fn match_route(path: &str) -> Routes {
}

async fn get(store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
eprintln!("uri: {:?}", req.uri());
match match_route(req.uri().path()) {
Routes::Root => {
let options = match ReadOptions::from_query(req.uri().query()) {
Expand All @@ -76,7 +74,6 @@ async fn get(store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
let rx = store.read(options).await;
let stream = ReceiverStream::new(rx);
let stream = stream.map(|frame| {
eprintln!("streaming");
let mut encoded = serde_json::to_vec(&frame).unwrap();
encoded.push(b'\n');
Ok(hyper::body::Frame::data(bytes::Bytes::from(encoded)))
Expand Down Expand Up @@ -132,16 +129,11 @@ async fn post_kv(mut store: Store, path: &str, mut body: hyper::body::Incoming)

async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
let (parts, mut body) = req.into_parts();
eprintln!("parts: {:?}", &parts);
eprintln!("uri: {:?}", &parts.uri.path());
eprintln!("headers: {:?}", &parts.headers);
eprintln!("body end of stream: {:?}", &body.is_end_stream());

let path = &parts.uri.path();
if path.starts_with("/kv/") {
if let Some(path) = path.strip_prefix("/kv/") {
if !path.is_empty() {
eprintln!("kv path: '{}'", &path);
return post_kv(store, path, body).await;
}
}
Expand Down Expand Up @@ -176,8 +168,6 @@ async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResu
Err(e) => return response_400(e.to_string()),
};

eprintln!("meta: {:?}", &meta);

let frame = store
.append(parts.uri.path().trim_start_matches('/'), hash, meta)
.await;
Expand All @@ -189,7 +179,6 @@ async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResu
}

async fn handle(store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
eprintln!("\n\nreq: {:?}", &req);
match *req.method() {
Method::GET => get(store, req).await,
Method::POST => post(store, req).await,
Expand Down
2 changes: 0 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tokio::spawn(async move {
loop {
let store = store.clone();
eprintln!("spawning up");
let res = xs::spawn::spawn(store).await;
eprintln!("peace from spawn: {:?}", res);
tokio::time::sleep(Duration::from_millis(1000)).await;
}
});
Expand Down
11 changes: 0 additions & 11 deletions src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,17 @@ pub async fn spawn(mut store: Store) -> Result<(), Box<dyn std::error::Error + S
frame = recver.recv() => {
match frame {
Some(frame) => {
eprintln!("FRAME: {:?}", &frame.topic);
if frame.topic == "ws.send" {
let content = store.cas_read(&frame.hash.unwrap()).await.unwrap();
let mut content = content;
content.push(b'\n');
eprintln!("CONTENT: {}", std::str::from_utf8(&content).unwrap());
if let Err(e) = stdin.write_all(&content).await {
eprintln!("Failed to write to stdin: {}", e);
break;
}
}
},
None => {
eprintln!("Receiver closed");
break;
}
}
Expand All @@ -61,8 +58,6 @@ pub async fn spawn(mut store: Store) -> Result<(), Box<dyn std::error::Error + S
}
}
}

eprintln!("writer: outie");
});
}

Expand All @@ -76,22 +71,16 @@ pub async fn spawn(mut store: Store) -> Result<(), Box<dyn std::error::Error + S
Ok(_) => {
let hash = store.cas_insert(&line).await.unwrap();
let frame = store.append("ws.recv", Some(hash.clone()), None).await;
eprintln!("inserted: {} {:?} :: {:?}", line, hash, frame);
}
Err(e) => {
eprintln!("Failed to read from stdout: {}", e);
break;
}
}
}
eprintln!("reader: outie");
});

let _ = child.wait().await;
eprintln!("child: outie");

let _ = stop_tx.send(true);
eprintln!("adios spawn");

Ok(())
}
1 change: 0 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ mod tests_store {
let temp_dir = TempDir::new().unwrap();
let mut store = Store::spawn(temp_dir.into_path());
let meta = serde_json::json!({"key": "value"});
eprintln!("meta: {:?}", &meta);
let frame = store.append("stream", None, Some(meta)).await;
let got = store.get(&frame.id);
assert_eq!(Some(frame.clone()), got);
Expand Down
2 changes: 1 addition & 1 deletion xs.nu
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def build-query [params] {
} | and-then { $"?($in | str join "&")" }
}

export def _cat [ store: string, flags: record ] {
def _cat [ store: string, flags: record ] {
let path = "/"
let query = ( build-query $flags )
let url = $"localhost($path)($query)"
Expand Down

0 comments on commit d954d59

Please sign in to comment.