Skip to content

[PLAT 2527]: Fix Cron Jobs #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 77 additions & 13 deletions core/application/src/state/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::IpAddr;
use std::ops::DerefMut;
use std::time::Duration;

use ethers::abi::AbiDecode;
use ethers::abi::{AbiDecode, AbiEncode};
use ethers::core::k256::elliptic_curve::rand_core::SeedableRng;
use ethers::types::{Transaction as EthersTransaction, H160};
use fleek_crypto::{
Expand All @@ -29,6 +29,8 @@ use lightning_interfaces::types::{
ExecutionData,
ExecutionError,
Job,
JobInfo,
JobInput,
JobStatus,
Metadata,
MintInfo,
Expand Down Expand Up @@ -321,7 +323,13 @@ impl<B: Backend> StateExecutor<B> {
self.update_content_registry(txn.payload.sender, updates)
},
UpdateMethod::IncrementNonce {} => TransactionResponse::Success(ExecutionData::None),
UpdateMethod::AddJobs { jobs } => self.add_jobs(jobs),
UpdateMethod::AddJobs { jobs } => {
let sender = match self.only_account_owner(txn.payload.sender) {
Ok(account) => account,
Err(e) => return e,
};
self.add_jobs(sender, jobs)
},
UpdateMethod::RemoveJobs { jobs } => self.remove_jobs(jobs),
UpdateMethod::JobUpdates { updates } => self.update_jobs(updates),
};
Expand Down Expand Up @@ -1281,18 +1289,44 @@ impl<B: Backend> StateExecutor<B> {
TransactionResponse::Success(ExecutionData::None)
}

fn add_jobs(&self, jobs: Vec<Job>) -> TransactionResponse {
let mut jobs = jobs
.into_iter()
.map(|job| (job.hash, job))
.collect::<HashMap<_, _>>();
let job_hashes = jobs.keys().copied().collect();
let assigned_jobs = self.assign_jobs(job_hashes);
fn add_jobs(&self, sender: EthAddress, jobs: Vec<JobInput>) -> TransactionResponse {
let mut total_amount = HpUfixed::<18>::zero();
for job_input in &jobs {
total_amount += job_input.info.amount.clone();
}

let mut account = self.account_info.get(&sender).unwrap_or_default();
if account.flk_balance < total_amount {
return TransactionResponse::Revert(ExecutionError::InsufficientBalance);
}
account.flk_balance -= total_amount;
self.account_info.set(sender, account);

// Record the assignee in the job entry.
let mut job_entries = Vec::new();
for job_input in jobs {
let hash = self.compute_job_hash(&sender, &job_input.info);
if self.jobs.get(&hash).is_some() {
return TransactionResponse::Revert(ExecutionError::JobAlreadyExists);
}
if !self.is_valid_service_id(job_input.info.service) {
return TransactionResponse::Revert(ExecutionError::InvalidServiceId);
}
let job = Job {
hash,
owner: sender,
info: job_input.info.clone(),
status: None,
assignee: None,
prepaid_balance: job_input.info.amount.clone(), // Now valid
};
job_entries.push((hash, job));
}

let job_hashes = job_entries.iter().map(|(hash, _)| *hash).collect();
let assigned_jobs = self.assign_jobs(job_hashes);
for (index, node_jobs) in assigned_jobs.iter() {
for job_hash in node_jobs {
if let Some(job) = jobs.get_mut(job_hash) {
if let Some((_, job)) = job_entries.iter_mut().find(|(h, _)| h == job_hash) {
job.assignee = Some(*index);
}
}
Expand All @@ -1312,8 +1346,8 @@ impl<B: Backend> StateExecutor<B> {
}

// Save jobs.
for (job_hash, job) in jobs {
self.jobs.set(job_hash, job);
for (hash, job) in job_entries {
self.jobs.set(hash, job);
}

TransactionResponse::Success(ExecutionData::None)
Expand Down Expand Up @@ -1343,6 +1377,20 @@ impl<B: Backend> StateExecutor<B> {
fn update_jobs(&self, updates: BTreeMap<[u8; 32], JobStatus>) -> TransactionResponse {
for (job_hash, status) in updates {
if let Some(mut job) = self.jobs.get(&job_hash) {
if status.success && job.prepaid_balance >= job.info.amount {
let node_index = job.assignee.unwrap();
let node_owner = self.node_info.get(&node_index).unwrap().owner;
let mut node_account = self.account_info.get(&node_owner).unwrap_or_default();
node_account.flk_balance += job.info.amount.clone();
job.prepaid_balance -= job.info.amount.clone();
self.account_info.set(node_owner, node_account);
} else if status.success {
tracing::warn!(
"Job {} succeeded but insufficient prepaid balance: {}",
job_hash.encode_hex(),
job.prepaid_balance
);
}
job.status = Some(status);
self.jobs.set(job_hash, job);
}
Expand Down Expand Up @@ -2040,4 +2088,20 @@ impl<B: Backend> StateExecutor<B> {

assigned_jobs
}

/// Generates a unique hash based on sender and job details
fn compute_job_hash(&self, owner: &EthAddress, info: &JobInfo) -> [u8; 32] {
let mut hasher = Sha3_256::new();
hasher.update(owner.0);
hasher.update(info.frequency.to_le_bytes());
hasher.update(info.amount.get_value().to_le_bytes::<32>());
hasher.update(info.service.to_le_bytes());
hasher.update(&info.arguments);
hasher.finalize().into()
}

/// Validates service IDs early
fn is_valid_service_id(&self, service_id: ServiceId) -> bool {
self.services.get(&service_id).is_some()
}
}
28 changes: 24 additions & 4 deletions core/application/src/state/executor/epoch_change.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::collections::HashMap;

use ethers::abi::AbiEncode;
use fleek_crypto::TransactionSender;
use fxhash::FxHashMap;
use hp_fixed::unsigned::HpUfixed;
Expand All @@ -13,6 +14,8 @@ use lightning_interfaces::types::{
Epoch,
ExecutionData,
ExecutionError,
Job,
JobInput,
Metadata,
NodeIndex,
NodeInfo,
Expand Down Expand Up @@ -1172,7 +1175,7 @@ impl<B: Backend> StateExecutor<B> {

fn reassign_jobs(&self) {
// Get all current jobs.
let jobs = self
let jobs: Vec<Job> = self
.jobs
.as_map()
.values()
Expand All @@ -1184,11 +1187,28 @@ impl<B: Backend> StateExecutor<B> {
})
.collect();

// Clear the tables.
// Clear existing job assignments
self.jobs.clear();
self.assigned_jobs.clear();

// Add these jobs as new jobs.
self.add_jobs(jobs);
// Reassign each job with its original owner
for job in jobs {
let job_input = vec![JobInput { info: job.info }];
match self.add_jobs(job.owner, job_input) {
TransactionResponse::Success(_) => {
if let Some(mut job_entry) = self.jobs.get(&job.hash) {
job_entry.prepaid_balance = job.prepaid_balance;
self.jobs.set(job.hash, job_entry);
}
},
TransactionResponse::Revert(err) => {
tracing::warn!(
"Failed to reassign job {}: {:?}",
job.hash.encode_hex(),
err
);
},
}
}
}
}
24 changes: 8 additions & 16 deletions core/e2e/tests/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum::{Extension, Router};
use fleek_crypto::NodePublicKey;
use fn_sdk::header::{read_header, TransportDetail};
use futures::SinkExt;
use hp_fixed::unsigned::HpUfixed;
use lightning_application::app::Application;
use lightning_archive::archive::Archive;
use lightning_blockstore::blockstore::Blockstore;
Expand All @@ -15,7 +16,7 @@ use lightning_committee_beacon::CommitteeBeaconComponent;
use lightning_e2e::swarm::Swarm;
use lightning_forwarder::Forwarder;
use lightning_interfaces::fdi::BuildGraph;
use lightning_interfaces::types::{Job, JobInfo, UpdateMethod};
use lightning_interfaces::types::{JobInfo, JobInput, UpdateMethod};
use lightning_interfaces::{
fdi,
partial_node_components,
Expand Down Expand Up @@ -192,38 +193,29 @@ async fn test_watcher() {
let node = pubkeys[2];

// Given: Some jobs.
let job1 = Job {
hash: [0; 32],
let job1 = JobInput {
info: JobInfo {
frequency: 1,
amount: 0,
amount: HpUfixed::<18>::from(0_u32),
service: 0,
arguments: vec![0; 4].into_boxed_slice(),
},
status: None,
assignee: None,
};
let job2 = Job {
hash: [1; 32],
let job2 = JobInput {
info: JobInfo {
frequency: 1,
amount: 0,
amount: HpUfixed::<18>::from(0_u32),
service: 0,
arguments: vec![1; 4].into_boxed_slice(),
},
status: None,
assignee: None,
};
let job3 = Job {
hash: [2; 32],
let job3 = JobInput {
info: JobInfo {
frequency: 1,
amount: 0,
amount: HpUfixed::<18>::from(0_u32),
service: 0,
arguments: vec![2; 4].into_boxed_slice(),
},
status: None,
assignee: None,
};

// When: we submit these jobs for execution.
Expand Down
1 change: 1 addition & 0 deletions core/types/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,5 @@ pub enum ExecutionError {
InvalidClientKeyLength,
DuplicateClientKey,
MissingClientKey,
JobAlreadyExists,
}
30 changes: 28 additions & 2 deletions core/types/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,23 +616,49 @@ impl TryFrom<String> for Tokens {
pub struct Job {
/// The hash of the job.
pub hash: [u8; 32],
/// Track the submitter
pub owner: EthAddress,
/// Information about the job for execution purposes.
pub info: JobInfo,
/// The status of the most recent execution of a job.
pub status: Option<JobStatus>,
/// The node to which this job was assigned.
pub assignee: Option<NodeIndex>,
/// Track remaining funds.
pub prepaid_balance: HpUfixed<18>,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)]
pub struct JobInput {
pub info: JobInfo,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)]
pub struct JobInfo {
/// The frequency in which this job should be performed.
pub frequency: u32,
/// Amount prepaid.
pub amount: u32,
pub amount: HpUfixed<18>,
/// The service that will execute the function.
pub service: ServiceId,
/// The arguments for the job.
/// Arguments for the job, encoded as a JSON string in UTF-8 bytes.
///
/// This field contains the arguments to be passed to the JavaScript function executed by
/// the service. Clients must encode arguments as a JSON string (e.g., `{"foo": "bar", "baz":
/// 42}`) and convert it to UTF-8 bytes. Nodes will decode these bytes back to a JSON
/// string and parse it for execution.
///
/// Example in JavaScript:
/// ```javascript
/// const args = { foo: "bar", baz: 42 };
/// const argsBytes = Buffer.from(JSON.stringify(args), "utf8");
/// // Use argsBytes in the AddJobs transaction
/// ```
/// Example in Rust:
/// ```rust
/// let args = serde_json::json!({"foo": "bar", "baz": 42});
/// let args_bytes: Box<[u8]> = serde_json::to_vec(&args).unwrap().into_boxed_slice();
/// ```
pub arguments: Box<[u8]>,
}

Expand Down
11 changes: 5 additions & 6 deletions core/types/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use super::{
Epoch,
Event,
Job,
JobInput,
JobStatus,
ProofOfConsensus,
ProofOfMisbehavior,
Expand Down Expand Up @@ -505,7 +505,7 @@ pub enum UpdateMethod {
// instead of `Job` that doesn't expose fields
// used for internal purposes, such as `assignee`.
/// Add new jobs to the jobs table and assign them to nodes.
AddJobs { jobs: Vec<Job> },
AddJobs { jobs: Vec<JobInput> },
/// Remove these jobs from the jobs table and unassigned them.
RemoveJobs { jobs: Vec<[u8; 32]> },
/// Updates about the jobs' most recent executions.
Expand Down Expand Up @@ -794,14 +794,13 @@ impl ToDigest for UpdatePayload {
},
UpdateMethod::AddJobs { jobs } => {
transcript_builder = transcript_builder.with("transaction_name", &"add_jobs");
for job in jobs.iter() {
for (idx, job) in jobs.iter().enumerate() {
transcript_builder = transcript_builder
.with_prefix(job.hash.encode_hex())
.with_prefix(idx.to_string())
.with("service", &job.info.service)
.with("frequency", &job.info.frequency)
.with("arguments", &job.info.arguments.as_ref())
.with("amount", &job.info.amount)
.with("assignee", &job.assignee)
.with("amount", &HpUfixedWrapper(job.info.amount.clone()));
}
},
UpdateMethod::RemoveJobs { jobs } => {
Expand Down