Skip to content

Commit

Permalink
chore: optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
Nagaprasadvr committed Oct 31, 2024
1 parent ccacf6f commit 7384692
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 61 deletions.
72 changes: 21 additions & 51 deletions core/src/metadata_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,11 @@ pub struct MetadataJsonDownloadWorkerArgs {
metadata_json_download_worker_request_timeout: u64,
}

async fn is_asset_data_fetch_req(
download_metadata_info: &DownloadMetadataInfo,
pool: Pool<Postgres>,
) -> bool {
async fn skip_index(download_metadata_info: &DownloadMetadataInfo, pool: Pool<Postgres>) -> bool {
let DownloadMetadataInfo {
asset_data_id,
slot: incoming_slot,
..
uri,
} = download_metadata_info;

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
Expand All @@ -83,7 +80,11 @@ async fn is_asset_data_fetch_req(
.one(&conn)
.await
.unwrap_or(None)
.is_some_and(|model| *incoming_slot > model.slot_updated)
.is_some_and(|model| {
!model.reindex.unwrap_or(false)
&& model.metadata_url.eq(uri)
&& model.slot_updated >= *incoming_slot
})
}

impl MetadataJsonDownloadWorkerArgs {
Expand Down Expand Up @@ -112,9 +113,10 @@ impl MetadataJsonDownloadWorkerArgs {

let pool = pool.clone();

if is_asset_data_fetch_req(&download_metadata_info, pool.clone()).await {
handlers.push(spawn_task(client.clone(), pool, download_metadata_info));
if skip_index(&download_metadata_info, pool.clone()).await {
continue;
}
handlers.push(spawn_task(client.clone(), pool, download_metadata_info));
}

while handlers.next().await.is_some() {}
Expand Down Expand Up @@ -181,26 +183,11 @@ pub enum StatusCode {
Code(reqwest::StatusCode),
}

pub struct MetadataJsonData {
value: serde_json::Value,
time_elapsed: u64,
retries: u8,
}

pub struct MetadataJsonFetchError {
error: FetchMetadataJsonError,
time_elapsed: u64,
retries: u8,
}

async fn fetch_metadata_json(
client: Client,
metadata_json_url: &str,
) -> Result<MetadataJsonData, MetadataJsonFetchError> {
let retries = AtomicU8::new(0);
let start = Instant::now();

let res = (|| async {
) -> Result<serde_json::Value, FetchMetadataJsonError> {
(|| async {
let url = ReqwestUrl::parse(metadata_json_url)?;

let response = client.get(url.clone()).send().await?;
Expand Down Expand Up @@ -229,25 +216,7 @@ async fn fetch_metadata_json(
}
})
.retry(&ExponentialBuilder::default())
.notify(|_e, _d| {
retries.fetch_add(1, Ordering::Relaxed);
})
.await;

let time_elapsed = start.elapsed().as_secs();

let retries = retries.load(Ordering::Relaxed);

res.map(|value| MetadataJsonData {
value,
time_elapsed,
retries,
})
.map_err(|error| MetadataJsonFetchError {
error,
time_elapsed,
retries,
})
.await
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -266,15 +235,17 @@ pub async fn perform_metadata_json_task(
download_metadata_info: &DownloadMetadataInfo,
) -> Result<asset_data::Model, MetadataJsonTaskError> {
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
match fetch_metadata_json(client, &download_metadata_info.uri).await {
let start = Instant::now();
let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await;
let time_elapsed = start.elapsed().as_millis() as u64;
match fetch_res {
Ok(metadata) => {
let active_model = asset_data::ActiveModel {
id: Set(download_metadata_info.asset_data_id.clone()),
metadata: Set(metadata.value),
metadata: Set(metadata),
reindex: Set(Some(false)),
last_requested_status_code: Set(Some(MetadataJsonFetchResult::Success)),
fetch_duration_in_secs: Set(Some(metadata.time_elapsed)),
failed_fetch_attempts: Set(Some(metadata.retries)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

Expand All @@ -287,14 +258,13 @@ pub async fn perform_metadata_json_task(
id: Set(download_metadata_info.asset_data_id.clone()),
reindex: Set(Some(true)),
last_requested_status_code: Set(Some(MetadataJsonFetchResult::Failure)),
failed_fetch_attempts: Set(Some(e.retries)),
fetch_duration_in_secs: Set(Some(e.time_elapsed)),
fetch_duration_in_ms: Set(Some(time_elapsed)),
..Default::default()
};

active_model.update(&conn).await?;

Err(MetadataJsonTaskError::Fetch(e.error))
Err(MetadataJsonTaskError::Fetch(e))
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions digital_asset_types/src/dao/generated/asset_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ pub struct Model {
pub raw_name: Option<Vec<u8>>,
pub raw_symbol: Option<Vec<u8>>,
pub base_info_seq: Option<i64>,
pub fetch_duration_in_secs: Option<u64>,
pub fetch_duration_in_ms: Option<u64>,
pub last_requested_status_code: Option<MetadataJsonFetchResult>,
pub failed_fetch_attempts: Option<u8>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
Expand All @@ -46,9 +45,8 @@ pub enum Column {
RawName,
RawSymbol,
BaseInfoSeq,
FetchDurationInSecs,
FetchDurationInMs,
LastRequestedStatusCode,
FailedFetchAttempts,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
Expand Down Expand Up @@ -81,9 +79,8 @@ impl ColumnTrait for Column {
Self::RawName => ColumnType::Binary.def().null(),
Self::RawSymbol => ColumnType::Binary.def().null(),
Self::BaseInfoSeq => ColumnType::BigInteger.def().null(),
Self::FetchDurationInSecs => ColumnType::Unsigned.def().null(),
Self::FetchDurationInMs => ColumnType::Unsigned.def().null(),
Self::LastRequestedStatusCode => ColumnType::Unsigned.def().null(),
Self::FailedFetchAttempts => ColumnType::Unsigned.def().null(),
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions digital_asset_types/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ pub fn create_asset_data(
raw_name: Some(metadata.name.into_bytes().to_vec().clone()),
raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()),
base_info_seq: Some(0),
fetch_duration_in_secs: None,
fetch_duration_in_ms: None,
last_requested_status_code: None,
failed_fetch_attempts: None,
},
)
}
Expand Down
3 changes: 1 addition & 2 deletions digital_asset_types/tests/json_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content {
raw_name: Some(String::from("Handalf").into_bytes().to_vec()),
raw_symbol: Some(String::from("").into_bytes().to_vec()),
base_info_seq: Some(0),
fetch_duration_in_secs: None,
fetch_duration_in_ms: None,
last_requested_status_code: None,
failed_fetch_attempts: None,
};

v1_content_from_json(&asset_data).unwrap()
Expand Down

0 comments on commit 7384692

Please sign in to comment.