From 86ba14196f21262d7a5423f2ed4a78c0bb7c4ea1 Mon Sep 17 00:00:00 2001 From: "chalex.eth" <43524913+chalex-eth@users.noreply.github.com> Date: Mon, 14 Oct 2024 22:31:51 +0200 Subject: [PATCH] Add boilerplate for ClientApp --- Makefile | 1 + contract-bindings/src/lib.rs | 12 +++++++- operator/src/lib.rs | 59 ++++++++++++++++++++++++++++++++---- 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 1468460..9d2ee00 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ help: RPC_URL=http://localhost:8545 DEPLOYER_PK=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 TASK_REGISTRY_ADDRESS=0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512 +CLIENT_APP_REGISTRY_ADDRESS=0x5FbDB2315678afecb367f032d93F642f64180aa3 TASK_ID=0xc86aab04e8ef18a63006f43fa41a2a0150bae3dbe276d581fa8b5cde0ccbc966 -----------------------------: ## diff --git a/contract-bindings/src/lib.rs b/contract-bindings/src/lib.rs index 42f85df..9f180d7 100644 --- a/contract-bindings/src/lib.rs +++ b/contract-bindings/src/lib.rs @@ -42,9 +42,19 @@ impl std::fmt::Debug for TaskRegistry::TaskRequested { } } +impl std::fmt::Debug for ClientAppRegistry::ClientAppMetadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "name: {:?}, description: {:?}, logo_url: {:?}, docker_url: {:?}", + self.name, self.description, self.logoUrl, self.dockerUrl + ) + } +} + sol!( #[sol(rpc)] - ClientAppRegistryContract, + ClientAppRegistry, "../contracts/out/ClientAppRegistry.sol/ClientAppRegistry.json" ); diff --git a/operator/src/lib.rs b/operator/src/lib.rs index 33c78ef..f249529 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -3,8 +3,9 @@ use alloy::{ pubsub::PubSubFrontend, }; use contract_bindings::{ + ClientAppRegistry::{self, ClientAppMetadata, ClientAppRegistryInstance}, TaskRegistry::{self, TaskRegistryInstance}, - TASK_REGISTRY_ADDRESS, + CLIENT_APP_REGISTRY_ADDRESS, TASK_REGISTRY_ADDRESS, }; use eyre::{Result, WrapErr}; use futures::StreamExt; @@ -16,12 +17,18 @@ use tokio::{ }; use tracing::{error, info, warn}; +type TaskRegistryProvider = + Arc>>>; +type ClientAppRegistryProvider = + Arc>>>; + // Adjust this based on your expected load and system resources const QUEUE_CAPACITY: usize = 100; #[derive(Clone)] pub struct Operator { - task_registry: Arc>>, + task_registry: TaskRegistryProvider, + client_app_registry: ClientAppRegistryProvider, } impl Operator { @@ -33,15 +40,25 @@ impl Operator { .on_ipc(ipc) .await .wrap_err("Failed to create provider")?; - - let task_registry = Arc::new(TaskRegistry::new(TASK_REGISTRY_ADDRESS, provider)); - - Ok(Self { task_registry }) + let provider = Arc::new(provider); + + let task_registry = Arc::new(TaskRegistry::new(TASK_REGISTRY_ADDRESS, provider.clone())); + let client_app_registry = Arc::new(ClientAppRegistry::new( + CLIENT_APP_REGISTRY_ADDRESS, + provider, + )); + + Ok(Self { + task_registry, + client_app_registry, + }) } pub async fn run(&self) -> Result<()> { info!("Starting operator..."); + self.fetch_client_app().await?; + // Create a bounded channel for task communication // NOTE: Using a bounded channel helps with backpressure, preventing the event listener from overwhelming the task processor. However, if the // channel becomes full, it may block the event listener. @@ -59,6 +76,36 @@ impl Operator { Ok(()) } + async fn fetch_client_app(&self) -> Result<()> { + // Fetch all the client apps registered + let clients_list = self + .client_app_registry + .ClientAppRegistered_filter() + .query() + .await? + .into_iter() + .map(|(client_app_id, _)| client_app_id.clientAppId) + .collect::>(); + + // Fetch the metadata for all the client apps + let mut client_apps: Vec = Vec::new(); + for client_app_id in clients_list { + let client_app = self + .client_app_registry + .getClientAppMetadata(client_app_id) + .call() + .await? + ._0; + client_apps.push(client_app); + } + + // For PoC let's assume operators opt for all the client apps + // Check if image is already downloaded, if not download the image + // Run a container for each client app + + Ok(()) + } + async fn listen_for_events(self, tx: Sender) -> Result<()> { // Create a stream of events from the the TaskRequested filter, will block until there an incoming event let mut stream = self