Skip to content

Commit

Permalink
Merge pull request #75 from mvniekerk/0.11.0
Browse files Browse the repository at this point in the history
0.11.0
  • Loading branch information
mvniekerk authored Aug 9, 2024
2 parents a427253 + 18db3d3 commit 47059ed
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 53 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-cron-scheduler"
version = "0.10.3"
version = "0.11.0"
authors = ["Michael van Niekerk <[email protected]>"]
edition = "2018"

Expand Down Expand Up @@ -85,3 +85,5 @@ name = "postgres"
path = "examples/postgres_job.rs"
required-features = ["postgres_storage", "tracing-subscriber"]



2 changes: 1 addition & 1 deletion examples/simple_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn main() {
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
let sched = JobScheduler::new().await;
let sched = JobScheduler::new_with_channel_size(1000).await;
let mut sched = sched.unwrap();
let jobs = run_example(&mut sched)
.await
Expand Down
36 changes: 36 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,42 @@ impl Context {
notification_code,
}
}

pub fn new_with_channel_size(
metadata_storage: Arc<RwLock<Box<dyn MetaDataStorage + Send + Sync>>>,
notification_storage: Arc<RwLock<Box<dyn NotificationStore + Send + Sync>>>,
job_code: Arc<RwLock<Box<dyn JobCode + Send + Sync>>>,
notification_code: Arc<RwLock<Box<dyn NotificationCode + Send + Sync>>>,
channel_size: usize,
) -> Self {
let (job_activation_tx, _job_activation_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_tx, _notify_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_create_tx, _job_create_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_created_tx, _job_created_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_delete_tx, _job_delete_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_deleted_tx, _job_deleted_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_create_tx, _notify_create_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_created_tx, _notify_created_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_delete_tx, _notify_delete_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_deleted_tx, _notify_deleted_rx) = tokio::sync::broadcast::channel(channel_size);

Self {
job_activation_tx,
notify_tx,
job_create_tx,
job_created_tx,
job_delete_tx,
job_deleted_tx,
notify_create_tx,
notify_created_tx,
notify_delete_tx,
notify_deleted_tx,
metadata_storage,
notification_storage,
job_code,
notification_code,
}
}
}

