Skip to content

Commit

Permalink
refactor: hamt-ify blob reader actor (#537)
Browse files Browse the repository at this point in the history
Closes #448

Signed-off-by: Bruno Calza <[email protected]>
  • Loading branch information
brunocalza authored Feb 20, 2025
1 parent 5c37881 commit 2b3dc82
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions fendermint/actors/blob_reader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ crate-type = ["cdylib", "lib"]

[dependencies]
fil_actors_runtime = { workspace = true }
fvm_ipld_blockstore = { workspace = true }
fvm_ipld_encoding = { workspace = true }
fvm_shared = { workspace = true }
frc42_dispatch = { workspace = true }
Expand All @@ -25,6 +26,8 @@ serde = { workspace = true, features = ["derive"] }
fendermint_actor_blobs_shared = { path = "../blobs/shared" }
fendermint_actor_machine = { path = "../machine" }

recall_ipld = { path = "../../../recall/ipld" }

[dev-dependencies]
fil_actors_evm_shared = { workspace = true }
fil_actors_runtime = { workspace = true, features = ["test_utils"] }
Expand Down
30 changes: 14 additions & 16 deletions fendermint/actors/blob_reader/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
// Copyright 2021-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::shared::{
CloseReadRequestParams, GetOpenReadRequestsParams, GetReadRequestStatusParams, Method,
OpenReadRequestParams, OpenReadRequestTuple, ReadRequestStatus, SetReadRequestPendingParams,
State, BLOB_READER_ACTOR_NAME,
};
use fendermint_actor_blobs_shared::state::Hash;
use fendermint_actor_machine::events::emit_evm_event;
use fil_actors_runtime::{
Expand All @@ -15,14 +20,6 @@ use recall_sol_facade::blob_reader::{
read_request_closed, read_request_opened, read_request_pending,
};

use crate::shared::{
CloseReadRequestParams, GetOpenReadRequestsParams, GetReadRequestStatusParams, Method,
OpenReadRequestParams, ReadRequestStatus, SetReadRequestPendingParams, State,
BLOB_READER_ACTOR_NAME,
};

type OpenReadRequestTuple = (Hash, Hash, u32, u32, Address, u64);

#[cfg(feature = "fil-actor")]
fil_actors_runtime::wasm_trampoline!(ReadReqActor);

Expand All @@ -31,7 +28,7 @@ pub struct ReadReqActor;
impl ReadReqActor {
fn constructor(rt: &impl Runtime) -> Result<(), ActorError> {
rt.validate_immediate_caller_is(std::iter::once(&SYSTEM_ACTOR_ADDR))?;
let state = State::default();
let state = State::new(rt.store())?;
rt.create(&state)
}

Expand All @@ -43,6 +40,7 @@ impl ReadReqActor {

let id = rt.transaction(|st: &mut State, _rt| {
st.open_read_request(
rt.store(),
params.hash,
params.offset,
params.len,
Expand Down Expand Up @@ -71,15 +69,18 @@ impl ReadReqActor {
params: GetOpenReadRequestsParams,
) -> Result<Vec<OpenReadRequestTuple>, ActorError> {
rt.validate_immediate_caller_accept_any()?;
Ok(rt.state::<State>()?.get_open_read_requests(params.0))
rt.state::<State>()?
.get_open_read_requests(rt.store(), params.0)
}

fn get_read_request_status(
rt: &impl Runtime,
params: GetReadRequestStatusParams,
) -> Result<Option<ReadRequestStatus>, ActorError> {
rt.validate_immediate_caller_accept_any()?;
let status = rt.state::<State>()?.get_read_request_status(params.0);
let status = rt
.state::<State>()?
.get_read_request_status(rt.store(), params.0)?;
Ok(status)
}

Expand All @@ -88,9 +89,7 @@ impl ReadReqActor {
params: CloseReadRequestParams,
) -> Result<(), ActorError> {
rt.validate_immediate_caller_is(std::iter::once(&SYSTEM_ACTOR_ADDR))?;

rt.transaction(|st: &mut State, _| st.close_read_request(params.0))?;

rt.transaction(|st: &mut State, _| st.close_read_request(rt.store(), params.0))?;
emit_evm_event(rt, read_request_closed(&params.0 .0))
}

Expand All @@ -100,8 +99,7 @@ impl ReadReqActor {
) -> Result<(), ActorError> {
rt.validate_immediate_caller_is(std::iter::once(&SYSTEM_ACTOR_ADDR))?;

rt.transaction(|st: &mut State, _| st.set_read_request_pending(params.0))?;

rt.transaction(|st: &mut State, _| st.set_read_request_pending(rt.store(), params.0))?;
emit_evm_event(rt, read_request_pending(&params.0 .0))
}

Expand Down
6 changes: 4 additions & 2 deletions fendermint/actors/blob_reader/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub const BLOB_READER_ACTOR_ID: ActorID = 67;
pub const BLOB_READER_ACTOR_ADDR: Address = Address::new_id(BLOB_READER_ACTOR_ID);

/// The status of a read request.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub enum ReadRequestStatus {
/// Read request is open and waiting to be processed
#[default]
Expand All @@ -36,7 +36,7 @@ impl fmt::Display for ReadRequestStatus {
}

/// A request to read blob data.
#[derive(Clone, Debug, Serialize_tuple, Deserialize_tuple)]
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct ReadRequest {
/// The hash of the blob to read data from.
pub blob_hash: Hash,
Expand Down Expand Up @@ -97,3 +97,5 @@ pub struct SetReadRequestPendingParams(pub Hash);
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct GetReadRequestStatusParams(pub Hash);

pub type OpenReadRequestTuple = (Hash, Hash, u32, u32, Address, u64);
130 changes: 95 additions & 35 deletions fendermint/actors/blob_reader/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,40 @@
// Copyright 2021-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::collections::HashMap;

use fendermint_actor_blobs_shared::state::Hash;
use fil_actors_runtime::ActorError;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::tuple::*;
use fvm_shared::{address::Address, MethodNum};
use fvm_shared::address::Address;
use log::info;

use crate::shared::{ReadRequest, ReadRequestStatus};
use crate::shared::{OpenReadRequestTuple, ReadRequest, ReadRequestStatus};
use recall_ipld::hamt;
use recall_ipld::hamt::map::TrackedFlushResult;

const MAX_READ_REQUEST_LEN: u32 = 1024 * 1024; // 1MB

/// The state represents all read requests.
#[derive(Debug, Default, Serialize_tuple, Deserialize_tuple)]
#[derive(Debug, Serialize_tuple, Deserialize_tuple)]
pub struct State {
/// Map of read requests by request ID.
pub read_requests: HashMap<Hash, ReadRequest>,
/// ReadRequests Hamt.
pub read_requests: ReadRequests,
/// Counter to sequence the requests
pub request_id_counter: u64,
}

impl State {
pub fn open_read_request(
pub fn new<BS: Blockstore>(store: &BS) -> Result<Self, ActorError> {
let read_requests = ReadRequests::new(store)?;
Ok(State {
read_requests,
request_id_counter: 0,
})
}

pub fn open_read_request<BS: Blockstore>(
&mut self,
store: &BS,
blob_hash: Hash,
offset: u32,
len: u32,
Expand All @@ -51,51 +61,73 @@ impl State {
};
info!("opening a read request onchain: {:?}", request_id);
// will create a new request even if the request parameters are the same
self.read_requests.insert(request_id, read_request);
let mut read_requests = self.read_requests.hamt(store)?;
self.read_requests
.save_tracked(read_requests.set_and_flush_tracked(&request_id, read_request)?);
Ok(request_id)
}

pub fn close_read_request(&mut self, request_id: Hash) -> Result<(), ActorError> {
if self.get_read_request_status(request_id).is_none() {
pub fn close_read_request<BS: Blockstore>(
&mut self,
store: &BS,
request_id: Hash,
) -> Result<(), ActorError> {
if self.get_read_request_status(store, request_id)?.is_none() {
return Err(ActorError::not_found(
"cannot close read request, it does not exist".to_string(),
));
}

// remove the closed request
self.read_requests.remove(&request_id);
let mut read_requests = self.read_requests.hamt(store)?;
self.read_requests
.save_tracked(read_requests.delete_and_flush_tracked(&request_id)?.0);
Ok(())
}

pub fn get_open_read_requests(
pub fn get_open_read_requests<BS: Blockstore>(
&self,
store: BS,
size: u32,
) -> Vec<(Hash, Hash, u32, u32, Address, MethodNum)> {
self.read_requests
.iter()
.filter(|(_, request)| matches!(request.status, ReadRequestStatus::Open))
.take(size as usize)
.map(|element| {
(
*element.0,
element.1.blob_hash,
element.1.offset,
element.1.len,
element.1.callback_addr,
element.1.callback_method,
)
})
.collect::<Vec<_>>()
) -> Result<Vec<OpenReadRequestTuple>, ActorError> {
let read_requests = self.read_requests.hamt(store)?;

let mut requests = Vec::new();
read_requests.for_each(|id, request| {
if matches!(request.status, ReadRequestStatus::Open) && (requests.len() as u32) < size {
requests.push((
id,
request.blob_hash,
request.offset,
request.len,
request.callback_addr,
request.callback_method,
))
}

Ok(())
})?;
Ok(requests)
}

pub fn get_read_request_status(&self, id: Hash) -> Option<ReadRequestStatus> {
self.read_requests.get(&id).map(|r| r.status.clone())
pub fn get_read_request_status<BS: Blockstore>(
&self,
store: BS,
id: Hash,
) -> Result<Option<ReadRequestStatus>, ActorError> {
let read_requests = self.read_requests.hamt(store)?;
Ok(read_requests.get(&id)?.map(|r| r.status.clone()))
}

/// Set a read request status to pending.
pub fn set_read_request_pending(&mut self, id: Hash) -> Result<(), ActorError> {
let request = self
.read_requests
.get_mut(&id)
pub fn set_read_request_pending<BS: Blockstore>(
&mut self,
store: BS,
id: Hash,
) -> Result<(), ActorError> {
let mut read_requests = self.read_requests.hamt(store)?;
let mut request = read_requests
.get(&id)?
.ok_or_else(|| ActorError::not_found(format!("read request {} not found", id)))?;

if !matches!(request.status, ReadRequestStatus::Open) {
Expand All @@ -106,6 +138,9 @@ impl State {
}

request.status = ReadRequestStatus::Pending;
self.read_requests
.save_tracked(read_requests.set_and_flush_tracked(&id, request)?);

Ok(())
}

Expand All @@ -114,3 +149,28 @@ impl State {
Hash::from(self.request_id_counter)
}
}

#[derive(Debug, Serialize_tuple, Deserialize_tuple)]
pub struct ReadRequests {
pub root: hamt::Root<Hash, ReadRequest>,
size: u64,
}

impl ReadRequests {
pub fn new<BS: Blockstore>(store: &BS) -> Result<Self, ActorError> {
let root = hamt::Root::<Hash, ReadRequest>::new(store, "read_requests")?;
Ok(Self { root, size: 0 })
}

pub fn hamt<BS: Blockstore>(
&self,
store: BS,
) -> Result<hamt::map::Hamt<BS, Hash, ReadRequest>, ActorError> {
self.root.hamt(store, self.size)
}

pub fn save_tracked(&mut self, tracked_flush_result: TrackedFlushResult<Hash, ReadRequest>) {
self.root = tracked_flush_result.root;
self.size = tracked_flush_result.size;
}
}
2 changes: 1 addition & 1 deletion fendermint/vm/interpreter/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl GenesisBuilder {
.create_custom_actor(
fendermint_actor_blob_reader::BLOB_READER_ACTOR_NAME,
blob_reader::BLOB_READER_ACTOR_ID,
&fendermint_actor_blob_reader::State::default(),
&fendermint_actor_blob_reader::State::new(&state.store())?,
TokenAmount::zero(),
None,
)
Expand Down

0 comments on commit 2b3dc82

Please sign in to comment.