Skip to content

Commit

Permalink
Merge pull request #82 from mvniekerk/english-to-cron-integration
Browse files Browse the repository at this point in the history
English to cron integration
  • Loading branch information
mvniekerk authored Sep 22, 2024
2 parents 0ad09d9 + c7cf2a3 commit 05e29f4
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 34 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ categories = ["date-and-time"]
tokio = { version = "1", features = ["time", "rt", "sync"] }
croner = "2.0.5"
chrono = { version = "0.4", default-features = false }
english-to-cron = { version = "0.1", optional = true }
uuid = { version = "1", features = ["v4"] }
prost = { version = "0.13", optional = true }
tracing = "0.1"
Expand Down Expand Up @@ -58,6 +59,7 @@ prost-build = { version = "0.13", optional = true }
signal = ["tokio/signal"]
has_bytes = ["prost-build", "prost"]
nats_storage = ["nats", "has_bytes"]
english = ["english-to-cron"]
postgres_storage = ["tokio-postgres", "has_bytes"]
postgres_native_tls = ["postgres_storage", "postgres-native-tls"]
postgres_openssl = ["postgres_storage", "postgres-openssl"]
Expand Down
58 changes: 42 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ async fn main() -> Result<(), JobSchedulerError> {
})?
).await?;

// Needs the `english` feature enabled
sched.add(
Job::new_async("every 4 seconds", |uuid, mut l| {
Box::pin(async move {
println!("I run async every 4 seconds");

// Query the next execution time for this job
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => println!("Next time for 4s job is {:?}", ts),
_ => println!("Could not get next tick for 4s job"),
}
})
})?
);

// Add one-shot job with given duration
sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
Expand Down Expand Up @@ -139,23 +155,25 @@ chrono-tz is not included into the dependencies, so you need to add it to your C
would like to have easy creation of a `Timezone` struct.

```rust
let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * *")
.unwrap()
.with_run_async(Box::new( | uuid, mut l| {
Box::pin(async move {
info ! ("JHB run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info !("Next time for JHB 2s is {:?}", ts),
_ => warn !("Could not get next tick for 2s job"),
async fn tz_job() {
let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * *")
.unwrap()
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
info!("JHB run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for JHB 2s is {:?}", ts),
_ => warn!("Could not get next tick for 2s job"),
}
})
}))
.build()
.unwrap();
}
})
}))
.build()
.unwrap();
```

## Similar Libraries
Expand Down Expand Up @@ -191,6 +209,14 @@ Please see the [CONTRIBUTING](CONTRIBUTING.md) file for more information.

## Features

### english

Since 0.13.0

Enables the schedule text to be interpreted in English. This is done using
the [english-to-cron](https://crates.io/crates/english-to-cron) crate.
For instance "every 15 seconds" will be converted in the background to "0/15 * * * * ? *".

### has_bytes

Since 0.7
Expand Down
30 changes: 29 additions & 1 deletion examples/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ pub async fn run_example(sched: &mut JobScheduler) -> Result<Vec<Uuid>, JobSched
let jhb_job_guid = jhb_job.guid();
sched.add(jhb_job).await.unwrap();

#[cfg(feature = "english")]
let english_job_guid = {
let english_job = JobBuilder::new()
.with_timezone(Utc)
.with_cron_job_type()
// .with_schedule("every 10 seconds")
.with_schedule("every 10 seconds")
.unwrap()
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
info!("English parsed job every 10 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for English parsed job is is {:?}", ts),
_ => warn!("Could not get next tick for English parsed job"),
}
})
}))
.build()
.unwrap();

let english_job_guid = english_job.guid();
sched.add(english_job).await.unwrap();
english_job_guid
};

let start = sched.start().await;
if let Err(e) = start {
error!("Error starting scheduler {}", e);
Expand All @@ -194,8 +220,10 @@ pub async fn run_example(sched: &mut JobScheduler) -> Result<Vec<Uuid>, JobSched
jja_guid,
utc_job_guid,
jhb_job_guid,
#[cfg(feature = "english")]
english_job_guid,
];
return Ok(ret);
Ok(ret)
}