impl Clone for Context {
Expand Down
18 changes: 13 additions & 5 deletions src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ impl JobLocked {

/// Create a new one shot job.
///
/// This is checked if it is running only after 500ms in 500ms intervals.
/// This will schedule a job that is only run once after the duration has passed.
///
/// ```rust,ignore
/// let mut sched = JobScheduler::new();
/// let job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
Expand All @@ -393,6 +394,7 @@ impl JobLocked {
/// sched.add(job)
/// tokio::spawn(sched.start());
/// ```
/// Above will run the code after 18 seconds, only once
pub fn new_one_shot<T>(duration: Duration, run: T) -> Result<Self, JobSchedulerError>
where
T: 'static,
Expand All @@ -403,15 +405,21 @@ impl JobLocked {

/// Create a new async one shot job.
///
/// This is checked if it is running only after 500ms in 500ms intervals.
/// This will schedule a job that is only run once after the duration has passed.
///
/// ```rust,ignore
/// let mut sched = JobScheduler::new();
/// let job = Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| Box::pin(async move {
/// println!("{:?} I'm only run once", chrono::Utc::now());
/// }));
///
/// let job = Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| {
/// Box::pin(async move {
/// info!("I'm only run once async");
/// })
/// })
/// .unwrap();
/// sched.add(job)
/// tokio::spawn(sched.start());
/// ```
/// Above will run the code after 18 seconds, only once
pub fn new_one_shot_async<T>(duration: Duration, run: T) -> Result<Self, JobSchedulerError>
where
T: 'static,
Expand Down
6 changes: 5 additions & 1 deletion src/job/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::JobSchedulerError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::RwLock;
use tracing::error;
Expand All @@ -28,7 +29,10 @@ impl JobRunner {
let val = rx.recv().await;
if let Err(e) = val {
error!("Error receiving {:?}", e);
break;
if matches!(e, RecvError::Closed) {
break;
}
continue;
}
let uuid = val.unwrap();
{
Expand Down
35 changes: 24 additions & 11 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::store::{MetaDataStorage, NotificationStore};
use chrono::{DateTime, NaiveDateTime, Utc};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[cfg(all(unix, feature = "signal"))]
use tokio::signal::unix::SignalKind;
Expand All @@ -24,7 +25,7 @@ pub type ShutdownNotification =
/// The JobScheduler contains and executes the scheduled jobs.
pub struct JobsSchedulerLocked {
pub context: Arc<Context>,
pub inited: Arc<RwLock<bool>>,
pub inited: Arc<AtomicBool>,
pub job_creator: Arc<RwLock<JobCreator>>,
pub job_deleter: Arc<RwLock<JobDeleter>>,
pub job_runner: Arc<RwLock<JobRunner>>,
Expand Down Expand Up @@ -58,6 +59,7 @@ impl JobsSchedulerLocked {
notification_storage: Arc<RwLock<Box<dyn NotificationStore + Send + Sync>>>,
job_code: Arc<RwLock<Box<dyn JobCode + Send + Sync>>>,
notify_code: Arc<RwLock<Box<dyn NotificationCode + Send + Sync>>>,
channel_size: usize,
) -> Result<Arc<Context>, JobSchedulerError> {
{
let mut metadata_storage = metadata_storage.write().await;
Expand All @@ -67,11 +69,12 @@ impl JobsSchedulerLocked {
let mut notification_storage = notification_storage.write().await;
notification_storage.init().await?;
}
let context = Context::new(
let context = Context::new_with_channel_size(
metadata_storage,
notification_storage,
job_code.clone(),
notify_code.clone(),
channel_size,
);
{
let mut job_code = job_code.write().await;
Expand Down Expand Up @@ -139,8 +142,7 @@ impl JobsSchedulerLocked {
///
/// Get whether the scheduler is initialized
pub async fn inited(&self) -> bool {
let r = self.inited.read().await;
*r
self.inited.load(Ordering::Relaxed)
}

///
Expand All @@ -149,10 +151,7 @@ impl JobsSchedulerLocked {
if self.inited().await {
return Ok(());
}
{
let mut w = self.inited.write().await;
*w = true;
}
self.inited.swap(true, Ordering::Relaxed);
self.clone()
.init_actors()
.await
Expand All @@ -161,8 +160,19 @@ impl JobsSchedulerLocked {

///
/// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`,
/// `SimpleJobCode` and `SimpleNotificationCode` implementation
/// `SimpleJobCode` and `SimpleNotificationCode` implementation with channel size of 200
pub async fn new() -> Result<Self, JobSchedulerError> {
Self::new_with_channel_size(200).await
}

///
/// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`,
/// `SimpleJobCode` and `SimpleNotificationCode` implementation
///
/// The channel_size parameter is used to set the size of the channels used to communicate between the actors.
/// The amount in short affects how many messages can be buffered before the sender is blocked.
/// When the sender is blocked, the processing is lagged.
pub async fn new_with_channel_size(channel_size: usize) -> Result<Self, JobSchedulerError> {
let metadata_storage = SimpleMetadataStore::default();
let metadata_storage: Arc<RwLock<Box<dyn MetaDataStorage + Send + Sync>>> =
Arc::new(RwLock::new(Box::new(metadata_storage)));
Expand All @@ -184,13 +194,14 @@ impl JobsSchedulerLocked {
notification_storage,
job_code,
notify_code,
channel_size,
)
.await
.map_err(|_| JobSchedulerError::CantInit)?;

let val = JobsSchedulerLocked {
context,
inited: Arc::new(RwLock::new(false)),
inited: Arc::new(AtomicBool::new(false)),
job_creator: Arc::new(Default::default()),
job_deleter: Arc::new(Default::default()),
job_runner: Arc::new(Default::default()),
Expand All @@ -212,6 +223,7 @@ impl JobsSchedulerLocked {
notification_storage: Box<dyn NotificationStore + Send + Sync>,
job_code: Box<dyn JobCode + Send + Sync>,
notification_code: Box<dyn NotificationCode + Send + Sync>,
channel_size: usize,
) -> Result<Self, JobSchedulerError> {
let metadata_storage = Arc::new(RwLock::new(metadata_storage));
let notification_storage = Arc::new(RwLock::new(notification_storage));
Expand All @@ -223,12 +235,13 @@ impl JobsSchedulerLocked {
notification_storage,
job_code,
notification_code,
channel_size,
)
.await?;

let val = JobsSchedulerLocked {
context,
inited: Arc::new(RwLock::new(false)),
inited: Arc::new(AtomicBool::new(false)),
job_creator: Arc::new(Default::default()),
job_deleter: Arc::new(Default::default()),
job_runner: Arc::new(Default::default()),
Expand Down
6 changes: 5 additions & 1 deletion src/notification/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::JobSchedulerError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tracing::error;
Expand All @@ -27,7 +28,10 @@ impl NotificationRunner {
let val = rx.recv().await;
if let Err(e) = val {
error!("Error receiving value {:?}", e);
break;
if matches!(e, RecvError::Closed) {
break;
}
continue;
}
let (job_id, state) = val.unwrap();
let mut storage = storage.write().await;
Expand Down
47 changes: 14 additions & 33 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::job::job_data::{JobState, JobType};
use crate::job::job_data_prost::{JobState, JobType};
use crate::JobSchedulerError;
use chrono::{FixedOffset, Utc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot::{Receiver, Sender};
Expand All @@ -13,22 +14,22 @@ use tracing::error;
use uuid::Uuid;

pub struct Scheduler {
pub shutdown: Arc<RwLock<bool>>,
pub shutdown: Arc<AtomicBool>,
pub start_tx: Arc<RwLock<Option<Sender<bool>>>>,
pub start_rx: Arc<RwLock<Option<Receiver<bool>>>>,
pub ticking: Arc<RwLock<bool>>,
pub ticking: Arc<AtomicBool>,
pub inited: bool,
}

impl Default for Scheduler {
fn default() -> Self {
let (ticker_tx, ticker_rx) = tokio::sync::oneshot::channel();
Self {
shutdown: Arc::new(RwLock::new(false)),
shutdown: Arc::new(AtomicBool::new(false)),
inited: false,
start_tx: Arc::new(RwLock::new(Some(ticker_tx))),
start_rx: Arc::new(RwLock::new(Some(ticker_rx))),
ticking: Arc::new(RwLock::new(false)),
ticking: Arc::new(AtomicBool::new(false)),
}
}
}
Expand All @@ -49,35 +50,23 @@ impl Scheduler {

let start_rx = {
let mut w = self.start_rx.write().await;

let mut start_rx: Option<Receiver<bool>> = None;
std::mem::swap(&mut start_rx, &mut *w);
start_rx
w.take()
};

let ticking = self.ticking.clone();
tokio::spawn(async move {
let is_ticking = {
let ticking = ticking.read().await;
*ticking
};
let is_ticking = ticking.load(Ordering::Relaxed);
if !is_ticking {
if let Some(start_rx) = start_rx {
if let Err(e) = start_rx.await {
error!(?e, "Could not subscribe to ticker starter");
return;
}
}
let is_ticking = {
let ticking = ticking.read().await;
*ticking
};
let is_ticking = ticking.load(Ordering::Relaxed);
if !is_ticking {
loop {
let is_ticking = {
let ticking = ticking.read().await;
*ticking
};
let is_ticking = ticking.load(Ordering::Relaxed);
if is_ticking {
break;
}
Expand All @@ -87,8 +76,8 @@ impl Scheduler {
}
'next_tick: loop {
let shutdown = {
let r = shutdown.read().await;
*r
let r = shutdown.load(Ordering::Relaxed);
r
};
if shutdown {
break 'next_tick;
Expand Down Expand Up @@ -191,7 +180,6 @@ impl Scheduler {
Ok(Some(job)) => {
let job_type: JobType = JobType::from_i32(job.job_type).unwrap();
let schedule = job.schedule();
// TODO continue from here
let fixed_offset = FixedOffset::east_opt(job.time_offset_seconds)
.unwrap_or(FixedOffset::east_opt(0).unwrap());
let now = now.with_timezone(&fixed_offset);
Expand Down Expand Up @@ -236,22 +224,15 @@ impl Scheduler {
}

pub async fn shutdown(&mut self) {
let mut w = self.shutdown.write().await;
*w = true;
self.shutdown.swap(true, Ordering::Relaxed);
}

pub async fn start(&mut self) -> Result<(), JobSchedulerError> {
let is_ticking = {
let ticking = self.ticking.read().await;
*ticking
};
let is_ticking = self.ticking.load(Ordering::Relaxed);
if is_ticking {
Err(JobSchedulerError::TickError)
} else {
{
let mut w = self.ticking.write().await;
*w = true;
}
self.ticking.swap(true, Ordering::Relaxed);
let tx = {
let mut w = self.start_tx.write().await;
let mut tx: Option<Sender<bool>> = None;
Expand Down

0 comments on commit 47059ed

Please sign in to comment.