Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/MTG-1254-json-downloader-service[mtg-144][mtg-526] #415

Draft
wants to merge 22 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests for json tasks selection && fix issues with db selection
  • Loading branch information
kstepanovdev committed Feb 25, 2025
commit 89f0cade2cf6d5968d0755b3c64a2640b91aa43b
5 changes: 3 additions & 2 deletions postgre-client/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl PgClient {
let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(
"WITH selected_tasks AS (
SELECT t.metadata_hash FROM tasks AS t
WHERE t.task_status = 'refresh' AND NOW() > t.next_try_at AND t.mutability = 'mutable'
WHERE t.task_status = 'success' AND NOW() > t.next_try_at AND t.mutability = 'mutable'
FOR UPDATE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same deadlocking potential here

LIMIT ",
);
Expand All @@ -209,7 +209,8 @@ impl PgClient {
SET next_try_at = NOW() + INTERVAL '1 day'
FROM selected_tasks
WHERE t.metadata_hash = selected_tasks.metadata_hash
RETURNING t.metadata_hash, t.metadata_url, t.task_status, t.attempts, t.max_attempts, t.error;");
RETURNING t.metadata_url, t.task_status, t.etag, t.last_modified_at;",
);

let query = query_builder.build();
let rows = query.fetch_all(&self.pool).await?;
Expand Down
247 changes: 168 additions & 79 deletions postgre-client/tests/json_tasks_test.rs
Original file line number Diff line number Diff line change
@@ -1,110 +1,199 @@
#[cfg(test)]
#[cfg(feature = "integration_tests")]
// #[cfg(feature = "integration_tests")]
mod tests {
use chrono::{DateTime, Days, Utc};
use entities::{enums::TaskStatus, models::Task};
use entities::{
enums::{OffchainDataMutability, TaskStatus},
models::Task,
};
use setup::pg::*;
use testcontainers::clients::Cli;

#[tokio::test]
async fn test_json_task_select() {
async fn test_select_pending_tasks() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;

// make sure we select pending task
let pending_task = Task {
ofd_metadata_url: "https://url1.com".to_string(),
ofd_locked_until: Some(Utc::now().checked_sub_days(Days::new(1)).unwrap()),
ofd_attempts: 0,
ofd_max_attempts: 10,
ofd_error: None,
ofd_status: TaskStatus::Pending,
let pending_mutable_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Pending,
};
asset_index_storage
.insert_json_download_tasks(&mut vec![pending_task.clone()])
.await
.unwrap();

let selected = asset_index_storage.get_pending_tasks(100).await.unwrap();
assert_eq!(selected.len(), 1);
assert_eq!(selected.get(0).unwrap().metadata_url, pending_task.ofd_metadata_url);

// make sure we do not select locked task
let locked_until: DateTime<Utc> = Utc::now().checked_add_days(Days::new(1)).unwrap();
let locked_task = Task {
ofd_metadata_url: "https://url2.com".to_string(),
ofd_locked_until: Some(locked_until),
ofd_attempts: 0,
ofd_max_attempts: 10,
ofd_error: None,
ofd_status: TaskStatus::Running,
let pending_immutable_task = Task {
metadata_url: "https://url2.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Pending,
};
asset_index_storage
.insert_json_download_tasks(&mut vec![locked_task.clone()])
.await
.unwrap();
let mut tasks = vec![pending_mutable_task.clone(), pending_immutable_task.clone()];

let selected = asset_index_storage.get_pending_tasks(100).await.unwrap();
asset_index_storage.insert_new_tasks(&mut tasks).await.unwrap();
let selected = asset_index_storage.get_pending_metadata_tasks(2).await.unwrap();
assert_eq!(selected.len(), 2);
assert_eq!(selected.get(0).unwrap().metadata_url, pending_mutable_task.metadata_url);
assert_eq!(selected.get(1).unwrap().metadata_url, pending_immutable_task.metadata_url);
}

// previous task was selected and locked until row updated so here we receive 0
assert_eq!(selected.len(), 0);
#[tokio::test]
async fn test_select_pending_tasks_that_shouldnt_be_processed() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;
let next_try_at = Utc::now().checked_add_days(Days::new(10)).unwrap();

// make sure we do not select tasks with max_attempts reached
let locked_until: DateTime<Utc> = Utc::now().checked_sub_days(Days::new(1)).unwrap();
let attempts_reached_task = Task {
ofd_metadata_url: "https://url3.com".to_string(),
ofd_locked_until: Some(locked_until),
ofd_attempts: 10,
ofd_max_attempts: 10,
ofd_error: None,
ofd_status: TaskStatus::Pending,
let pending_mutable_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at,
status: TaskStatus::Pending,
};
asset_index_storage
.insert_json_download_tasks(&mut vec![attempts_reached_task.clone()])
.await
.unwrap();

let selected = asset_index_storage.get_pending_tasks(100).await.unwrap();
let pending_immutable_task = Task {
metadata_url: "https://url2.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at,
status: TaskStatus::Pending,
};
let mut tasks = vec![pending_mutable_task.clone(), pending_immutable_task.clone()];

asset_index_storage.insert_new_tasks(&mut tasks).await.unwrap();
let selected = asset_index_storage.get_pending_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 0);
}

// make sure we do not select failed tasks
let locked_until: DateTime<Utc> = Utc::now().checked_sub_days(Days::new(1)).unwrap();
let failed_task = Task {
ofd_metadata_url: "https://url4.com".to_string(),
ofd_locked_until: Some(locked_until),
ofd_attempts: 1,
ofd_max_attempts: 10,
ofd_error: None,
ofd_status: TaskStatus::Failed,
#[tokio::test]
async fn test_select_successeful_tasks() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;
let next_try_at = Utc::now().checked_sub_days(Days::new(10)).unwrap();

let pending_mutable_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at,
status: TaskStatus::Success,
};

// will not be selected because it's immutable so refresh makes no sense
let pending_immmutable_task = Task {
metadata_url: "https://url2.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at,
status: TaskStatus::Success,
};
asset_index_storage
.insert_json_download_tasks(&mut vec![failed_task.clone()])
.await
.unwrap();

let selected = asset_index_storage.get_pending_tasks(100).await.unwrap();
let mut tasks = vec![pending_mutable_task.clone(), pending_immmutable_task.clone()];

asset_index_storage.insert_new_tasks(&mut tasks).await.unwrap();
let selected = asset_index_storage.get_refresh_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 1);
assert_eq!(selected.get(0).unwrap().metadata_url, pending_mutable_task.metadata_url);
}

