diff --git a/apps/indexer-proxy/proxy/Cargo.toml b/apps/indexer-proxy/proxy/Cargo.toml index b7f14a8b..f72f582b 100644 --- a/apps/indexer-proxy/proxy/Cargo.toml +++ b/apps/indexer-proxy/proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "subql-indexer-proxy" -version = "2.8.0" +version = "2.8.2-beta.1" edition = "2021" [dependencies] diff --git a/apps/indexer-proxy/proxy/src/p2p.rs b/apps/indexer-proxy/proxy/src/p2p.rs index 6dcc9ea9..c7ced9b9 100644 --- a/apps/indexer-proxy/proxy/src/p2p.rs +++ b/apps/indexer-proxy/proxy/src/p2p.rs @@ -719,7 +719,13 @@ async fn handle_group( ) .await { - Ok((res_query, res_signature, res_state, _limit)) => { + Ok(( + res_query, + res_signature, + res_state, + _limit, + _inactive, + )) => { json!({ "result": general_purpose::STANDARD.encode(&res_query), "signature": res_signature, diff --git a/apps/indexer-proxy/proxy/src/payg.rs b/apps/indexer-proxy/proxy/src/payg.rs index fb762fd9..87319c4a 100644 --- a/apps/indexer-proxy/proxy/src/payg.rs +++ b/apps/indexer-proxy/proxy/src/payg.rs @@ -480,7 +480,7 @@ pub async fn query_single_state( state: QueryState, network_type: MetricsNetwork, no_sig: bool, -) -> Result<(Vec, String, String, Option<(i64, i64)>)> { +) -> Result<(Vec, String, String, Option<(i64, i64)>, bool)> { let project: Project = get_project(project_id).await?; // compute unit count times @@ -529,7 +529,7 @@ pub async fn query_single_state( })?; debug!("Handle query channel success"); - Ok((data, signature, post_state.to_bs64_old2(), limit)) + Ok((data, signature, post_state.to_bs64_old2(), limit, true)) } // query with multiple state mode @@ -658,7 +658,7 @@ pub async fn query_multiple_state( state: MultipleQueryState, network_type: MetricsNetwork, no_sig: bool, -) -> Result<(Vec, String, String, Option<(i64, i64)>)> { +) -> Result<(Vec, String, String, Option<(i64, i64)>, bool)> { let project = get_project(project_id).await?; // compute unit count times @@ -675,7 +675,7 @@ pub async fn query_multiple_state( } })?; if inactive { - return Ok((vec![], "".to_owned(), state.to_bs64(), None)); + return Ok((vec![], "".to_owned(), state.to_bs64(), None, inactive)); } // query the data. @@ -701,7 +701,7 @@ pub async fn query_multiple_state( post_query_multiple_state(keyname, state_cache).await; debug!("Handle query channel success"); - Ok((data, signature, state.to_bs64(), limit)) + Ok((data, signature, state.to_bs64(), limit, inactive)) } pub async fn extend_channel( diff --git a/apps/indexer-proxy/proxy/src/server.rs b/apps/indexer-proxy/proxy/src/server.rs index d47fdaf0..0b25540d 100644 --- a/apps/indexer-proxy/proxy/src/server.rs +++ b/apps/indexer-proxy/proxy/src/server.rs @@ -17,6 +17,23 @@ // along with this program. If not, see . #![deny(warnings)] +use crate::account::ACCOUNT; +use crate::ai::api_stream; +use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload}; +use crate::cli::COMMAND; +use crate::contracts::check_agreement_and_consumer; +use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery}; +use crate::payg::{ + extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel, + query_multiple_state, query_single_state, AuthPayg, +}; +use crate::project::get_project; +use crate::sentry_log::make_sentry_message; +use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType}; +use crate::{ + account::{get_indexer, indexer_healthy}, + auth::AuthWhitelistQuery, +}; use axum::extract::ws::WebSocket; use axum::{ extract::{ConnectInfo, Path, WebSocketUpgrade}, @@ -43,23 +60,6 @@ use subql_indexer_utils::{ }; use tower_http::cors::{Any, CorsLayer}; -use crate::ai::api_stream; -use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload}; -use crate::cli::COMMAND; -use crate::contracts::check_agreement_and_consumer; -use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery}; -use crate::payg::{ - extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel, - query_multiple_state, query_single_state, AuthPayg, -}; -use crate::project::get_project; -use crate::sentry_log::make_sentry_message; -use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType}; -use crate::{ - account::{get_indexer, indexer_healthy}, - auth::AuthWhitelistQuery, -}; - #[derive(Serialize)] pub struct QueryUri { /// the url refer to specific project @@ -484,7 +484,7 @@ async fn ep_payg_handler( return payg_stream(endpoint.endpoint.clone(), v, state, false).await; } - let (data, signature, state_data, limit) = match block.to_str() { + let (data, signature, state_data, limit, inactive) = match block.to_str() { Ok("multiple") => { let state = match MultipleQueryState::from_bs64(auth) { Ok(p) => p, @@ -527,30 +527,41 @@ async fn ep_payg_handler( let (body, mut headers) = match res_fmt.to_str() { Ok("inline") => { - let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) { - if return_data.is_empty() { + let return_body = match String::from_utf8(data.clone()) { + Ok(return_data) => { + if data.is_empty() { + let account = ACCOUNT.read().await; + let indexer = account.indexer; + drop(account); + let indexer_string = format!("{:?}", indexer); + let unique_title = format!( + "payg ep_query_handler, proxy get empty and lead to inline returns empty, deployment_id: {}, ep_name: {}", + deployment, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, base64 data is {:#?}, account address is {:#?}, inactive is {}", + res_fmt, headers, body, data, data.len(), general_purpose::STANDARD.encode(&data), indexer_string, inactive + ); + make_sentry_message(&unique_title, &msg); + } + return_data + } + Err(err) => { + let account = ACCOUNT.read().await; + let indexer = account.indexer; + drop(account); + let indexer_string = format!("{:?}", indexer); let unique_title = format!( "payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}", deployment, ep_name - ); + ); let msg = format!( - "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", - res_fmt, headers, body, data + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, err is {:#?}, base64 data is {:#?}, account address is {:#?}, inactive is {}", + res_fmt, headers, body, data, data.len(), err, general_purpose::STANDARD.encode(&data), indexer_string,inactive ); make_sentry_message(&unique_title, &msg); + "".to_owned() } - return_data - } else { - let unique_title = format!( - "payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}", - deployment, ep_name - ); - let msg = format!( - "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", - res_fmt, headers, body, data - ); - make_sentry_message(&unique_title, &msg); - "".to_owned() }; ( return_body, diff --git a/apps/indexer-proxy/utils/Cargo.toml b/apps/indexer-proxy/utils/Cargo.toml index fc6d4b78..3f83a700 100644 --- a/apps/indexer-proxy/utils/Cargo.toml +++ b/apps/indexer-proxy/utils/Cargo.toml @@ -16,10 +16,11 @@ http = "1.1.0" native-tls = "0.2.12" once_cell = "1.12" rand_chacha = "0.3" -reqwest = { version = "0.12", features = ["json", "native-tls"] } +reqwest = { version = "0.12", features = ["json", "native-tls", "stream"] } rustc-hex = "2.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with ={ version = "3.0", features = ["json"] } subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.5.0" } +tokio-stream = "0.1.16" uint = "0.10" diff --git a/apps/indexer-proxy/utils/src/request.rs b/apps/indexer-proxy/utils/src/request.rs index f9caedaa..f91a0303 100644 --- a/apps/indexer-proxy/utils/src/request.rs +++ b/apps/indexer-proxy/utils/src/request.rs @@ -30,6 +30,7 @@ use serde_json::{json, Value}; use serde_with::skip_serializing_none; use std::error::Error as StdError; use std::time::Duration; +use tokio_stream::StreamExt; pub static REQUEST_CLIENT: Lazy = Lazy::new(reqwest::Client::new); @@ -134,38 +135,33 @@ pub async fn post_request_raw(uri: &str, query: String) -> Result, Error // handle request #[inline] async fn handle_request_raw(request: RequestBuilder, query: String) -> Result, Error> { - let response_result = request + let res = request .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .header(CONTENT_TYPE, APPLICATION_JSON) - .header(CONNECTION, KEEP_ALIVE) - .body(query.to_owned()) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(reqwest::header::CONNECTION, "keep-alive") + .body(query) .send() - .await; - - let res = match response_result { - Ok(res) => res, - Err(_e) => { - return Err(Error::GraphQLInternal( + .await + .or_else(|_e| { + Err(Error::GraphQLInternal( 1010, "Service exception or timeout".to_owned(), )) - } - }; + })?; let status = res.status(); - let body = res - .bytes() - .await - .map(|bytes| bytes.to_vec()) - .map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?; + let mut body_stream = res.bytes_stream(); + let mut body = Vec::new(); - // 200~299 - if status.is_success() { - Ok(body) - } else { - let err = String::from_utf8(body).unwrap_or("Internal request error".to_owned()); - Err(Error::GraphQLInternal(1011, err)) + while let Some(chunk) = body_stream.next().await { + let chunk = chunk.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?; + body.extend_from_slice(&chunk); } + + status.is_success().then_some(body.clone()).ok_or_else(|| { + let err = String::from_utf8_lossy(&body).to_string(); + Error::GraphQLInternal(1011, err) + }) } // Request to indexer/consumer proxy