From 5e589b74c16d5f348448f2e3944152a2de25656f Mon Sep 17 00:00:00 2001 From: Michael van Niekerk Date: Fri, 9 Aug 2024 21:33:04 +0200 Subject: [PATCH 1/3] Changes as per issue 65 --- Cargo.toml | 2 ++ src/job/mod.rs | 18 ++++++++++++----- src/job_scheduler.rs | 15 ++++++-------- src/scheduler.rs | 47 +++++++++++++------------------------------- 4 files changed, 35 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a796e5..fd7189c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,5 @@ name = "postgres" path = "examples/postgres_job.rs" required-features = ["postgres_storage", "tracing-subscriber"] + + diff --git a/src/job/mod.rs b/src/job/mod.rs index ebbbf13..77925a4 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -384,7 +384,8 @@ impl JobLocked { /// Create a new one shot job. /// - /// This is checked if it is running only after 500ms in 500ms intervals. + /// This will schedule a job that is only run once after the duration has passed. + /// /// ```rust,ignore /// let mut sched = JobScheduler::new(); /// let job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| { @@ -393,6 +394,7 @@ impl JobLocked { /// sched.add(job) /// tokio::spawn(sched.start()); /// ``` + /// Above will run the code after 18 seconds, only once pub fn new_one_shot(duration: Duration, run: T) -> Result where T: 'static, @@ -403,15 +405,21 @@ impl JobLocked { /// Create a new async one shot job. /// - /// This is checked if it is running only after 500ms in 500ms intervals. + /// This will schedule a job that is only run once after the duration has passed. + /// /// ```rust,ignore /// let mut sched = JobScheduler::new(); - /// let job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| Box::pin(async move { - /// println!("{:?} I'm only run once", chrono::Utc::now()); - /// })); + /// + /// let job = Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| { + /// Box::pin(async move { + /// info!("I'm only run once async"); + /// }) + /// }) + /// .unwrap(); /// sched.add(job) /// tokio::spawn(sched.start()); /// ``` + /// Above will run the code after 18 seconds, only once pub fn new_one_shot_async(duration: Duration, run: T) -> Result where T: 'static, diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index aea80ff..b8238c0 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -11,6 +11,7 @@ use crate::store::{MetaDataStorage, NotificationStore}; use chrono::{DateTime, NaiveDateTime, Utc}; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; #[cfg(all(unix, feature = "signal"))] use tokio::signal::unix::SignalKind; @@ -24,7 +25,7 @@ pub type ShutdownNotification = /// The JobScheduler contains and executes the scheduled jobs. pub struct JobsSchedulerLocked { pub context: Arc, - pub inited: Arc>, + pub inited: Arc, pub job_creator: Arc>, pub job_deleter: Arc>, pub job_runner: Arc>, @@ -139,8 +140,7 @@ impl JobsSchedulerLocked { /// /// Get whether the scheduler is initialized pub async fn inited(&self) -> bool { - let r = self.inited.read().await; - *r + self.inited.load(Ordering::Relaxed) } /// @@ -149,10 +149,7 @@ impl JobsSchedulerLocked { if self.inited().await { return Ok(()); } - { - let mut w = self.inited.write().await; - *w = true; - } + self.inited.swap(true, Ordering::Relaxed); self.clone() .init_actors() .await @@ -190,7 +187,7 @@ impl JobsSchedulerLocked { let val = JobsSchedulerLocked { context, - inited: Arc::new(RwLock::new(false)), + inited: Arc::new(AtomicBool::new(false)), job_creator: Arc::new(Default::default()), job_deleter: Arc::new(Default::default()), job_runner: Arc::new(Default::default()), @@ -228,7 +225,7 @@ impl JobsSchedulerLocked { let val = JobsSchedulerLocked { context, - inited: Arc::new(RwLock::new(false)), + inited: Arc::new(AtomicBool::new(false)), job_creator: Arc::new(Default::default()), job_deleter: Arc::new(Default::default()), job_runner: Arc::new(Default::default()), diff --git a/src/scheduler.rs b/src/scheduler.rs index ac776c5..38ae66a 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use crate::job::job_data::{JobState, JobType}; use crate::job::job_data_prost::{JobState, JobType}; use crate::JobSchedulerError; use chrono::{FixedOffset, Utc}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot::{Receiver, Sender}; @@ -13,10 +14,10 @@ use tracing::error; use uuid::Uuid; pub struct Scheduler { - pub shutdown: Arc>, + pub shutdown: Arc, pub start_tx: Arc>>>, pub start_rx: Arc>>>, - pub ticking: Arc>, + pub ticking: Arc, pub inited: bool, } @@ -24,11 +25,11 @@ impl Default for Scheduler { fn default() -> Self { let (ticker_tx, ticker_rx) = tokio::sync::oneshot::channel(); Self { - shutdown: Arc::new(RwLock::new(false)), + shutdown: Arc::new(AtomicBool::new(false)), inited: false, start_tx: Arc::new(RwLock::new(Some(ticker_tx))), start_rx: Arc::new(RwLock::new(Some(ticker_rx))), - ticking: Arc::new(RwLock::new(false)), + ticking: Arc::new(AtomicBool::new(false)), } } } @@ -49,18 +50,12 @@ impl Scheduler { let start_rx = { let mut w = self.start_rx.write().await; - - let mut start_rx: Option> = None; - std::mem::swap(&mut start_rx, &mut *w); - start_rx + w.take() }; let ticking = self.ticking.clone(); tokio::spawn(async move { - let is_ticking = { - let ticking = ticking.read().await; - *ticking - }; + let is_ticking = ticking.load(Ordering::Relaxed); if !is_ticking { if let Some(start_rx) = start_rx { if let Err(e) = start_rx.await { @@ -68,16 +63,10 @@ impl Scheduler { return; } } - let is_ticking = { - let ticking = ticking.read().await; - *ticking - }; + let is_ticking = ticking.load(Ordering::Relaxed); if !is_ticking { loop { - let is_ticking = { - let ticking = ticking.read().await; - *ticking - }; + let is_ticking = ticking.load(Ordering::Relaxed); if is_ticking { break; } @@ -87,8 +76,8 @@ impl Scheduler { } 'next_tick: loop { let shutdown = { - let r = shutdown.read().await; - *r + let r = shutdown.load(Ordering::Relaxed); + r }; if shutdown { break 'next_tick; @@ -191,7 +180,6 @@ impl Scheduler { Ok(Some(job)) => { let job_type: JobType = JobType::from_i32(job.job_type).unwrap(); let schedule = job.schedule(); - // TODO continue from here let fixed_offset = FixedOffset::east_opt(job.time_offset_seconds) .unwrap_or(FixedOffset::east_opt(0).unwrap()); let now = now.with_timezone(&fixed_offset); @@ -236,22 +224,15 @@ impl Scheduler { } pub async fn shutdown(&mut self) { - let mut w = self.shutdown.write().await; - *w = true; + self.shutdown.swap(true, Ordering::Relaxed); } pub async fn start(&mut self) -> Result<(), JobSchedulerError> { - let is_ticking = { - let ticking = self.ticking.read().await; - *ticking - }; + let is_ticking = self.ticking.load(Ordering::Relaxed); if is_ticking { Err(JobSchedulerError::TickError) } else { - { - let mut w = self.ticking.write().await; - *w = true; - } + self.ticking.swap(true, Ordering::Relaxed); let tx = { let mut w = self.start_tx.write().await; let mut tx: Option> = None; From f049f809faf1950fe8b50f0ebe3626c815ad3730 Mon Sep 17 00:00:00 2001 From: Michael van Niekerk Date: Fri, 9 Aug 2024 22:05:43 +0200 Subject: [PATCH 2/3] Channels with lists --- examples/simple_job.rs | 2 +- src/context.rs | 36 ++++++++++++++++++++++++++++++++++++ src/job/runner.rs | 6 +++++- src/job_scheduler.rs | 20 ++++++++++++++++++-- src/notification/runner.rs | 6 +++++- 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/examples/simple_job.rs b/examples/simple_job.rs index 6310722..17229cd 100644 --- a/examples/simple_job.rs +++ b/examples/simple_job.rs @@ -11,7 +11,7 @@ async fn main() { .with_max_level(Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); - let sched = JobScheduler::new().await; + let sched = JobScheduler::new_with_channel_size(1000).await; let mut sched = sched.unwrap(); let jobs = run_example(&mut sched) .await diff --git a/src/context.rs b/src/context.rs index fe66b45..9a0b6a9 100644 --- a/src/context.rs +++ b/src/context.rs @@ -67,6 +67,42 @@ impl Context { notification_code, } } + + pub fn new_with_channel_size( + metadata_storage: Arc>>, + notification_storage: Arc>>, + job_code: Arc>>, + notification_code: Arc>>, + channel_size: usize, + ) -> Self { + let (job_activation_tx, _job_activation_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_tx, _notify_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_create_tx, _job_create_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_created_tx, _job_created_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_delete_tx, _job_delete_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_deleted_tx, _job_deleted_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_create_tx, _notify_create_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_created_tx, _notify_created_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_delete_tx, _notify_delete_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_deleted_tx, _notify_deleted_rx) = tokio::sync::broadcast::channel(channel_size); + + Self { + job_activation_tx, + notify_tx, + job_create_tx, + job_created_tx, + job_delete_tx, + job_deleted_tx, + notify_create_tx, + notify_created_tx, + notify_delete_tx, + notify_deleted_tx, + metadata_storage, + notification_storage, + job_code, + notification_code, + } + } } impl Clone for Context { diff --git a/src/job/runner.rs b/src/job/runner.rs index 8e6bc8f..2bef567 100644 --- a/src/job/runner.rs +++ b/src/job/runner.rs @@ -9,6 +9,7 @@ use crate::JobSchedulerError; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::RwLock; use tracing::error; @@ -28,7 +29,10 @@ impl JobRunner { let val = rx.recv().await; if let Err(e) = val { error!("Error receiving {:?}", e); - break; + if matches!(e, RecvError::Closed) { + break; + } + continue; } let uuid = val.unwrap(); { diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index b8238c0..c252872 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -59,6 +59,7 @@ impl JobsSchedulerLocked { notification_storage: Arc>>, job_code: Arc>>, notify_code: Arc>>, + channel_size: usize, ) -> Result, JobSchedulerError> { { let mut metadata_storage = metadata_storage.write().await; @@ -68,11 +69,12 @@ impl JobsSchedulerLocked { let mut notification_storage = notification_storage.write().await; notification_storage.init().await?; } - let context = Context::new( + let context = Context::new_with_channel_size( metadata_storage, notification_storage, job_code.clone(), notify_code.clone(), + channel_size, ); { let mut job_code = job_code.write().await; @@ -158,8 +160,19 @@ impl JobsSchedulerLocked { /// /// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`, - /// `SimpleJobCode` and `SimpleNotificationCode` implementation + /// `SimpleJobCode` and `SimpleNotificationCode` implementation with channel size of 200 pub async fn new() -> Result { + Self::new_with_channel_size(200).await + } + + /// + /// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`, + /// `SimpleJobCode` and `SimpleNotificationCode` implementation + /// + /// The channel_size parameter is used to set the size of the channels used to communicate between the actors. + /// The amount in short affects how many messages can be buffered before the sender is blocked. + /// When the sender is blocked, the processing is lagged. + pub async fn new_with_channel_size(channel_size: usize) -> Result { let metadata_storage = SimpleMetadataStore::default(); let metadata_storage: Arc>> = Arc::new(RwLock::new(Box::new(metadata_storage))); @@ -181,6 +194,7 @@ impl JobsSchedulerLocked { notification_storage, job_code, notify_code, + channel_size, ) .await .map_err(|_| JobSchedulerError::CantInit)?; @@ -209,6 +223,7 @@ impl JobsSchedulerLocked { notification_storage: Box, job_code: Box, notification_code: Box, + channel_size: usize, ) -> Result { let metadata_storage = Arc::new(RwLock::new(metadata_storage)); let notification_storage = Arc::new(RwLock::new(notification_storage)); @@ -220,6 +235,7 @@ impl JobsSchedulerLocked { notification_storage, job_code, notification_code, + channel_size, ) .await?; diff --git a/src/notification/runner.rs b/src/notification/runner.rs index d45540b..754179d 100644 --- a/src/notification/runner.rs +++ b/src/notification/runner.rs @@ -9,6 +9,7 @@ use crate::JobSchedulerError; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; use tracing::error; @@ -27,7 +28,10 @@ impl NotificationRunner { let val = rx.recv().await; if let Err(e) = val { error!("Error receiving value {:?}", e); - break; + if matches!(e, RecvError::Closed) { + break; + } + continue; } let (job_id, state) = val.unwrap(); let mut storage = storage.write().await; From 18db3d341e49736197f8e7e95db154a9846260f6 Mon Sep 17 00:00:00 2001 From: Michael van Niekerk Date: Fri, 9 Aug 2024 22:07:54 +0200 Subject: [PATCH 3/3] Specify channel size --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fd7189c..eec23d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-cron-scheduler" -version = "0.10.3" +version = "0.11.0" authors = ["Michael van Niekerk "] edition = "2018"