Skip to content

Commit

Permalink
Channels with lists
Browse files Browse the repository at this point in the history
  • Loading branch information
mvniekerk committed Aug 9, 2024
1 parent 5e589b7 commit f049f80
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/simple_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn main() {
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
let sched = JobScheduler::new().await;
let sched = JobScheduler::new_with_channel_size(1000).await;
let mut sched = sched.unwrap();
let jobs = run_example(&mut sched)
.await
Expand Down
36 changes: 36 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,42 @@ impl Context {
notification_code,
}
}

pub fn new_with_channel_size(
metadata_storage: Arc<RwLock<Box<dyn MetaDataStorage + Send + Sync>>>,
notification_storage: Arc<RwLock<Box<dyn NotificationStore + Send + Sync>>>,
job_code: Arc<RwLock<Box<dyn JobCode + Send + Sync>>>,
notification_code: Arc<RwLock<Box<dyn NotificationCode + Send + Sync>>>,
channel_size: usize,
) -> Self {
let (job_activation_tx, _job_activation_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_tx, _notify_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_create_tx, _job_create_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_created_tx, _job_created_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_delete_tx, _job_delete_rx) = tokio::sync::broadcast::channel(channel_size);
let (job_deleted_tx, _job_deleted_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_create_tx, _notify_create_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_created_tx, _notify_created_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_delete_tx, _notify_delete_rx) = tokio::sync::broadcast::channel(channel_size);
let (notify_deleted_tx, _notify_deleted_rx) = tokio::sync::broadcast::channel(channel_size);

Self {
job_activation_tx,
notify_tx,
job_create_tx,
job_created_tx,
job_delete_tx,
job_deleted_tx,
notify_create_tx,
notify_created_tx,
notify_delete_tx,
notify_deleted_tx,
metadata_storage,
notification_storage,
job_code,
notification_code,
}
}
}

impl Clone for Context {
Expand Down
6 changes: 5 additions & 1 deletion src/job/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::JobSchedulerError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::RwLock;
use tracing::error;
Expand All @@ -28,7 +29,10 @@ impl JobRunner {
let val = rx.recv().await;
if let Err(e) = val {
error!("Error receiving {:?}", e);
break;
if matches!(e, RecvError::Closed) {
break;
}
continue;
}
let uuid = val.unwrap();
{
Expand Down
20 changes: 18 additions & 2 deletions src/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl JobsSchedulerLocked {
notification_storage: Arc<RwLock<Box<dyn NotificationStore + Send + Sync>>>,
job_code: Arc<RwLock<Box<dyn JobCode + Send + Sync>>>,
notify_code: Arc<RwLock<Box<dyn NotificationCode + Send + Sync>>>,
channel_size: usize,
) -> Result<Arc<Context>, JobSchedulerError> {
{
let mut metadata_storage = metadata_storage.write().await;
Expand All @@ -68,11 +69,12 @@ impl JobsSchedulerLocked {
let mut notification_storage = notification_storage.write().await;
notification_storage.init().await?;
}
let context = Context::new(
let context = Context::new_with_channel_size(
metadata_storage,
notification_storage,
job_code.clone(),
notify_code.clone(),
channel_size,
);
{
let mut job_code = job_code.write().await;
Expand Down Expand Up @@ -158,8 +160,19 @@ impl JobsSchedulerLocked {

///
/// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`,
/// `SimpleJobCode` and `SimpleNotificationCode` implementation
/// `SimpleJobCode` and `SimpleNotificationCode` implementation with channel size of 200
pub async fn new() -> Result<Self, JobSchedulerError> {
Self::new_with_channel_size(200).await
}

///
/// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`,
/// `SimpleJobCode` and `SimpleNotificationCode` implementation
///
/// The channel_size parameter is used to set the size of the channels used to communicate between the actors.
/// The amount in short affects how many messages can be buffered before the sender is blocked.
/// When the sender is blocked, the processing is lagged.
pub async fn new_with_channel_size(channel_size: usize) -> Result<Self, JobSchedulerError> {
let metadata_storage = SimpleMetadataStore::default();
let metadata_storage: Arc<RwLock<Box<dyn MetaDataStorage + Send + Sync>>> =
Arc::new(RwLock::new(Box::new(metadata_storage)));
Expand All @@ -181,6 +194,7 @@ impl JobsSchedulerLocked {
notification_storage,
job_code,
notify_code,
channel_size,
)
.await
.map_err(|_| JobSchedulerError::CantInit)?;
Expand Down Expand Up @@ -209,6 +223,7 @@ impl JobsSchedulerLocked {
notification_storage: Box<dyn NotificationStore + Send + Sync>,
job_code: Box<dyn JobCode + Send + Sync>,
notification_code: Box<dyn NotificationCode + Send + Sync>,
channel_size: usize,
) -> Result<Self, JobSchedulerError> {
let metadata_storage = Arc::new(RwLock::new(metadata_storage));
let notification_storage = Arc::new(RwLock::new(notification_storage));
Expand All @@ -220,6 +235,7 @@ impl JobsSchedulerLocked {
notification_storage,
job_code,
notification_code,
channel_size,
)
.await?;

Expand Down
6 changes: 5 additions & 1 deletion src/notification/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::JobSchedulerError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::RwLock;
use tracing::error;
Expand All @@ -27,7 +28,10 @@ impl NotificationRunner {
let val = rx.recv().await;
if let Err(e) = val {
error!("Error receiving value {:?}", e);
break;
if matches!(e, RecvError::Closed) {
break;
}
continue;
}
let (job_id, state) = val.unwrap();
let mut storage = storage.write().await;
Expand Down

0 comments on commit f049f80

Please sign in to comment.