Skip to content

Commit

Permalink
rework encoding Datime<Utc> for postgresql and sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
pxp9 committed Apr 17, 2024
1 parent 45d896a commit ea85604
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 54 deletions.
20 changes: 8 additions & 12 deletions fang/src/asynk/backend_sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ where
for<'r> std::string::String: Encode<'r, DB> + Type<DB>,
for<'r> &'r str: Encode<'r, DB> + Type<DB>,
for<'r> i32: Encode<'r, DB> + Type<DB>,
for<'r> i64: Encode<'r, DB> + Type<DB>,
for<'r> DateTime<Utc>: Encode<'r, DB> + Type<DB>,
for<'r> &'r Uuid: Encode<'r, DB> + Type<DB>,
for<'r> &'r Pool<DB>: Executor<'r, Database = DB>,
for<'r> <DB as HasArguments<'r>>::Arguments: IntoArguments<'r, DB>,
Expand All @@ -173,7 +173,7 @@ where
// and the caller is the library itself
let task_type = params.task_type.unwrap();

let now = Utc::now().timestamp();
let now = Utc::now();

let task: Task = sqlx::query_as(query)
.bind(task_type)
Expand Down Expand Up @@ -218,13 +218,9 @@ where
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let now = Utc::now();
let now_i64 = now.timestamp();

let scheduled_at = now + Duration::seconds(params.backoff_seconds.unwrap() as i64);

// shadowing in order to not change a lot depending on types
let scheduled_at = scheduled_at.timestamp();
let now = now_i64;
let task = params.task.unwrap();
let retries = task.retries + 1;

Expand Down Expand Up @@ -254,7 +250,7 @@ where
let metadata = params.metadata.unwrap();

let metadata_str = metadata.to_string();
let scheduled_at = params.scheduled_at.unwrap().timestamp();
let scheduled_at = params.scheduled_at.unwrap();

let task_type = params.task_type.unwrap();

Expand All @@ -278,7 +274,7 @@ where
) -> Result<Task, AsyncQueueError> {
let uuid = Uuid::new_v4();

let scheduled_at_i64 = params.scheduled_at.unwrap().timestamp();
let scheduled_at = params.scheduled_at.unwrap();

let metadata_str = params.metadata.unwrap().to_string();
let task_type = params.task_type.unwrap();
Expand All @@ -287,7 +283,7 @@ where
.bind(&uuid)
.bind(metadata_str)
.bind(task_type)
.bind(scheduled_at_i64)
.bind(scheduled_at)
.fetch_one(pool)
.await?;

Expand All @@ -299,7 +295,7 @@ where
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let updated_at = Utc::now().timestamp();
let updated_at = Utc::now();

let state_str: &str = params.state.unwrap().into();

Expand All @@ -320,7 +316,7 @@ where
pool: &Pool<DB>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
let updated_at = Utc::now().timestamp();
let updated_at = Utc::now();

let uuid = params.task.unwrap().id;

Expand Down Expand Up @@ -351,7 +347,7 @@ where
query: &str,
pool: &Pool<DB>,
) -> Result<u64, AsyncQueueError> {
let now = Utc::now().timestamp();
let now = Utc::now();

// This converts <DB>QueryResult to AnyQueryResult and then to u64
// do not delete into() method and do not delete Into<AnyQueryResult> trait bound
Expand Down
35 changes: 0 additions & 35 deletions fang/src/asynk/backend_sqlx/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,41 +350,6 @@ impl FangQueryable<MySql> for BackendSqlXMySQL {

Ok(task)
}

async fn fetch_task_type(
query: &str,
pool: &Pool<MySql>,
params: QueryParams<'_>,
) -> Result<Task, AsyncQueueError> {
// Unwraps by QueryParams are safe because the responsibility is of the caller
// and the caller is the library itself
let task_type = params.task_type.unwrap();

let now = Utc::now();

let task: Task = sqlx::query_as(query)
.bind(task_type)
.bind(now)
.fetch_one(pool)
.await?;

Ok(task)
}

async fn remove_all_scheduled_tasks(
query: &str,
pool: &Pool<MySql>,
) -> Result<u64, AsyncQueueError> {
let now = Utc::now();

// This converts <DB>QueryResult to AnyQueryResult and then to u64
// do not delete into() method and do not delete Into<AnyQueryResult> trait bound

Ok(
Into::<MySqlQueryResult>::into(sqlx::query(query).bind(now).execute(pool).await?)
.rows_affected(),
)
}
}

impl BackendSqlXMySQL {
Expand Down
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/fail_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = to_timestamp($3) WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "error_message" = $2 , "updated_at" = $3 WHERE id = $4 RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/fetch_task_type.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND to_timestamp($2) >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
SELECT id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/insert_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, to_timestamp($4) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
INSERT INTO "fang_tasks" ("id", "metadata", "task_type", "scheduled_at") VALUES ($1, $2::jsonb, $3, $4 ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/insert_task_uniq.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, to_timestamp($5) ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
INSERT INTO "fang_tasks" ( "id" , "metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2::jsonb , $3, $4, $5 ) RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DELETE FROM "fang_tasks" WHERE scheduled_at > to_timestamp($1)
DELETE FROM "fang_tasks" WHERE scheduled_at > $1
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/retry_task.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = to_timestamp($3), "updated_at" = to_timestamp($4) WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = $3, "updated_at" = $4 WHERE id = $5::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
2 changes: 1 addition & 1 deletion fang/src/asynk/queries_postgres/update_task_state.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = to_timestamp($2) WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at
UPDATE "fang_tasks" SET "state" = $1::fang_task_state , "updated_at" = $2 WHERE id = $3::uuid RETURNING id , metadata::text , error_message, state::text , task_type , uniq_hash, retries , scheduled_at , created_at , updated_at

0 comments on commit ea85604

Please sign in to comment.