Skip to content

Commit

Permalink
Add boilerplate for ClientApp
Browse files Browse the repository at this point in the history
  • Loading branch information
chalex-eth committed Oct 14, 2024
1 parent 1fed790 commit 86ba141
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 7 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ help:
RPC_URL=http://localhost:8545
DEPLOYER_PK=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
TASK_REGISTRY_ADDRESS=0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512
CLIENT_APP_REGISTRY_ADDRESS=0x5FbDB2315678afecb367f032d93F642f64180aa3
TASK_ID=0xc86aab04e8ef18a63006f43fa41a2a0150bae3dbe276d581fa8b5cde0ccbc966

-----------------------------: ##
Expand Down
12 changes: 11 additions & 1 deletion contract-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,19 @@ impl std::fmt::Debug for TaskRegistry::TaskRequested {
}
}

impl std::fmt::Debug for ClientAppRegistry::ClientAppMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"name: {:?}, description: {:?}, logo_url: {:?}, docker_url: {:?}",
self.name, self.description, self.logoUrl, self.dockerUrl
)
}
}

sol!(
#[sol(rpc)]
ClientAppRegistryContract,
ClientAppRegistry,
"../contracts/out/ClientAppRegistry.sol/ClientAppRegistry.json"
);

Expand Down
59 changes: 53 additions & 6 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use alloy::{
pubsub::PubSubFrontend,
};
use contract_bindings::{
ClientAppRegistry::{self, ClientAppMetadata, ClientAppRegistryInstance},
TaskRegistry::{self, TaskRegistryInstance},
TASK_REGISTRY_ADDRESS,
CLIENT_APP_REGISTRY_ADDRESS, TASK_REGISTRY_ADDRESS,
};
use eyre::{Result, WrapErr};
use futures::StreamExt;
Expand All @@ -16,12 +17,18 @@ use tokio::{
};
use tracing::{error, info, warn};

type TaskRegistryProvider =
Arc<TaskRegistryInstance<PubSubFrontend, Arc<RootProvider<PubSubFrontend>>>>;
type ClientAppRegistryProvider =
Arc<ClientAppRegistryInstance<PubSubFrontend, Arc<RootProvider<PubSubFrontend>>>>;

// Adjust this based on your expected load and system resources
const QUEUE_CAPACITY: usize = 100;

#[derive(Clone)]
pub struct Operator {
task_registry: Arc<TaskRegistryInstance<PubSubFrontend, RootProvider<PubSubFrontend>>>,
task_registry: TaskRegistryProvider,
client_app_registry: ClientAppRegistryProvider,
}

impl Operator {
Expand All @@ -33,15 +40,25 @@ impl Operator {
.on_ipc(ipc)
.await
.wrap_err("Failed to create provider")?;

let task_registry = Arc::new(TaskRegistry::new(TASK_REGISTRY_ADDRESS, provider));

Ok(Self { task_registry })
let provider = Arc::new(provider);

let task_registry = Arc::new(TaskRegistry::new(TASK_REGISTRY_ADDRESS, provider.clone()));
let client_app_registry = Arc::new(ClientAppRegistry::new(
CLIENT_APP_REGISTRY_ADDRESS,
provider,
));

Ok(Self {
task_registry,
client_app_registry,
})
}

pub async fn run(&self) -> Result<()> {
info!("Starting operator...");

self.fetch_client_app().await?;

// Create a bounded channel for task communication
// NOTE: Using a bounded channel helps with backpressure, preventing the event listener from overwhelming the task processor. However, if the
// channel becomes full, it may block the event listener.
Expand All @@ -59,6 +76,36 @@ impl Operator {
Ok(())
}

async fn fetch_client_app(&self) -> Result<()> {
// Fetch all the client apps registered
let clients_list = self
.client_app_registry
.ClientAppRegistered_filter()
.query()
.await?
.into_iter()
.map(|(client_app_id, _)| client_app_id.clientAppId)
.collect::<Vec<_>>();

// Fetch the metadata for all the client apps
let mut client_apps: Vec<ClientAppMetadata> = Vec::new();
for client_app_id in clients_list {
let client_app = self
.client_app_registry
.getClientAppMetadata(client_app_id)
.call()
.await?
._0;
client_apps.push(client_app);
}

// For PoC let's assume operators opt for all the client apps
// Check if image is already downloaded, if not download the image
// Run a container for each client app

Ok(())
}

async fn listen_for_events(self, tx: Sender<TaskRegistry::TaskRequested>) -> Result<()> {
// Create a stream of events from the the TaskRequested filter, will block until there an incoming event
let mut stream = self
Expand Down

0 comments on commit 86ba141

Please sign in to comment.