diff --git a/src/ic_oss/src/agent.rs b/src/ic_oss/src/agent.rs new file mode 100644 index 0000000..6f95e2d --- /dev/null +++ b/src/ic_oss/src/agent.rs @@ -0,0 +1,62 @@ +use candid::{ + utils::{encode_args, ArgumentEncoder}, + CandidType, Decode, Principal, +}; +use ic_agent::{identity::Identity, Agent}; +use ic_oss_types::format_error; + +pub async fn build_agent(host: &str, identity: Box<dyn Identity>) -> Result<Agent, String> { + let agent = Agent::builder() + .with_url(host) + .with_boxed_identity(identity) + .with_verify_query_signatures(true) + .build() + .map_err(format_error)?; + if host.starts_with("http://") { + agent.fetch_root_key().await.map_err(format_error)?; + } + + Ok(agent) +} + +pub async fn update_call<In, Out>( + agent: &Agent, + canister_id: &Principal, + method_name: &str, + args: In, +) -> Result<Out, String> +where + In: ArgumentEncoder + Send, + Out: CandidType + for<'a> candid::Deserialize<'a>, +{ + let input = encode_args(args).map_err(format_error)?; + let res = agent + .update(canister_id, method_name) + .with_arg(input) + .call_and_wait() + .await + .map_err(format_error)?; + let output = Decode!(res.as_slice(), Out).map_err(format_error)?; + Ok(output) +} + +pub async fn query_call<In, Out>( + agent: &Agent, + canister_id: &Principal, + method_name: &str, + args: In, +) -> Result<Out, String> +where + In: ArgumentEncoder + Send, + Out: CandidType + for<'a> candid::Deserialize<'a>, +{ + let input = encode_args(args).map_err(format_error)?; + let res = agent + .query(canister_id, method_name) + .with_arg(input) + .call() + .await + .map_err(format_error)?; + let output = Decode!(res.as_slice(), Out).map_err(format_error)?; + Ok(output) +} diff --git a/src/ic_oss/src/file.rs b/src/ic_oss/src/bucket.rs similarity index 51% rename from src/ic_oss/src/file.rs rename to src/ic_oss/src/bucket.rs index c2aabc6..a75b70e 100644 --- a/src/ic_oss/src/file.rs +++ b/src/ic_oss/src/bucket.rs @@ -1,7 +1,7 @@ use bytes::{Bytes, BytesMut}; -use candid::{CandidType, Decode, Encode, Principal}; +use candid::{CandidType, 
Principal}; use ic_agent::Agent; -use ic_oss_types::{crc32, file::*, format_error}; +use ic_oss_types::{bucket::*, crc32, file::*, folder::*, format_error, ByteN}; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use sha3::{Digest, Sha3_256}; @@ -11,6 +11,8 @@ use tokio::sync::{mpsc, RwLock, Semaphore}; use tokio_stream::StreamExt; use tokio_util::codec::{Decoder, FramedRead}; +use crate::agent::{query_call, update_call}; + #[derive(Clone)] pub struct Client { chunk_size: u32, @@ -51,6 +53,237 @@ impl Client { } } + /// the caller of agent should be canister controller + pub async fn admin_set_managers(&self, args: BTreeSet) -> Result<(), String> { + update_call(&self.agent, &self.bucket, "admin_set_managers", (args,)).await? + } + + /// the caller of agent should be canister controller + pub async fn admin_set_auditors(&self, args: BTreeSet) -> Result<(), String> { + update_call(&self.agent, &self.bucket, "admin_set_auditors", (args,)).await? + } + + /// the caller of agent should be canister controller + pub async fn admin_update_bucket(&self, args: UpdateBucketInput) -> Result<(), String> { + update_call(&self.agent, &self.bucket, "admin_update_bucket", (args,)).await? + } + + pub async fn get_bucket_info(&self) -> Result { + query_call( + &self.agent, + &self.bucket, + "get_bucket_info", + (&self.access_token,), + ) + .await? + } + + pub async fn get_file_info(&self, id: u32) -> Result { + query_call( + &self.agent, + &self.bucket, + "get_file_info", + (id, &self.access_token), + ) + .await? + } + + pub async fn get_file_info_by_hash(&self, hash: ByteN<32>) -> Result { + query_call( + &self.agent, + &self.bucket, + "get_file_info_by_hash", + (hash, &self.access_token), + ) + .await? + } + + pub async fn get_file_ancestors(&self, id: u32) -> Result, String> { + query_call( + &self.agent, + &self.bucket, + "get_file_ancestors", + (id, &self.access_token), + ) + .await? 
+ } + + pub async fn get_file_chunks( + &self, + id: u32, + index: u32, + take: Option, + ) -> Result, String> { + query_call( + &self.agent, + &self.bucket, + "get_file_chunks", + (id, index, take, &self.access_token), + ) + .await? + } + + pub async fn list_files( + &self, + parent: u32, + prev: Option, + take: Option, + ) -> Result, String> { + query_call( + &self.agent, + &self.bucket, + "list_files", + (parent, prev, take, &self.access_token), + ) + .await? + } + + pub async fn get_folder_info(&self, id: u32) -> Result { + query_call( + &self.agent, + &self.bucket, + "get_folder_info", + (id, &self.access_token), + ) + .await? + } + + pub async fn get_folder_ancestors(&self, id: u32) -> Result, String> { + query_call( + &self.agent, + &self.bucket, + "get_folder_ancestors", + (id, &self.access_token), + ) + .await? + } + + pub async fn list_folders(&self, parent: u32) -> Result, String> { + query_call( + &self.agent, + &self.bucket, + "list_folders", + (parent, &self.access_token), + ) + .await? + } + + pub async fn create_file(&self, file: CreateFileInput) -> Result { + update_call( + &self.agent, + &self.bucket, + "create_file", + (file, &self.access_token), + ) + .await? + } + + pub async fn update_file_chunk( + &self, + input: UpdateFileChunkInput, + ) -> Result { + update_call( + &self.agent, + &self.bucket, + "update_file_chunk", + (input, &self.access_token), + ) + .await? + } + + pub async fn update_file_info( + &self, + input: UpdateFileInput, + ) -> Result { + update_call( + &self.agent, + &self.bucket, + "update_file_info", + (input, &self.access_token), + ) + .await? + } + + pub async fn move_file(&self, input: MoveInput) -> Result { + update_call( + &self.agent, + &self.bucket, + "move_file", + (input, &self.access_token), + ) + .await? + } + + pub async fn delete_file(&self, id: u32) -> Result { + update_call( + &self.agent, + &self.bucket, + "delete_file", + (id, &self.access_token), + ) + .await? 
+ } + + pub async fn batch_delete_subfiles( + &self, + parent: u32, + ids: BTreeSet, + ) -> Result, String> { + update_call( + &self.agent, + &self.bucket, + "batch_delete_subfiles", + (parent, ids, &self.access_token), + ) + .await? + } + + pub async fn create_folder( + &self, + input: CreateFolderInput, + ) -> Result { + update_call( + &self.agent, + &self.bucket, + "create_folder", + (input, &self.access_token), + ) + .await? + } + + pub async fn update_folder_info( + &self, + input: UpdateFolderInput, + ) -> Result { + update_call( + &self.agent, + &self.bucket, + "update_folder_info", + (input, &self.access_token), + ) + .await? + } + + pub async fn move_folder(&self, input: MoveInput) -> Result { + update_call( + &self.agent, + &self.bucket, + "move_folder", + (input, &self.access_token), + ) + .await? + } + + pub async fn delete_folder(&self, id: u32) -> Result { + update_call( + &self.agent, + &self.bucket, + "delete_folder", + (id, &self.access_token), + ) + .await? + } + pub async fn upload( &self, ar: T, @@ -76,18 +309,10 @@ impl Client { status: Some(1), ..file }; - let res = self - .agent - .update(&self.bucket, "create_file") - .with_arg(Encode!(&file, &self.access_token).map_err(format_error)?) - .call_and_wait() - .await - .map_err(format_error)?; - let file_output = Decode!(res.as_slice(), Result) - .map_err(format_error)??; + let res = self.create_file(file).await?; progress(size as usize); return Ok(UploadFileChunksResult { - id: file_output.id, + id: res.id, uploaded: size as usize, uploaded_chunks: BTreeSet::new(), error: None, @@ -96,17 +321,9 @@ impl Client { } // create file + let res = self.create_file(file).await?; let res = self - .agent - .update(&self.bucket, "create_file") - .with_arg(Encode!(&file, &self.access_token).map_err(format_error)?) 
- .call_and_wait() - .await - .map_err(format_error)?; - let file_output = - Decode!(res.as_slice(), Result).map_err(format_error)??; - let res = self - .upload_chunks(ar, file_output.id, &BTreeSet::new(), progress) + .upload_chunks(ar, res.id, &BTreeSet::new(), progress) .await; Ok(res) } @@ -177,26 +394,22 @@ impl Client { tokio::spawn(async move { let res = async { let checksum = crc32(&chunk); - let args = Encode!( - &UpdateFileChunkInput { - id, - chunk_index, - content: ByteBuf::from(chunk.to_vec()), - crc32: Some(checksum), - }, - &access_token + let _: Result = update_call( + &agent, + &bucket, + "update_file_chunk", + ( + UpdateFileChunkInput { + id, + chunk_index, + content: ByteBuf::from(chunk.to_vec()), + crc32: Some(checksum), + }, + &access_token, + ), ) - .map_err(format_error)?; - - let res = agent - .update(&bucket, "update_file_chunk") - .with_arg(args) - .call_and_wait() - .await - .map_err(format_error)?; - let _ = - Decode!(res.as_slice(), Result) - .map_err(format_error)??; + .await?; + Ok(()) } .await; @@ -231,24 +444,14 @@ impl Client { let (hash, _) = futures::future::try_join(uploading_loop, uploading_result).await?; // commit file - let args = Encode!( - &UpdateFileInput { + let _ = self + .update_file_info(UpdateFileInput { id, hash: Some(hash.into()), status: Some(1), ..Default::default() - }, - &self.access_token - ) - .map_err(format_error)?; - - let _ = self - .agent - .update(&self.bucket, "update_file_info") - .with_arg(args) - .call_and_wait() - .await - .map_err(format_error)?; + }) + .await?; Ok::<(), String>(()) } .await; diff --git a/src/ic_oss/src/lib.rs b/src/ic_oss/src/lib.rs index e380250..2f8e4b2 100644 --- a/src/ic_oss/src/lib.rs +++ b/src/ic_oss/src/lib.rs @@ -1,4 +1,5 @@ -pub mod file; +pub mod agent; +pub mod bucket; #[cfg(test)] mod tests { diff --git a/src/ic_oss_cli/src/file.rs b/src/ic_oss_cli/src/file.rs index 6cac49d..356228d 100644 --- a/src/ic_oss_cli/src/file.rs +++ b/src/ic_oss_cli/src/file.rs @@ -2,7 +2,11 
@@ use chrono::prelude::*; use ic_oss_types::{file::*, format_error}; use tokio::{time, time::Duration}; -pub async fn upload_file(cli: &ic_oss::file::Client, file: &str, retry: u8) -> Result<(), String> { +pub async fn upload_file( + cli: &ic_oss::bucket::Client, + file: &str, + retry: u8, +) -> Result<(), String> { let start_ts: DateTime<Local> = Local::now(); let file_path = std::path::Path::new(file); let metadata = std::fs::metadata(file_path).map_err(format_error)?; diff --git a/src/ic_oss_cli/src/main.rs b/src/ic_oss_cli/src/main.rs index b37e590..e9d4d73 100644 --- a/src/ic_oss_cli/src/main.rs +++ b/src/ic_oss_cli/src/main.rs @@ -1,9 +1,7 @@ use candid::Principal; use clap::{Parser, Subcommand}; -use ic_agent::{ - identity::{AnonymousIdentity, BasicIdentity, Identity, Secp256k1Identity}, - Agent, -}; +use ic_agent::identity::{AnonymousIdentity, BasicIdentity, Identity, Secp256k1Identity}; +use ic_oss::agent::build_agent; use ic_oss_types::format_error; use ring::{rand, signature::Ed25519KeyPair}; use std::{ @@ -110,9 +108,9 @@ async fn main() -> Result<(), String> { }) => { let is_ic = *ic || cli.ic; let host = if is_ic { IC_HOST } else { cli.host.as_str() }; - let agent = build_agent(host, identity).await.map_err(format_error)?; + let agent = build_agent(host, identity).await?; let bucket = Principal::from_text(bucket).map_err(format_error)?; - let cli = ic_oss::file::Client::new(Arc::new(agent), bucket); + let cli = ic_oss::bucket::Client::new(Arc::new(agent), bucket); upload_file(&cli, file, *retry).await?; return Ok(()); } @@ -137,16 +135,3 @@ fn load_identity(path: &str) -> anyhow::Result<Box<dyn Identity>> { match ... }, } } - -async fn build_agent(host: &str, identity: Box<dyn Identity>) -> anyhow::Result<Agent> { - let agent = Agent::builder() - .with_url(host) - .with_boxed_identity(identity) - .with_verify_query_signatures(true) - .build()?; - if host.starts_with("http://") { - agent.fetch_root_key().await?; - } - - Ok(agent) -}