#[tokio::test]
async fn test_select_successeful_tasks_that_shouldnt_be_processed() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;
let next_try_at = Utc::now().checked_add_days(Days::new(10)).unwrap();

let pending_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at,
status: TaskStatus::Success,
};

asset_index_storage.insert_new_tasks(&mut vec![pending_task.clone()]).await.unwrap();
let selected = asset_index_storage.get_refresh_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 0);
}

// make sure we do not select success tasks
let locked_until: DateTime<Utc> = Utc::now().checked_sub_days(Days::new(1)).unwrap();
let success_task = Task {
ofd_metadata_url: "https://url5.com".to_string(),
ofd_locked_until: Some(locked_until),
ofd_attempts: 1,
ofd_max_attempts: 10,
ofd_error: None,
ofd_status: TaskStatus::Success,
#[tokio::test]
async fn test_immutable_task_is_unselectable_for_refresh() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;

let immutable_and_pending_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Pending,
};
let immutable_and_successful_task = Task {
metadata_url: "https://url2.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Success,
};
asset_index_storage
.insert_json_download_tasks(&mut vec![success_task.clone()])
.await
.unwrap();
let immutable_and_failed_task = Task {
metadata_url: "https://url3.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Failed,
};
let mut tasks: Vec<Task> = vec![
immutable_and_pending_task.clone(),
immutable_and_successful_task.clone(),
immutable_and_failed_task.clone(),
];

asset_index_storage.insert_new_tasks(&mut tasks).await.unwrap();
let selected = asset_index_storage.get_refresh_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 0);
}

#[tokio::test]
async fn test_failed_task_is_unselectable() {
let cli = Cli::default();
let env = TestEnvironment::new(&cli).await;
let asset_index_storage = &env.client;

let selected = asset_index_storage.get_pending_tasks(100).await.unwrap();
let failed_and_mutable_task = Task {
metadata_url: "https://url1.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Mutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Failed,
};
let failed_and_immutable_task = Task {
metadata_url: "https://url2.com".to_string(),
etag: None,
last_modified_at: None,
mutability: OffchainDataMutability::Immutable,
next_try_at: DateTime::<Utc>::from(Utc::now()),
status: TaskStatus::Failed,
};
let mut tasks = vec![failed_and_mutable_task.clone(), failed_and_immutable_task.clone()];

asset_index_storage.insert_new_tasks(&mut tasks).await.unwrap();
let selected = asset_index_storage.get_pending_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 0);
let selected = asset_index_storage.get_refresh_metadata_tasks(1).await.unwrap();
assert_eq!(selected.len(), 0);
}
}