From d3cf166af65b70d5c64b9832de15c4e21595051f Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Thu, 31 Oct 2024 20:28:17 +0530 Subject: [PATCH] chore: optimize --- core/src/metadata_json.rs | 68 +++++-------------- .../src/dao/generated/asset_data.rs | 9 +-- digital_asset_types/tests/common.rs | 3 +- digital_asset_types/tests/json_parsing.rs | 3 +- 4 files changed, 22 insertions(+), 61 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index fd466a00..1681b541 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -67,14 +67,11 @@ pub struct MetadataJsonDownloadWorkerArgs { metadata_json_download_worker_request_timeout: u64, } -async fn is_asset_data_fetch_req( - download_metadata_info: &DownloadMetadataInfo, - pool: Pool, -) -> bool { +async fn skip_index(download_metadata_info: &DownloadMetadataInfo, pool: Pool) -> bool { let DownloadMetadataInfo { asset_data_id, slot: incoming_slot, - .. + uri, } = download_metadata_info; let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); @@ -83,7 +80,7 @@ async fn is_asset_data_fetch_req( .one(&conn) .await .unwrap_or(None) - .is_some_and(|model| *incoming_slot > model.slot_updated) + .is_some_and(|model| model.metadata_url.eq(uri) && model.slot_updated >= *incoming_slot) } impl MetadataJsonDownloadWorkerArgs { @@ -112,9 +109,10 @@ impl MetadataJsonDownloadWorkerArgs { let pool = pool.clone(); - if is_asset_data_fetch_req(&download_metadata_info, pool.clone()).await { - handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); + if skip_index(&download_metadata_info, pool.clone()).await { + continue; } + handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); } while handlers.next().await.is_some() {} @@ -181,26 +179,11 @@ pub enum StatusCode { Code(reqwest::StatusCode), } -pub struct MetadataJsonData { - value: serde_json::Value, - time_elapsed: u64, - retries: u8, -} - -pub struct MetadataJsonFetchError { - error: FetchMetadataJsonError, - time_elapsed: u64, - retries: u8, -} - async fn fetch_metadata_json( client: Client, metadata_json_url: &str, -) -> Result { - let retries = AtomicU8::new(0); - let start = Instant::now(); - - let res = (|| async { +) -> Result { + (|| async { let url = ReqwestUrl::parse(metadata_json_url)?; let response = client.get(url.clone()).send().await?; @@ -229,25 +212,7 @@ async fn fetch_metadata_json( } }) .retry(&ExponentialBuilder::default()) - .notify(|_e, _d| { - retries.fetch_add(1, Ordering::Relaxed); - }) - .await; - - let time_elapsed = start.elapsed().as_secs(); - - let retries = retries.load(Ordering::Relaxed); - - res.map(|value| MetadataJsonData { - value, - time_elapsed, - retries, - }) - .map_err(|error| MetadataJsonFetchError { - error, - time_elapsed, - retries, - }) + .await } #[derive(thiserror::Error, Debug)] @@ -266,15 +231,17 @@ pub async fn perform_metadata_json_task( download_metadata_info: &DownloadMetadataInfo, ) -> Result { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - match fetch_metadata_json(client, &download_metadata_info.uri).await { + let start = Instant::now(); + let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await; + let time_elapsed = start.elapsed().as_millis() as u64; + match fetch_res { Ok(metadata) => { let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), - metadata: Set(metadata.value), + metadata: Set(metadata), reindex: Set(Some(false)), last_requested_status_code: Set(Some(MetadataJsonFetchResult::Success)), - fetch_duration_in_secs: Set(Some(metadata.time_elapsed)), - failed_fetch_attempts: Set(Some(metadata.retries)), + fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; @@ -287,14 +254,13 @@ pub async fn perform_metadata_json_task( id: Set(download_metadata_info.asset_data_id.clone()), reindex: Set(Some(true)), last_requested_status_code: Set(Some(MetadataJsonFetchResult::Failure)), - failed_fetch_attempts: Set(Some(e.retries)), - fetch_duration_in_secs: Set(Some(e.time_elapsed)), + fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; active_model.update(&conn).await?; - Err(MetadataJsonTaskError::Fetch(e.error)) + Err(MetadataJsonTaskError::Fetch(e)) } } } diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index 4ea6cd45..997f6ea7 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -28,9 +28,8 @@ pub struct Model { pub raw_name: Option>, pub raw_symbol: Option>, pub base_info_seq: Option, - pub fetch_duration_in_secs: Option, + pub fetch_duration_in_ms: Option, pub last_requested_status_code: Option, - pub failed_fetch_attempts: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -46,9 +45,8 @@ pub enum Column { RawName, RawSymbol, BaseInfoSeq, - FetchDurationInSecs, + FetchDurationInMs, LastRequestedStatusCode, - FailedFetchAttempts, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -81,9 +79,8 @@ impl ColumnTrait for Column { Self::RawName => ColumnType::Binary.def().null(), Self::RawSymbol => ColumnType::Binary.def().null(), Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), - Self::FetchDurationInSecs => ColumnType::Unsigned.def().null(), + Self::FetchDurationInMs => ColumnType::Unsigned.def().null(), Self::LastRequestedStatusCode => ColumnType::Unsigned.def().null(), - Self::FailedFetchAttempts => ColumnType::Unsigned.def().null(), } } } diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 2bc85012..417376fb 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -86,9 +86,8 @@ pub fn create_asset_data( raw_name: Some(metadata.name.into_bytes().to_vec().clone()), raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), base_info_seq: Some(0), - fetch_duration_in_secs: None, + fetch_duration_in_ms: None, last_requested_status_code: None, - failed_fetch_attempts: None, }, ) } diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index 57c58c4d..3660c41c 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -37,9 +37,8 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { raw_name: Some(String::from("Handalf").into_bytes().to_vec()), raw_symbol: Some(String::from("").into_bytes().to_vec()), base_info_seq: Some(0), - fetch_duration_in_secs: None, + fetch_duration_in_ms: None, last_requested_status_code: None, - failed_fetch_attempts: None, }; v1_content_from_json(&asset_data).unwrap()