From ea85604b4a8c4a233d95f873db6d5caf64123539 Mon Sep 17 00:00:00 2001 From: pxp9 <48651252+pxp9@users.noreply.github.com> Date: Wed, 17 Apr 2024 12:39:50 +0200 Subject: [PATCH] rework encoding Datime for postgresql and sqlite --- fang/src/asynk/backend_sqlx.rs | 20 +++++------ fang/src/asynk/backend_sqlx/mysql.rs | 35 ------------------- fang/src/asynk/queries_postgres/fail_task.sql | 2 +- .../queries_postgres/fetch_task_type.sql | 2 +- .../asynk/queries_postgres/insert_task.sql | 2 +- .../queries_postgres/insert_task_uniq.sql | 2 +- .../remove_all_scheduled_tasks.sql | 2 +- .../src/asynk/queries_postgres/retry_task.sql | 2 +- .../queries_postgres/update_task_state.sql | 2 +- 9 files changed, 15 insertions(+), 54 deletions(-) diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index 9c9164fd..e4e1d51a 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -158,7 +158,7 @@ where for<'r> std::string::String: Encode<'r, DB> + Type, for<'r> &'r str: Encode<'r, DB> + Type, for<'r> i32: Encode<'r, DB> + Type, - for<'r> i64: Encode<'r, DB> + Type, + for<'r> DateTime: Encode<'r, DB> + Type, for<'r> &'r Uuid: Encode<'r, DB> + Type, for<'r> &'r Pool: Executor<'r, Database = DB>, for<'r> >::Arguments: IntoArguments<'r, DB>, @@ -173,7 +173,7 @@ where // and the caller is the library itself let task_type = params.task_type.unwrap(); - let now = Utc::now().timestamp(); + let now = Utc::now(); let task: Task = sqlx::query_as(query) .bind(task_type) @@ -218,13 +218,9 @@ where params: QueryParams<'_>, ) -> Result { let now = Utc::now(); - let now_i64 = now.timestamp(); let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64); - // shadowing in order to not change a lot depending on types - let scheduled_at = scheduled_at.timestamp(); - let now = now_i64; let task = params.task.unwrap(); let retries = task.retries + 1; @@ -254,7 +250,7 @@ where let metadata = params.metadata.unwrap(); let metadata_str = metadata.to_string(); - let scheduled_at = params.scheduled_at.unwrap().timestamp(); + let scheduled_at = params.scheduled_at.unwrap(); let task_type = params.task_type.unwrap(); @@ -278,7 +274,7 @@ where ) -> Result { let uuid = Uuid::new_v4(); - let scheduled_at_i64 = params.scheduled_at.unwrap().timestamp(); + let scheduled_at = params.scheduled_at.unwrap(); let metadata_str = params.metadata.unwrap().to_string(); let task_type = params.task_type.unwrap(); @@ -287,7 +283,7 @@ where .bind(&uuid) .bind(metadata_str) .bind(task_type) - .bind(scheduled_at_i64) + .bind(scheduled_at) .fetch_one(pool) .await?; @@ -299,7 +295,7 @@ where pool: &Pool, params: QueryParams<'_>, ) -> Result { - let updated_at = Utc::now().timestamp(); + let updated_at = Utc::now(); let state_str: &str = params.state.unwrap().into(); @@ -320,7 +316,7 @@ where pool: &Pool, params: QueryParams<'_>, ) -> Result { - let updated_at = Utc::now().timestamp(); + let updated_at = Utc::now(); let uuid = params.task.unwrap().id; @@ -351,7 +347,7 @@ where query: &str, pool: &Pool, ) -> Result { - let now = Utc::now().timestamp(); + let now = Utc::now(); // This converts QueryResult to AnyQueryResult and then to u64 // do not delete into() method and do not delete Into trait bound diff --git a/fang/src/asynk/backend_sqlx/mysql.rs b/fang/src/asynk/backend_sqlx/mysql.rs index 6c92daf3..32a2d218 100644 --- a/fang/src/asynk/backend_sqlx/mysql.rs +++ b/fang/src/asynk/backend_sqlx/mysql.rs @@ -350,41 +350,6 @@ impl FangQueryable for BackendSqlXMySQL { Ok(task) } - - async fn fetch_task_type( - query: &str, - pool: &Pool, - params: QueryParams<'_>, - ) -> Result { - // Unwraps by QueryParams are safe because the responsibility is of the caller - // and the caller is the library itself - let task_type = params.task_type.unwrap(); - - let now = Utc::now(); - - let task: Task = sqlx::query_as(query) - .bind(task_type) - .bind(now) - .fetch_one(pool) - .await?; - - Ok(task) - } - - async fn remove_all_scheduled_tasks( - query: &str, - pool: &Pool, - ) -> Result { - let now = Utc::now(); - - // This converts QueryResult to AnyQueryResult and then to u64 - // do not delete into() method and do not delete Into trait bound - - Ok( - Into::::into(sqlx::query(query).bind(now).execute(pool).await?) - .rows_affected(), - ) - } } impl BackendSqlXMySQL { diff --git a/fang/src/asynk/queries_postgres/fail_task.sql b/fang/src/asynk/queries_postgres/fail_task.sql index 2cb1f337..fd9c703f 100644 --- a/fang/src/asynk/queries_postgres/fail_task.sql +++ b/fang/src/asynk/queries_postgres/fail_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at \ No newline at end of file +UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at \ No newline at end of file diff --git a/fang/src/asynk/queries_postgres/fetch_task_type.sql b/fang/src/asynk/queries_postgres/fetch_task_type.sql index 3d39ca48..e9aa6b55 100644 --- a/fang/src/asynk/queries_postgres/fetch_task_type.sql +++ b/fang/src/asynk/queries_postgres/fetch_task_type.sql @@ -1 +1 @@ -SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED +SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/fang/src/asynk/queries_postgres/insert_task.sql b/fang/src/asynk/queries_postgres/insert_task.sql index e5becd6d..30b0c64a 100644 --- a/fang/src/asynk/queries_postgres/insert_task.sql +++ b/fang/src/asynk/queries_postgres/insert_task.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, to_timestamp($4) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at +INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, $4 ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/insert_task_uniq.sql b/fang/src/asynk/queries_postgres/insert_task_uniq.sql index 9a8d3958..02d079f9 100644 --- a/fang/src/asynk/queries_postgres/insert_task_uniq.sql +++ b/fang/src/asynk/queries_postgres/insert_task_uniq.sql @@ -1 +1 @@ -INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at +INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, $5 ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/remove_all_scheduled_tasks.sql b/fang/src/asynk/queries_postgres/remove_all_scheduled_tasks.sql index 9acd38e1..61a5b6b5 100644 --- a/fang/src/asynk/queries_postgres/remove_all_scheduled_tasks.sql +++ b/fang/src/asynk/queries_postgres/remove_all_scheduled_tasks.sql @@ -1 +1 @@ -DELETE FROM "fang_tasks" WHERE scheduled_at > to_timestamp($1) +DELETE FROM "fang_tasks" WHERE scheduled_at > $1 diff --git a/fang/src/asynk/queries_postgres/retry_task.sql b/fang/src/asynk/queries_postgres/retry_task.sql index 5ad6c9bb..0aee87b5 100644 --- a/fang/src/asynk/queries_postgres/retry_task.sql +++ b/fang/src/asynk/queries_postgres/retry_task.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at +UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = $3, "updated_at" = $4 WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at diff --git a/fang/src/asynk/queries_postgres/update_task_state.sql b/fang/src/asynk/queries_postgres/update_task_state.sql index e41a533e..b9bd7200 100644 --- a/fang/src/asynk/queries_postgres/update_task_state.sql +++ b/fang/src/asynk/queries_postgres/update_task_state.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at +UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = $2 WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at