From 9e98acb0e97d0e8b0996a80743682e645212b331 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 3 Jun 2024 09:42:49 -0400 Subject: [PATCH] wip: cont on http interface --- src/http.rs | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/src/http.rs b/src/http.rs index f6fb3b8..0ff06f1 100644 --- a/src/http.rs +++ b/src/http.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::error::Error; use std::net::SocketAddr; +use scru128::Scru128Id; + use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; @@ -17,7 +19,7 @@ use hyper::StatusCode; use hyper_util::rt::TokioIo; use crate::listener::Listener; -use crate::store::Store; +use crate::store::{ReadOptions, Store}; #[derive(Serialize, Deserialize, Debug)] pub struct Request { @@ -40,6 +42,7 @@ pub struct Request { #[derive(Default, Debug, Serialize, Deserialize, Clone)] pub struct Response { + pub request_id: Scru128Id, #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -108,6 +111,33 @@ async fn handle( ) .await; + async fn wait_for_response(store: &Store, frame_id: Scru128Id) -> Option { + let mut recver = store + .read(ReadOptions { + follow: true, + last_id: Some(frame_id), + }) + .await; + + while let Some(event_frame) = recver.recv().await { + if event_frame.topic == "http.response" { + if let Some(meta) = event_frame.meta { + if let Ok(res) = serde_json::from_value::(meta) { + if res.request_id == frame_id { + return Some(res); + } + } + } + } + } + + None + } + + let response = wait_for_response(&store, frame.id).await; + + eprintln!("RESPONSE {:?}", response); + Ok(hyper::Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json")