diff --git a/apps/indexer-proxy/proxy/Cargo.toml b/apps/indexer-proxy/proxy/Cargo.toml index 3a899602..f0340e8e 100644 --- a/apps/indexer-proxy/proxy/Cargo.toml +++ b/apps/indexer-proxy/proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "subql-indexer-proxy" -version = "2.7.1" +version = "2.7.2-beta.1" edition = "2021" [dependencies] diff --git a/apps/indexer-proxy/proxy/src/p2p.rs b/apps/indexer-proxy/proxy/src/p2p.rs index 6dcc9ea9..c7ced9b9 100644 --- a/apps/indexer-proxy/proxy/src/p2p.rs +++ b/apps/indexer-proxy/proxy/src/p2p.rs @@ -719,7 +719,13 @@ async fn handle_group( ) .await { - Ok((res_query, res_signature, res_state, _limit)) => { + Ok(( + res_query, + res_signature, + res_state, + _limit, + _inactive, + )) => { json!({ "result": general_purpose::STANDARD.encode(&res_query), "signature": res_signature, diff --git a/apps/indexer-proxy/proxy/src/payg.rs b/apps/indexer-proxy/proxy/src/payg.rs index fb762fd9..87319c4a 100644 --- a/apps/indexer-proxy/proxy/src/payg.rs +++ b/apps/indexer-proxy/proxy/src/payg.rs @@ -480,7 +480,7 @@ pub async fn query_single_state( state: QueryState, network_type: MetricsNetwork, no_sig: bool, -) -> Result<(Vec, String, String, Option<(i64, i64)>)> { +) -> Result<(Vec, String, String, Option<(i64, i64)>, bool)> { let project: Project = get_project(project_id).await?; // compute unit count times @@ -529,7 +529,7 @@ pub async fn query_single_state( })?; debug!("Handle query channel success"); - Ok((data, signature, post_state.to_bs64_old2(), limit)) + Ok((data, signature, post_state.to_bs64_old2(), limit, true)) } // query with multiple state mode @@ -658,7 +658,7 @@ pub async fn query_multiple_state( state: MultipleQueryState, network_type: MetricsNetwork, no_sig: bool, -) -> Result<(Vec, String, String, Option<(i64, i64)>)> { +) -> Result<(Vec, String, String, Option<(i64, i64)>, bool)> { let project = get_project(project_id).await?; // compute unit count times @@ -675,7 +675,7 @@ pub async fn query_multiple_state( } })?; if inactive { - return Ok((vec![], "".to_owned(), state.to_bs64(), None)); + return Ok((vec![], "".to_owned(), state.to_bs64(), None, inactive)); } // query the data. @@ -701,7 +701,7 @@ pub async fn query_multiple_state( post_query_multiple_state(keyname, state_cache).await; debug!("Handle query channel success"); - Ok((data, signature, state.to_bs64(), limit)) + Ok((data, signature, state.to_bs64(), limit, inactive)) } pub async fn extend_channel( diff --git a/apps/indexer-proxy/proxy/src/server.rs b/apps/indexer-proxy/proxy/src/server.rs index d47fdaf0..dbd4c137 100644 --- a/apps/indexer-proxy/proxy/src/server.rs +++ b/apps/indexer-proxy/proxy/src/server.rs @@ -43,6 +43,7 @@ use subql_indexer_utils::{ }; use tower_http::cors::{Any, CorsLayer}; +use crate::account::ACCOUNT; use crate::ai::api_stream; use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload}; use crate::cli::COMMAND; @@ -484,7 +485,7 @@ async fn ep_payg_handler( return payg_stream(endpoint.endpoint.clone(), v, state, false).await; } - let (data, signature, state_data, limit) = match block.to_str() { + let (data, signature, state_data, limit, inactive) = match block.to_str() { Ok("multiple") => { let state = match MultipleQueryState::from_bs64(auth) { Ok(p) => p, @@ -527,30 +528,41 @@ async fn ep_payg_handler( let (body, mut headers) = match res_fmt.to_str() { Ok("inline") => { - let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) { - if return_data.is_empty() { + let return_body = match String::from_utf8(data.clone()) { + Ok(return_data) => { + if data.is_empty() { + let account = ACCOUNT.read().await; + let indexer = account.indexer; + drop(account); + let indexer_string = format!("{:?}", indexer); + let unique_title = format!( + "payg ep_query_handler, proxy get empty and lead to inline returns empty, deployment_id: {}, ep_name: {}", + deployment, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, base64 data is {:#?}, account address is {:#?}, inactive is {}", + res_fmt, headers, body, data, data.len(), general_purpose::STANDARD.encode(&data), indexer_string, inactive + ); + make_sentry_message(&unique_title, &msg); + } + return_data + } + Err(err) => { + let account = ACCOUNT.read().await; + let indexer = account.indexer; + drop(account); + let indexer_string = format!("{:?}", indexer); let unique_title = format!( "payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}", deployment, ep_name - ); + ); let msg = format!( - "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", - res_fmt, headers, body, data + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, err is {:#?}, base64 data is {:#?}, account address is {:#?}, inactive is {}", + res_fmt, headers, body, data, data.len(), err, general_purpose::STANDARD.encode(&data), indexer_string,inactive ); make_sentry_message(&unique_title, &msg); + "".to_owned() } - return_data - } else { - let unique_title = format!( - "payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}", - deployment, ep_name - ); - let msg = format!( - "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", - res_fmt, headers, body, data - ); - make_sentry_message(&unique_title, &msg); - "".to_owned() }; ( return_body, diff --git a/apps/indexer-proxy/utils/Cargo.toml b/apps/indexer-proxy/utils/Cargo.toml index 12cddfa3..9a5ea291 100644 --- a/apps/indexer-proxy/utils/Cargo.toml +++ b/apps/indexer-proxy/utils/Cargo.toml @@ -15,10 +15,11 @@ hex = "0.4" http = "1.1.0" once_cell = "1.12" rand_chacha = "0.3" -reqwest = { version = "0.12", features = ["json"] } +reqwest = { version = "0.12", features = ["json", "stream"] } rustc-hex = "2.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with ={ version = "3.0", features = ["json"] } subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.5.0" } +tokio-stream = "0.1.16" uint = "0.10" diff --git a/apps/indexer-proxy/utils/src/request.rs b/apps/indexer-proxy/utils/src/request.rs index 32e663d9..63c2ba01 100644 --- a/apps/indexer-proxy/utils/src/request.rs +++ b/apps/indexer-proxy/utils/src/request.rs @@ -15,6 +15,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::{ + constants::{APPLICATION_JSON, AUTHORIZATION, KEEP_ALIVE}, + error::Error, +}; use once_cell::sync::Lazy; use reqwest::{ header::{CONNECTION, CONTENT_TYPE}, @@ -25,11 +29,7 @@ use serde_json::{json, Value}; use serde_with::skip_serializing_none; use std::error::Error as StdError; use std::time::Duration; - -use crate::{ - constants::{APPLICATION_JSON, AUTHORIZATION, KEEP_ALIVE}, - error::Error, -}; +use tokio_stream::StreamExt; pub static REQUEST_CLIENT: Lazy = Lazy::new(reqwest::Client::new); @@ -134,38 +134,33 @@ pub async fn post_request_raw(uri: &str, query: String) -> Result, Error // handle request #[inline] async fn handle_request_raw(request: RequestBuilder, query: String) -> Result, Error> { - let response_result = request + let res = request .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .header(CONTENT_TYPE, APPLICATION_JSON) - .header(CONNECTION, KEEP_ALIVE) - .body(query.to_owned()) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(reqwest::header::CONNECTION, "keep-alive") + .body(query) .send() - .await; - - let res = match response_result { - Ok(res) => res, - Err(_e) => { - return Err(Error::GraphQLInternal( + .await + .or_else(|_e| { + Err(Error::GraphQLInternal( 1010, "Service exception or timeout".to_owned(), )) - } - }; + })?; let status = res.status(); - let body = res - .bytes() - .await - .map(|bytes| bytes.to_vec()) - .map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?; + let mut body_stream = res.bytes_stream(); + let mut body = Vec::new(); - // 200~299 - if status.is_success() { - Ok(body) - } else { - let err = String::from_utf8(body).unwrap_or("Internal request error".to_owned()); - Err(Error::GraphQLInternal(1011, err)) + while let Some(chunk) = body_stream.next().await { + let chunk = chunk.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?; + body.extend_from_slice(&chunk); } + + status.is_success().then_some(body.clone()).ok_or_else(|| { + let err = String::from_utf8_lossy(&body).to_string(); + Error::GraphQLInternal(1011, err) + }) } // Request to indexer/consumer proxy