Skip to content

Commit

Permalink
chore: improve idempotent-proxy-canister caller states
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Aug 3, 2024
1 parent 3b64893 commit 8c04970
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 40 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ strip = true
opt-level = 's'

[workspace.package]
version = "1.1.6"
version = "1.2.0"
edition = "2021"
repository = "https://github.com/ldclabs/idempotent-proxy"
keywords = ["idempotent", "reverse", "proxy", "icp"]
Expand Down
19 changes: 10 additions & 9 deletions src/idempotent-proxy-canister/idempotent-proxy-canister.did
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,18 @@ type InitArgs = record {
};
type Result = variant { Ok : bool; Err : text };
type Result_1 = variant { Ok; Err : text };
type Result_2 = variant { Ok : StateInfo; Err : text };
type StateInfo = record {
freezing_threshold : nat64;
proxy_token_public_key : text;
service_fee : nat64;
ecdsa_key_name : text;
managers : vec principal;
name : text;
auditors : vec principal;
schnorr_key_name : text;
cose : opt CoseClient;
uncollectible_cycles : nat;
callers : nat64;
agents : vec Agent;
incoming_cycles : nat;
proxy_token_refresh_interval : nat64;
subnet_size : nat64;
namespace_total : nat64;
vetkd_key_name : text;
};
type TransformArgs = record { context : blob; response : HttpResponse };
type TransformContext = record {
Expand All @@ -60,13 +61,13 @@ service : (opt ChainArgs) -> {
admin_remove_callers : (vec principal) -> (Result_1);
admin_remove_managers : (vec principal) -> (Result_1);
admin_set_agents : (vec Agent) -> (Result_1);
get_state : () -> (Result_2) query;
is_caller : (principal) -> (Result) query;
caller_info : (principal) -> (opt record { nat; nat64 }) query;
parallel_call_all_ok : (CanisterHttpRequestArgument) -> (HttpResponse);
parallel_call_any_ok : (CanisterHttpRequestArgument) -> (HttpResponse);
parallel_call_cost : (CanisterHttpRequestArgument) -> (nat) query;
proxy_http_request : (CanisterHttpRequestArgument) -> (HttpResponse);
proxy_http_request_cost : (CanisterHttpRequestArgument) -> (nat) query;
state_info : () -> (StateInfo) query;
validate_admin_add_managers : (vec principal) -> (Result_1);
validate_admin_remove_managers : (vec principal) -> (Result_1);
validate_admin_set_agents : (vec Agent) -> (Result_1);
Expand Down
67 changes: 50 additions & 17 deletions src/idempotent-proxy-canister/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use std::collections::BTreeSet;

use crate::{agent::Agent, cose::CoseClient, store};

const MILLISECONDS: u64 = 1_000_000;

#[derive(CandidType, Deserialize, Serialize)]
pub struct StateInfo {
pub ecdsa_key_name: String,
pub proxy_token_public_key: String,
pub proxy_token_refresh_interval: u64, // seconds
pub agents: Vec<Agent>,
pub managers: BTreeSet<Principal>,
pub callers: u64,
pub subnet_size: u64,
pub service_fee: u64, // in cycles
pub incoming_cycles: u128,
Expand All @@ -22,8 +25,8 @@ pub struct StateInfo {
}

#[ic_cdk::query]
fn get_state() -> Result<StateInfo, String> {
let s = store::state::with(|s| StateInfo {
fn state_info() -> StateInfo {
store::state::with(|s| StateInfo {
ecdsa_key_name: s.ecdsa_key_name.clone(),
proxy_token_public_key: s.proxy_token_public_key.clone(),
proxy_token_refresh_interval: s.proxy_token_refresh_interval,
Expand All @@ -38,18 +41,18 @@ fn get_state() -> Result<StateInfo, String> {
})
.collect(),
managers: s.managers.clone(),
callers: s.callers.len() as u64,
subnet_size: s.subnet_size,
service_fee: s.service_fee,
incoming_cycles: s.incoming_cycles,
uncollectible_cycles: s.uncollectible_cycles,
cose: s.cose.clone(),
});
Ok(s)
})
}

#[ic_cdk::query]
fn is_caller(id: Principal) -> Result<bool, String> {
store::state::with(|s| Ok(s.allowed_callers.contains(&id)))
fn caller_info(id: Principal) -> Option<(u128, u64)> {
store::state::with(|s| s.callers.get(&id).copied())
}

#[ic_cdk::query]
Expand All @@ -75,7 +78,8 @@ async fn parallel_call_cost(req: CanisterHttpRequestArgument) -> u128 {
/// Proxy HTTP request by all agents in sequence until one returns an status <= 500 result.
#[ic_cdk::update]
async fn proxy_http_request(req: CanisterHttpRequestArgument) -> HttpResponse {
if !store::state::is_allowed(&ic_cdk::caller()) {
let caller = ic_cdk::caller();
if !store::state::is_allowed(&caller) {
return HttpResponse {
status: Nat::from(403u64),
body: "caller is not allowed".as_bytes().to_vec(),
Expand All @@ -92,6 +96,7 @@ async fn proxy_http_request(req: CanisterHttpRequestArgument) -> HttpResponse {
};
}

let balance = ic_cdk::api::call::msg_cycles_available128();
let calc = store::state::cycles_calculator();
store::state::receive_cycles(
calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size()),
Expand All @@ -106,20 +111,31 @@ async fn proxy_http_request(req: CanisterHttpRequestArgument) -> HttpResponse {
Ok(res) => {
let cycles = calc.http_outcall_response_cost(calc.count_response_bytes(&res), 1);
store::state::receive_cycles(cycles, true);
store::state::update_caller_state(
&caller,
balance - ic_cdk::api::call::msg_cycles_available128(),
ic_cdk::api::time() / MILLISECONDS,
);
return res;
}
Err(res) => last_err = Some(res),
}
}

store::state::update_caller_state(
&caller,
balance - ic_cdk::api::call::msg_cycles_available128(),
ic_cdk::api::time() / MILLISECONDS,
);
last_err.unwrap()
}

/// Proxy HTTP request by all agents in parallel and return the result if all are the same,
/// or a 500 HttpResponse with all result.
#[ic_cdk::update]
async fn parallel_call_all_ok(req: CanisterHttpRequestArgument) -> HttpResponse {
if !store::state::is_allowed(&ic_cdk::caller()) {
let caller = ic_cdk::caller();
if !store::state::is_allowed(&caller) {
return HttpResponse {
status: Nat::from(403u64),
body: "caller is not allowed".as_bytes().to_vec(),
Expand All @@ -136,14 +152,15 @@ async fn parallel_call_all_ok(req: CanisterHttpRequestArgument) -> HttpResponse
};
}

let balance = ic_cdk::api::call::msg_cycles_available128();
let calc = store::state::cycles_calculator();
let cycles = calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size())
+ calc.http_outcall_request_cost(calc.count_request_bytes(&req), agents.len());
store::state::receive_cycles(cycles, false);

let results =
futures::future::try_join_all(agents.iter().map(|agent| agent.call(req.clone()))).await;
match results {
let result = match results {
Err(res) => res,
Ok(res) => {
let mut results = res.into_iter();
Expand All @@ -164,22 +181,30 @@ async fn parallel_call_all_ok(req: CanisterHttpRequestArgument) -> HttpResponse
let mut buf = vec![];
into_writer(&inconsistent_results, &mut buf)
.expect("failed to encode inconsistent results");
return HttpResponse {
HttpResponse {
status: Nat::from(500u64),
body: buf,
headers: vec![],
};
}
} else {
base_result
}

base_result
}
}
};

store::state::update_caller_state(
&caller,
balance - ic_cdk::api::call::msg_cycles_available128(),
ic_cdk::api::time() / MILLISECONDS,
);
result
}

/// Proxy HTTP request by all agents in parallel and return the first (status <= 500) result.
#[ic_cdk::update]
async fn parallel_call_any_ok(req: CanisterHttpRequestArgument) -> HttpResponse {
if !store::state::is_allowed(&ic_cdk::caller()) {
let caller = ic_cdk::caller();
if !store::state::is_allowed(&caller) {
return HttpResponse {
status: Nat::from(403u64),
body: "caller is not allowed".as_bytes().to_vec(),
Expand All @@ -196,6 +221,7 @@ async fn parallel_call_any_ok(req: CanisterHttpRequestArgument) -> HttpResponse
};
}

let balance = ic_cdk::api::call::msg_cycles_available128();
let calc = store::state::cycles_calculator();
let cycles = calc.ingress_cost(ic_cdk::api::call::arg_data_raw_size())
+ calc.http_outcall_request_cost(calc.count_request_bytes(&req), agents.len());
Expand All @@ -204,13 +230,20 @@ async fn parallel_call_any_ok(req: CanisterHttpRequestArgument) -> HttpResponse
let result =
futures::future::select_ok(agents.iter().map(|agent| agent.call(req.clone()).boxed()))
.await;
match result {
let result = match result {
Ok((res, _)) => {
let cycles =
calc.http_outcall_response_cost(calc.count_response_bytes(&res), agents.len());
store::state::receive_cycles(cycles, true);
res
}
Err(res) => res,
}
};

store::state::update_caller_state(
&caller,
balance - ic_cdk::api::call::msg_cycles_available128(),
ic_cdk::api::time() / MILLISECONDS,
);
result
}
16 changes: 12 additions & 4 deletions src/idempotent-proxy-canister/src/api_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@ fn admin_add_caller(id: Principal) -> Result<bool, String> {
Err("anonymous caller cannot be added".to_string())?;
}

store::state::with_mut(|r| Ok(r.allowed_callers.insert(id)))
store::state::with_mut(|r| {
let has = r.callers.contains_key(&id);
r.callers.entry(id).or_insert((0, 0));
Ok(!has)
})
}

#[ic_cdk::update(guard = "is_controller_or_manager")]
fn admin_add_callers(mut args: BTreeSet<Principal>) -> Result<(), String> {
fn admin_add_callers(args: BTreeSet<Principal>) -> Result<(), String> {
validate_principals(&args)?;
store::state::with_mut(|r| {
r.allowed_callers.append(&mut args);
args.into_iter().for_each(|p| {
r.callers.entry(p).or_insert((0, 0));
});
Ok(())
})
}
Expand All @@ -44,7 +50,9 @@ fn admin_add_callers(mut args: BTreeSet<Principal>) -> Result<(), String> {
fn admin_remove_callers(args: BTreeSet<Principal>) -> Result<(), String> {
validate_principals(&args)?;
store::state::with_mut(|r| {
r.allowed_callers.retain(|p| !args.contains(p));
args.iter().for_each(|p| {
r.callers.remove(p);
});
Ok(())
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/idempotent-proxy-canister/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use candid::Principal;
use ic_cdk::api::management_canister::http_request::{CanisterHttpRequestArgument, HttpResponse};
use ic_cose_types::types::state::StateInfo;
use std::collections::BTreeSet;

mod agent;
Expand All @@ -13,7 +12,8 @@ mod init;
mod store;
mod tasks;

use crate::init::ChainArgs;
use api::StateInfo;
use init::ChainArgs;

fn is_controller() -> Result<(), String> {
let caller = ic_cdk::caller();
Expand Down
30 changes: 26 additions & 4 deletions src/idempotent-proxy-canister/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use ic_stable_structures::{
};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use std::{borrow::Cow, cell::RefCell, collections::BTreeSet};
use std::{
borrow::Cow,
cell::RefCell,
collections::{BTreeMap, BTreeSet},
};

use crate::{
agent::Agent,
Expand All @@ -27,7 +31,9 @@ pub struct State {
pub proxy_token_refresh_interval: u64, // seconds
pub agents: Vec<Agent>,
pub managers: BTreeSet<Principal>,
pub allowed_callers: BTreeSet<Principal>,
pub allowed_callers: BTreeSet<Principal>, //deprecated
#[serde(default)]
pub callers: BTreeMap<Principal, (u128, u64)>,
#[serde(default)]
pub subnet_size: u64,
#[serde(default)]
Expand Down Expand Up @@ -154,7 +160,16 @@ pub mod state {
}

pub fn is_allowed(caller: &Principal) -> bool {
STATE.with(|r| r.borrow().allowed_callers.contains(caller))
STATE.with(|r| r.borrow().callers.contains_key(caller))
}

pub fn update_caller_state(caller: &Principal, cycles: u128, now_ms: u64) {
STATE.with(|r| {
r.borrow_mut().callers.get_mut(caller).map(|v| {
v.0 = v.0.saturating_add(cycles);
v.1 = now_ms;
})
});
}

pub fn with<R>(f: impl FnOnce(&State) -> R) -> R {
Expand Down Expand Up @@ -185,7 +200,14 @@ pub mod state {

pub fn load() {
STATE_STORE.with(|r| {
let s = r.borrow_mut().get().clone();
let mut s = r.borrow().get().clone();
if !s.allowed_callers.is_empty() {
s.allowed_callers.iter().for_each(|p| {
s.callers.entry(*p).or_insert((0, 0));
});
s.allowed_callers.clear();
}

STATE.with(|h| {
*h.borrow_mut() = s;
});
Expand Down

0 comments on commit 8c04970

Please sign in to comment.