Skip to content

Commit

Permalink
Nico/fix cron (#161)
Browse files Browse the repository at this point in the history
* cp

* more fixes

* node working

* update libs

* fix

* version sync and health update

* dont allow to finish something already finished
  • Loading branch information
nicarq authored Dec 1, 2023
1 parent e01788d commit 89f55d6
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 110 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shinkai_node"
version = "0.1.0"
version = "0.2.9"
edition = "2018"
authors = ["Nico Arqueros <[email protected]>"]

Expand Down
18 changes: 15 additions & 3 deletions src/agent/execution/job_execution_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,18 @@ impl JobManager {
return Err(AgentError::AgentNotFound);
}
};
shinkai_log(ShinkaiLogOption::JobExecution, ShinkaiLogLevel::Debug, format!("KaiJobFile: {:?}", kai_file).as_str());
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
format!("KaiJobFile: {:?}", kai_file).as_str(),
);
match kai_file.schema {
KaiSchemaType::CronJobRequest(cron_task_request) => {
shinkai_log(ShinkaiLogOption::JobExecution, ShinkaiLogLevel::Debug, format!("CronJobRequest: {:?}", cron_task_request).as_str());
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
format!("CronJobRequest: {:?}", cron_task_request).as_str(),
);
// Handle CronJobRequest
JobManager::handle_cron_job_request(
db.clone(),
Expand All @@ -267,7 +275,11 @@ impl JobManager {
return Ok(true);
}
KaiSchemaType::CronJob(cron_task) => {
shinkai_log(ShinkaiLogOption::JobExecution, ShinkaiLogLevel::Debug, format!("CronJob: {:?}", cron_task).as_str());
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
format!("CronJob: {:?}", cron_task).as_str(),
);
// Handle CronJob
JobManager::handle_cron_job(
db.clone(),
Expand Down
7 changes: 3 additions & 4 deletions src/agent/execution/job_execution_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
},
cron_tasks::web_scrapper::{CronTaskRequest, CronTaskRequestResponse, WebScraper},
db::{db_cron_task::CronTask, db_errors::ShinkaiDBError, ShinkaiDB},
planner::kai_files::{KaiJobFile, KaiSchemaType},
planner::{kai_files::{KaiJobFile, KaiSchemaType}, kai_manager::KaiJobFileManager},
};

impl JobManager {
Expand Down Expand Up @@ -154,11 +154,10 @@ impl JobManager {
);
// Create Job
let job_id = full_job.job_id.to_string();
let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone())?;
let shinkai_message = ShinkaiMessageBuilder::job_message_from_agent(
full_job.clone().job_id.to_string(),
inference_response_content.clone(),
inbox_name.to_string(),
"".to_string(),
clone_signature_secret_key(&identity_secret_key),
profile.node_name.clone(),
profile.node_name.clone(),
Expand Down Expand Up @@ -204,7 +203,7 @@ impl JobManager {
let shinkai_message = ShinkaiMessageBuilder::job_message_from_agent(
full_job.clone().job_id.to_string(),
inference_response_content.clone(),
inbox_name.to_string(),
"".to_string(),
clone_signature_secret_key(&identity_secret_key),
profile.node_name.clone(),
profile.node_name.clone(),
Expand Down
3 changes: 0 additions & 3 deletions src/agent/providers/ollama.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ impl LLMProvider for Ollama {
if let Some(base_url) = url {
if let Some(key) = api_key {
let url = format!("{}{}", base_url, "/api/generate");
// TODO: we need a router to handle the different models. Maybe in agents_capabilities_manager.rs
// assume api_key is empty

let ollama = Ollama {
model_type: self.model_type.clone(),
};
Expand Down
171 changes: 112 additions & 59 deletions src/cron_tasks/cron_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ use cron_parser::parse;
use ed25519_dalek::SecretKey as SignatureStaticKey;
use futures::Future;
use shinkai_message_primitives::{
schemas::shinkai_name::ShinkaiName,
schemas::{
inbox_name::{InboxName, InboxNameError},
shinkai_name::ShinkaiName,
},
shinkai_message::shinkai_message_schemas::{JobCreationInfo, JobMessage},
shinkai_utils::{
job_scope::JobScope,
shinkai_logging::{shinkai_log, ShinkaiLogLevel, ShinkaiLogOption},
signatures::clone_signature_secret_key,
signatures::clone_signature_secret_key, shinkai_message_builder::ShinkaiMessageBuilder,
},
};
use std::str::FromStr;
Expand All @@ -48,8 +51,9 @@ use tokio::sync::{Mutex, Semaphore};
use crate::{
agent::{error::AgentError, job_manager::JobManager, queue::job_queue_manager::JobQueueManager},
cron_tasks::web_scrapper::WebScraper,
db::{db_cron_task::CronTask, ShinkaiDB},
db::{db_cron_task::CronTask, db_errors, ShinkaiDB},
planner::kai_files::{KaiJobFile, KaiSchemaType},
schemas::{identity::Identity, inbox_permission::InboxPermission},
};

use super::youtube_checker::YoutubeChecker;
Expand All @@ -67,6 +71,9 @@ pub enum CronManagerError {
SomeError(String),
JobDequeueFailed(String),
JobCreationError(String),
StrError(String),
DBError(db_errors::ShinkaiDBError),
InboxError(InboxNameError),
}

impl From<AgentError> for CronManagerError {
Expand All @@ -75,37 +82,56 @@ impl From<AgentError> for CronManagerError {
}
}

impl From<&str> for CronManagerError {
fn from(error: &str) -> Self {
CronManagerError::StrError(error.to_string())
}
}

impl From<db_errors::ShinkaiDBError> for CronManagerError {
fn from(error: db_errors::ShinkaiDBError) -> Self {
CronManagerError::DBError(error)
}
}

impl From<InboxNameError> for CronManagerError {
fn from(error: InboxNameError) -> Self {
CronManagerError::InboxError(error)
}
}

const NUM_THREADS: usize = 2;
const CRON_INTERVAL_TIME: u64 = 60 * 1;

impl CronManager {
pub async fn new(
db: Arc<Mutex<ShinkaiDB>>,
identity_secret_key: SignatureStaticKey,
node_profile_name: ShinkaiName,
node_name: ShinkaiName,
job_manager: Arc<Mutex<JobManager>>,
) -> Self {
let cron_processing_task = CronManager::process_job_queue(
db.clone(),
node_profile_name.clone(),
node_name.clone(),
clone_signature_secret_key(&identity_secret_key),
CRON_INTERVAL_TIME,
job_manager.clone(),
|job, db, identity_sk, job_manager, node_profile_name| {
|job, db, identity_sk, job_manager, node_name, profile| {
Box::pin(CronManager::process_job_message_queued(
job,
db,
identity_sk,
job_manager,
node_profile_name,
node_name,
profile,
))
},
);

Self {
db,
identity_secret_key,
node_profile_name,
node_profile_name: node_name,
job_manager,
cron_processing_task: Some(cron_processing_task),
}
Expand All @@ -123,6 +149,7 @@ impl CronManager {
SignatureStaticKey,
Arc<Mutex<JobManager>>,
ShinkaiName,
String,
) -> Pin<Box<dyn Future<Output = Result<bool, CronManagerError>> + Send>>
+ Send
+ Sync
Expand All @@ -140,7 +167,7 @@ impl CronManager {
let is_testing = std::env::var("IS_TESTING").unwrap_or_else(|_| String::from("false")) != "false";

loop {
let jobs_to_process: HashMap<String, CronTask> = {
let jobs_to_process: HashMap<String, Vec<(String, CronTask)>> = {
let mut db_lock = db.lock().await;
db_lock.get_all_cron_tasks_from_all_profiles().unwrap_or(HashMap::new())
};
Expand All @@ -152,50 +179,54 @@ impl CronManager {
let mut handles = Vec::new();

// Spawn tasks based on filtered job IDs
for (_, cron_task) in jobs_to_process {
if !is_testing && !Self::should_execute_cron_task(&cron_task, cron_time_interval) {
shinkai_log(
ShinkaiLogOption::CronExecution,
ShinkaiLogLevel::Debug,
format!("Cron Job not ready to be executed: {:?}", cron_task).as_str(),
);
continue;
}
for (profile, tasks) in jobs_to_process {
for (_, cron_task) in tasks {
if !is_testing && !Self::should_execute_cron_task(&cron_task, cron_time_interval) {
shinkai_log(
ShinkaiLogOption::CronExecution,
ShinkaiLogLevel::Debug,
format!("Cron Job not ready to be executed: {:?}", cron_task).as_str(),
);
continue;
}

let db_clone = db.clone();
let identity_sk_clone = clone_signature_secret_key(&identity_sk);
let job_manager_clone = job_manager.clone();
let node_profile_name_clone = node_profile_name.clone();
let job_processing_fn_clone = Arc::clone(&job_processing_fn);

let handle = tokio::spawn(async move {
let result = job_processing_fn_clone(
cron_task,
db_clone,
identity_sk_clone,
job_manager_clone,
node_profile_name_clone,
)
.await;
match result {
Ok(_) => {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
"Cron Job processed successfully",
);
}
Err(e) => {
shinkai_log(
ShinkaiLogOption::CronExecution,
ShinkaiLogLevel::Error,
format!("Cron Job processing failed: {:?}", e).as_str(),
);
let db_clone = db.clone();
let identity_sk_clone = clone_signature_secret_key(&identity_sk);
let job_manager_clone = job_manager.clone();
let node_profile_name_clone = node_profile_name.clone();
let job_processing_fn_clone = Arc::clone(&job_processing_fn);
let profile_clone = profile.clone();

let handle = tokio::spawn(async move {
let result = job_processing_fn_clone(
cron_task,
db_clone,
identity_sk_clone,
job_manager_clone,
node_profile_name_clone,
profile_clone,
)
.await;
match result {
Ok(_) => {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
"Cron Job processed successfully",
);
}
Err(e) => {
shinkai_log(
ShinkaiLogOption::CronExecution,
ShinkaiLogLevel::Error,
format!("Cron Job processing failed: {:?}", e).as_str(),
);
}
}
}
});
});

handles.push(handle);
handles.push(handle);
}
}
futures::future::join_all(handles).await;
tokio::time::sleep(tokio::time::Duration::from_secs(cron_time_interval)).await;
Expand All @@ -206,29 +237,28 @@ impl CronManager {
pub async fn process_job_message_queued(
cron_job: CronTask,
db: Arc<Mutex<ShinkaiDB>>,
_: SignatureStaticKey,
identity_secret_key: SignatureStaticKey,
job_manager: Arc<Mutex<JobManager>>,
node_profile_name: ShinkaiName,
profile: String,
) -> Result<bool, CronManagerError> {
shinkai_log(
ShinkaiLogOption::CronExecution,
ShinkaiLogLevel::Debug,
format!("Processing job: {:?}", cron_job).as_str(),
);

let shinkai_profile = ShinkaiName::from_node_and_profile(node_profile_name.to_string(), profile)?;
let kai_file = KaiJobFile {
schema: KaiSchemaType::CronJob(cron_job.clone()),
shinkai_profile: Some(node_profile_name.clone()),
shinkai_profile: Some(shinkai_profile.clone()),
agent_id: cron_job.agent_id.clone(),
};

let job_creation = JobCreationInfo {
scope: JobScope::new_default(),
};

eprintln!("Job Creation: {:?}", job_creation);
eprintln!("Cron job: {:?}", cron_job);

// Create Job
let job_id = job_manager
.lock()
Expand All @@ -237,7 +267,6 @@ impl CronManager {
.await?;

// Note(Nico): should we close the job after the processing?

let inbox_name_result =
JobManager::insert_kai_job_file_into_inbox(db.clone(), "cron_job".to_string(), kai_file).await;

Expand All @@ -253,6 +282,32 @@ impl CronManager {
)));
}

{
// Get the inbox name
let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone())?;

// Add permission
let mut db = db.lock().await;
db.add_permission_with_profile(
inbox_name.to_string().as_str(),
shinkai_profile.clone(),
InboxPermission::Admin,
)?;

let cron_request_message = format!("My scheduled job \"{}\" created on \"{}\" is ready to be executed", cron_job.prompt, cron_job.created_at);
let shinkai_message = ShinkaiMessageBuilder::job_message_from_agent(
job_id.to_string(),
cron_request_message.to_string(),
"".to_string(),
identity_secret_key,
node_profile_name.node_name.clone(),
node_profile_name.node_name.clone(),
)
.unwrap();
db.add_message_to_job_inbox(&job_id.clone(), &shinkai_message)?;
db.update_smart_inbox_name(inbox_name.to_string().as_str(), cron_job.prompt.as_str())?;
}

// Add Message to Job Queue
let job_message = JobMessage {
job_id: job_id.clone(),
Expand All @@ -267,8 +322,6 @@ impl CronManager {
.await?;

Ok(true)


}

pub fn should_execute_cron_task(cron_task: &CronTask, cron_time_interval: u64) -> bool {
Expand Down Expand Up @@ -302,7 +355,7 @@ impl CronManager {
// TODO: rename this or refactor it to a manager
pub async fn add_cron_task(
&self,
profile: String,
profile: ShinkaiName,
task_id: String,
cron: String,
prompt: String,
Expand Down
Loading

0 comments on commit 89f55d6

Please sign in to comment.