diff --git a/src/http.rs b/src/http.rs index f6fb3b8..0ff06f1 100644 --- a/src/http.rs +++ b/src/http.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::error::Error; use std::net::SocketAddr; +use scru128::Scru128Id; + use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; @@ -17,7 +19,7 @@ use hyper::StatusCode; use hyper_util::rt::TokioIo; use crate::listener::Listener; -use crate::store::Store; +use crate::store::{ReadOptions, Store}; #[derive(Serialize, Deserialize, Debug)] pub struct Request { @@ -40,6 +42,7 @@ pub struct Request { #[derive(Default, Debug, Serialize, Deserialize, Clone)] pub struct Response { + pub request_id: Scru128Id, #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -108,6 +111,33 @@ async fn handle( ) .await; + async fn wait_for_response(store: &Store, frame_id: Scru128Id) -> Option { + let mut recver = store + .read(ReadOptions { + follow: true, + last_id: Some(frame_id), + }) + .await; + + while let Some(event_frame) = recver.recv().await { + if event_frame.topic == "http.response" { + if let Some(meta) = event_frame.meta { + if let Ok(res) = serde_json::from_value::(meta) { + if res.request_id == frame_id { + return Some(res); + } + } + } + } + } + + None + } + + let response = wait_for_response(&store, frame.id).await; + + eprintln!("RESPONSE {:?}", response); + Ok(hyper::Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json")