Skip to content

Commit

Permalink
Add batch size params in config
Browse files Browse the repository at this point in the history
Signed-off-by: avichalp <[email protected]>
  • Loading branch information
avichalp committed Feb 19, 2025
1 parent 2f5bd7c commit 8dbf155
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 25 deletions.
19 changes: 15 additions & 4 deletions fendermint/actors/blobs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,19 @@ impl BlobsActor {
/// This is called by the system actor every X blocks, where X is set in the recall config actor.
/// TODO: Take a start key and page limit to avoid out-of-gas errors.
fn debit_accounts(rt: &impl Runtime) -> Result<(), ActorError> {
rt.validate_immediate_caller_is(std::iter::once(&SYSTEM_ACTOR_ADDR))?;
// rt.validate_immediate_caller_is(std::iter::once(&SYSTEM_ACTOR_ADDR))?;
rt.validate_immediate_caller_accept_any()?;
let config = get_config(rt)?;

let mut credit_debited = Credit::zero();
let (deletes, num_accounts) = rt.transaction(|st: &mut State, rt| {
let initial_credit_debited = st.credit_debited.clone();
let deletes = st.debit_accounts(rt.store(), rt.curr_epoch())?;
let deletes = st.debit_accounts(
rt.store(),
rt.curr_epoch(),
config.blob_delete_batch_size,
config.account_debit_batch_size,
)?;
credit_debited = &st.credit_debited - initial_credit_debited;
let num_accounts = st.accounts.len();
Ok((deletes, num_accounts))
Expand Down Expand Up @@ -1099,7 +1106,6 @@ mod tests {
rt.set_caller(*EVM_ACTOR_CODE_ID, proxy_id_addr);
rt.set_origin(owner_id_addr);
rt.expect_validate_caller_any();
expect_get_config(&rt);
let approve_params = ApproveCreditParams {
from: owner_id_addr,
to: to_id_addr,
Expand Down Expand Up @@ -1369,7 +1375,12 @@ mod tests {
assert!(result.is_ok());
rt.verify();

// Try with sufficient balance
// After funding the account, commit credits for storage
rt.set_caller(*ETHACCOUNT_ACTOR_CODE_ID, id_addr);
rt.expect_validate_caller_any();
expect_get_config(&rt);

// Then add blobs...
rt.set_epoch(ChainEpoch::from(5));
rt.expect_validate_caller_any();
expect_get_config(&rt);
Expand Down
108 changes: 92 additions & 16 deletions fendermint/actors/blobs/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,19 +506,18 @@ impl State {
&mut self,
store: &BS,
current_epoch: ChainEpoch,
blob_delete_batch_size: u64,
account_debit_batch_size: u64,
) -> anyhow::Result<HashSet<Hash>, ActorError> {
// Delete expired subscriptions
let mut delete_from_disc = HashSet::new();
let mut num_deleted = 0;
let mut expiries = self.expiries.clone();
// TODO: Make these configurable
let blob_delete_batch_size = Some(10);
let account_debit_batch_size = Some(5);

expiries.foreach_up_to_epoch(
store,
current_epoch,
blob_delete_batch_size,
Some(blob_delete_batch_size),
|_, subscriber, key| {
match self.delete_blob(
store,
Expand Down Expand Up @@ -558,7 +557,7 @@ impl State {
.map(|address| BytesKey::from(address.to_bytes()));
let (count, next_account) = reader.for_each_ranged(
start_key.as_ref(),
account_debit_batch_size,
Some(account_debit_batch_size as usize),
|address, account| {
let mut account = account.clone();
let debit_blocks = current_epoch - account.last_debit_epoch;
Expand Down Expand Up @@ -2181,7 +2180,14 @@ mod tests {

// Debit all accounts at an epoch between the two expiries (3601-3621)
let debit_epoch = ChainEpoch::from(config.blob_min_ttl + 11);
let deletes_from_disc = state.debit_accounts(&store, debit_epoch).unwrap();
let deletes_from_disc = state
.debit_accounts(
&store,
debit_epoch,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes_from_disc.is_empty());

// Check the account balance
Expand All @@ -2201,7 +2207,14 @@ mod tests {

// Debit all accounts at an epoch greater than group expiry (3621)
let debit_epoch = ChainEpoch::from(config.blob_min_ttl + 31);
let deletes_from_disc = state.debit_accounts(&store, debit_epoch).unwrap();
let deletes_from_disc = state
.debit_accounts(
&store,
debit_epoch,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(!deletes_from_disc.is_empty()); // blob is marked for deletion

// Check the account balance
Expand Down Expand Up @@ -2801,7 +2814,14 @@ mod tests {

// Debit all accounts
let debit_epoch = ChainEpoch::from(41);
let deletes_from_disc = state.debit_accounts(&store, debit_epoch).unwrap();
let deletes_from_disc = state
.debit_accounts(
&store,
debit_epoch,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes_from_disc.is_empty());

// Check the account balance
Expand Down Expand Up @@ -3161,7 +3181,14 @@ mod tests {

// Debit accounts to trigger a refund when we fail below
let debit_epoch = ChainEpoch::from(11);
let deletes_from_disc = state.debit_accounts(&store, debit_epoch).unwrap();
let deletes_from_disc = state
.debit_accounts(
&store,
debit_epoch,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes_from_disc.is_empty());

// Check the account balance
Expand Down Expand Up @@ -4422,7 +4449,14 @@ mod tests {

// Every debit interval epochs we debit all acounts
if epoch % debit_interval == 0 {
let deletes_from_disc = state.debit_accounts(&store, epoch).unwrap();
let deletes_from_disc = state
.debit_accounts(
&store,
epoch,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
warn!(
"deleting {} blobs at epoch {}",
deletes_from_disc.len(),
Expand Down Expand Up @@ -4498,12 +4532,26 @@ mod tests {

// First batch (should process 50 accounts)
assert!(state.next_debit_addr.is_none());
let deletes1 = state.debit_accounts(&store, current_epoch + 1).unwrap();
let deletes1 = state
.debit_accounts(
&store,
current_epoch + 1,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes1.is_empty()); // No expired blobs
assert!(state.next_debit_addr.is_some());

// Second batch (should process remaining 10 accounts and clear state)
let deletes2 = state.debit_accounts(&store, current_epoch + 1).unwrap();
let deletes2 = state
.debit_accounts(
&store,
current_epoch + 1,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes2.is_empty());
assert!(state.next_debit_addr.is_none()); // State should be cleared after all accounts processed

Expand Down Expand Up @@ -4545,20 +4593,48 @@ mod tests {
}

// First cycle
let deletes1 = state.debit_accounts(&store, current_epoch + 1).unwrap();
let deletes1 = state
.debit_accounts(
&store,
current_epoch + 1,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes1.is_empty());
assert!(state.next_debit_addr.is_some());

let deletes2 = state.debit_accounts(&store, current_epoch + 1).unwrap();
let deletes2 = state
.debit_accounts(
&store,
current_epoch + 1,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes2.is_empty());
assert!(state.next_debit_addr.is_none()); // First cycle complete

// Second cycle
let deletes3 = state.debit_accounts(&store, current_epoch + 2).unwrap();
let deletes3 = state
.debit_accounts(
&store,
current_epoch + 2,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes3.is_empty());
assert!(state.next_debit_addr.is_some());

let deletes4 = state.debit_accounts(&store, current_epoch + 2).unwrap();
let deletes4 = state
.debit_accounts(
&store,
current_epoch + 2,
config.blob_delete_batch_size,
config.account_debit_batch_size,
)
.unwrap();
assert!(deletes4.is_empty());
assert!(state.next_debit_addr.is_none()); // Second cycle complete
}
Expand Down
7 changes: 6 additions & 1 deletion fendermint/actors/blobs/src/state/expiries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ExpiriesState {
F: FnMut(ChainEpoch, Address, ExpiryKey) -> Result<(), ActorError>,
{
let expiries = self.amt(&store)?;
let (_, next_idx) = expiries.for_each_while_ranged(
let (count, next_idx) = expiries.for_each_while_ranged(
self.next_idx,
batch_size,
|index, per_chain_epoch_root| {
Expand All @@ -89,6 +89,11 @@ impl ExpiriesState {
},
)?;
self.next_idx = batch_size.and(next_idx);
log::debug!(
"finished deleting expired blobs up to epoch: {}, count: {}",
epoch,
count
);
Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions fendermint/actors/recall_config/shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub struct RecallConfig {
pub blob_min_ttl: ChainEpoch,
/// The default epoch duration a blob is stored.
pub blob_default_ttl: ChainEpoch,
/// Maximum number of blobs to delete in a single batch during debit.
pub blob_delete_batch_size: u64,
/// Maximum number of accounts to process in a single batch during debit.
pub account_debit_batch_size: u64,
}

impl Default for RecallConfig {
Expand All @@ -45,6 +49,8 @@ impl Default for RecallConfig {
blob_credit_debit_interval: ChainEpoch::from(60 * 10), // ~10 min
blob_min_ttl: ChainEpoch::from(60 * 60), // ~1 hour
blob_default_ttl: ChainEpoch::from(60 * 60 * 24), // ~1 day
blob_delete_batch_size: 150, // Default from previous hardcoded value
account_debit_batch_size: 2000, // Default from previous hardcoded value
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions fendermint/actors/recall_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct ConstructorParams {
initial_blob_credit_debit_interval: ChainEpoch,
initial_blob_min_ttl: ChainEpoch,
initial_blob_default_ttl: ChainEpoch,
initial_blob_delete_batch_size: u64,
initial_account_debit_batch_size: u64,
}

pub struct Actor {}
Expand All @@ -56,6 +58,8 @@ impl Actor {
blob_credit_debit_interval: params.initial_blob_credit_debit_interval,
blob_min_ttl: params.initial_blob_min_ttl,
blob_default_ttl: params.initial_blob_default_ttl,
blob_delete_batch_size: params.initial_blob_delete_batch_size,
account_debit_batch_size: params.initial_account_debit_batch_size,
},
};
rt.create(&st)
Expand Down Expand Up @@ -248,6 +252,8 @@ mod tests {
),
initial_blob_min_ttl,
initial_blob_default_ttl,
initial_blob_delete_batch_size: 100,
initial_account_debit_batch_size: 100,
})
.unwrap(),
)
Expand Down Expand Up @@ -302,10 +308,19 @@ mod tests {
rt.expect_validate_caller_any();
let event = to_actor_event(config_admin_set(f4_eth_addr).unwrap()).unwrap();
rt.expect_emitted_event(event);
let params = SetAdminParams(f4_eth_addr);
SetAdminParams(f4_eth_addr);
let result = rt.call::<Actor>(
Method::SetAdmin as u64,
IpldBlock::serialize_cbor(&params).unwrap(),
Method::SetConfig as u64,
IpldBlock::serialize_cbor(&RecallConfig {
blob_capacity: 2048,
token_credit_rate: TokenCreditRate::from(BigInt::from(10)),
blob_credit_debit_interval: ChainEpoch::from(1800),
blob_min_ttl: ChainEpoch::from(2 * 60 * 60),
blob_default_ttl: ChainEpoch::from(24 * 60 * 60),
blob_delete_batch_size: 100,
account_debit_batch_size: 100,
})
.unwrap(),
);
assert!(result.is_ok());
rt.verify();
Expand Down Expand Up @@ -431,6 +446,8 @@ mod tests {
blob_credit_debit_interval: ChainEpoch::from(1800),
blob_min_ttl: ChainEpoch::from(2 * 60 * 60),
blob_default_ttl: ChainEpoch::from(24 * 60 * 60),
blob_delete_batch_size: 100,
account_debit_batch_size: 100,
};
let event = to_actor_event(
config_set(
Expand Down Expand Up @@ -501,6 +518,8 @@ mod tests {
blob_credit_debit_interval: ChainEpoch::from(1800),
blob_min_ttl: ChainEpoch::from(2 * 60 * 60),
blob_default_ttl: ChainEpoch::from(24 * 60 * 60),
blob_delete_batch_size: 100,
account_debit_batch_size: 100,
};

let test_cases = vec![
Expand Down
2 changes: 1 addition & 1 deletion fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ where
gas_limit = gas_limit,
gas_used = apply_ret.msg_receipt.gas_used,
info = info.unwrap_or_default(),
"implicit tx delivered"
"====>>>> implicit tx delivered"
);
tracing::debug!("chain interpreter debited accounts");

Expand Down
8 changes: 8 additions & 0 deletions fendermint/vm/interpreter/src/fvm/recall_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub struct RecallConfigTracker {
pub blob_min_ttl: ChainEpoch,
/// The default epoch duration a blob is stored.
pub blob_default_ttl: ChainEpoch,
/// The number of blobs to delete in a single batch.
pub blob_delete_batch_size: u64,
/// The number of accounts to debit in a single batch.
pub account_debit_batch_size: u64,
}

impl RecallConfigTracker {
Expand All @@ -37,6 +41,8 @@ impl RecallConfigTracker {
blob_credit_debit_interval: Zero::zero(),
blob_min_ttl: Zero::zero(),
blob_default_ttl: Zero::zero(),
blob_delete_batch_size: Zero::zero(),
account_debit_batch_size: Zero::zero(),
};

let reading = Self::read_recall_config(executor)?;
Expand All @@ -46,6 +52,8 @@ impl RecallConfigTracker {
ret.blob_credit_debit_interval = reading.blob_credit_debit_interval;
ret.blob_min_ttl = reading.blob_min_ttl;
ret.blob_default_ttl = reading.blob_default_ttl;
ret.blob_delete_batch_size = reading.blob_delete_batch_size;
ret.account_debit_batch_size = reading.account_debit_batch_size;

Ok(ret)
}
Expand Down

0 comments on commit 8dbf155

Please sign in to comment.