pub async fn stop_example(
Expand Down
11 changes: 5 additions & 6 deletions src/job/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::job::cron_job::CronJob;
#[cfg(not(feature = "has_bytes"))]
use crate::job::job_data;
#[cfg(not(feature = "has_bytes"))]
pub use crate::job::job_data::{JobStoredData, JobType, Uuid};
#[cfg(feature = "has_bytes")]
use crate::job::job_data_prost;
#[cfg(feature = "has_bytes")]
pub use crate::job::job_data_prost::{JobStoredData, JobType, Uuid};
use crate::job::{nop, nop_async, JobLocked};
use crate::{JobSchedulerError, JobToRun, JobToRunAsync};
Expand All @@ -11,11 +15,6 @@ use croner::Cron;
use std::sync::{Arc, RwLock};
use std::time::Instant;

#[cfg(not(feature = "has_bytes"))]
use crate::job::job_data;
#[cfg(feature = "has_bytes")]
use crate::job::job_data_prost;

use uuid::Uuid as UuidUuid;

pub struct JobBuilder<T> {
Expand Down Expand Up @@ -100,7 +99,7 @@ impl<T: TimeZone> JobBuilder<T> {
where
TS: ToString,
{
let schedule = schedule.to_string();
let schedule = JobLocked::schedule_to_cron(schedule)?;
let schedule = Cron::new(&schedule)
.with_seconds_required()
.with_dom_and_dow()
Expand Down
58 changes: 47 additions & 11 deletions src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod runner;
pub mod to_code;

use crate::notification::{NotificationCreator, NotificationDeleter};
use crate::JobSchedulerError::ParseSchedule;
pub use builder::JobBuilder;
pub use creator::JobCreator;
pub use deleter::JobDeleter;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl JobLocked {
S: ToString,
TZ: TimeZone,
{
let schedule = schedule.to_string();
let schedule = Self::schedule_to_cron(schedule)?;
let time_offset_seconds = timezone
.offset_from_utc_datetime(&Utc::now().naive_local())
.fix()
Expand All @@ -132,7 +133,7 @@ impl JobLocked {
.with_seconds_required()
.with_dom_and_dow()
.parse()
.map_err(|_| JobSchedulerError::ParseSchedule)?;
.map_err(|_| ParseSchedule)?;
let job_id = Uuid::new_v4();
Ok(Self(Arc::new(RwLock::new(Box::new(CronJob {
data: JobStoredData {
Expand Down Expand Up @@ -179,12 +180,13 @@ impl JobLocked {
/// sched.add(job)
/// tokio::spawn(sched.start());
/// ```
pub fn new_async<T>(schedule: &str, run: T) -> Result<Self, JobSchedulerError>
pub fn new_async<S, T>(schedule: S, run: T) -> Result<Self, JobSchedulerError>
where
T: 'static,
T: FnMut(Uuid, JobsSchedulerLocked) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync,
S: ToString,
{
Self::new_async_tz(schedule, Utc, run)
}
Expand Down Expand Up @@ -214,7 +216,7 @@ impl JobLocked {
S: ToString,
TZ: TimeZone,
{
let schedule = schedule.to_string();
let schedule = Self::schedule_to_cron(schedule)?;
let time_offset_seconds = timezone
.offset_from_utc_datetime(&Utc::now().naive_local())
.fix()
Expand All @@ -223,7 +225,7 @@ impl JobLocked {
.with_seconds_required()
.with_dom_and_dow()
.parse()
.map_err(|_| JobSchedulerError::ParseSchedule)?;
.map_err(|_| ParseSchedule)?;
let job_id = Uuid::new_v4();
Ok(Self(Arc::new(RwLock::new(Box::new(CronJob {
data: JobStoredData {
Expand Down Expand Up @@ -315,7 +317,7 @@ impl JobLocked {
/// tokio::spawn(sched.start());
/// ```
pub fn new_cron_job_async_tz<S, T, TZ>(
schedule: &str,
schedule: S,
timezone: TZ,
run: T,
) -> Result<Self, JobSchedulerError>
Expand Down Expand Up @@ -488,10 +490,7 @@ impl JobLocked {
/// sched.add(job)
/// tokio::spawn(sched.start());
/// ```
pub fn new_one_shot_at_instant<T>(
instant: std::time::Instant,
run: T,
) -> Result<Self, JobSchedulerError>
pub fn new_one_shot_at_instant<T>(instant: Instant, run: T) -> Result<Self, JobSchedulerError>
where
T: 'static,
T: FnMut(Uuid, JobsSchedulerLocked) + Send + Sync,
Expand All @@ -515,7 +514,7 @@ impl JobLocked {
/// tokio::spawn(sched.start());
/// ```
pub fn new_one_shot_at_instant_async<T>(
instant: std::time::Instant,
instant: Instant,
run: T,
) -> Result<Self, JobSchedulerError>
where
Expand Down Expand Up @@ -885,4 +884,41 @@ impl JobLocked {
_ => Err(JobSchedulerError::GetJobData),
}
}

#[cfg(not(feature = "english"))]
pub fn schedule_to_cron<T: ToString>(schedule: T) -> Result<String, JobSchedulerError> {
Ok(schedule.to_string())
}

#[cfg(feature = "english")]
pub fn schedule_to_cron<T: ToString>(schedule: T) -> Result<String, JobSchedulerError> {
let schedule = schedule.to_string();
match Cron::new(&schedule)
.with_seconds_required()
.with_dom_and_dow()
.parse()
{
Ok(_) => Ok(schedule),
Err(_) => match english_to_cron::str_cron_syntax(&schedule) {
Ok(english_to_cron) => {
if english_to_cron != schedule {
if english_to_cron == "0 * * * * ? *" {
Err(ParseSchedule)
} else {
// english-to-cron adds the year field which we can't put off (currently)
let cron = english_to_cron
.split(' ')
.take(6)
.collect::<Vec<_>>()
.join(" ");
Ok(cron)
}
} else {
Ok(schedule)
}
}
Err(_) => Err(ParseSchedule),
},
}
}
}

0 comments on commit 05e29f4

Please sign in to comment.