Skip to content

Commit

Permalink
refactor: optimising state manager and wasm runtime module (#111)
Browse files Browse the repository at this point in the history
* refactor: remove unwanted methods inside ssb client and clear clippy warnings

* fix: create new type `Event` to use in publish event method and pass missing fields

* chore: add state manager to the context & optimize wasm runtime and state manager

* refactor: fix spelling mistakes, format Cargo.toml and remove unwanted files

* chore: remove all print statements and log instead

* fix: change corestorage db name in consumer to fix conflicts

* chore: remove event type in ssb client publish event

* fix: remove unwanted comments

* fix: remove duplicate dependencies

* chore: fix locking issue while running workflow

* chore: remove all hardcoded values and create constant value

* fix: remove unwanted spaces

* fix: change the consumer implementation to sync first before waiting for invite
  • Loading branch information
ajaykumargdr authored Mar 29, 2024
1 parent 4539344 commit ae4427e
Show file tree
Hide file tree
Showing 18 changed files with 389 additions and 405 deletions.
10 changes: 6 additions & 4 deletions runtime/lite/examples/basic-producer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use dotenv::dotenv;
use kuska_ssb::{api::dto::content::Mention, keystore::read_patchwork_config};
use runtime::constants::DEFAULT_PRODUCER_SECRET_DIR;
use runtime::kuska_ssb_client::client::Client;
use runtime::logger::{CoreLogger, Logger};
use runtime::common::constants::{DEFAULT_PRODUCER_IP, DEFAULT_PRODUCER_PORT};

#[tokio::main]
async fn main() {
Expand All @@ -11,18 +13,18 @@ async fn main() {

logger.info("starting producer...");
let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| {
let home_dir = dirs::home_dir().unwrap();
std::format!("{}/.ssb/secret", home_dir.to_string_lossy())
DEFAULT_PRODUCER_SECRET_DIR.to_string()
});

let port = std::env::var("PRODUCER_PORT").unwrap_or("8014".to_string());
let port = std::env::var("PRODUCER_PORT").unwrap_or(DEFAULT_PRODUCER_PORT.to_string());
let ip = std::env::var("PRODUCER_IP").unwrap_or(DEFAULT_PRODUCER_IP.to_string());
let pub_address = std::env::var("PUB_ADDRESS").expect("Pub address must be provided");
let invite = std::env::var("PRODUCER_INVITE").expect("invite address must be provided");

let mut file = async_std::fs::File::open(secret).await.unwrap();
let key = read_patchwork_config(&mut file).await.unwrap();

let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port)
let mut client = Client::new(Some(key), ip, port)
.await
.unwrap();

Expand Down
44 changes: 23 additions & 21 deletions runtime/lite/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use kuska_ssb::keystore::read_patchwork_config;
use runtime::{
common::{constants::FOLLOW_LIST_KEY, RequestBody},
common::{
constants::{
DEFAULT_CONSUMER_IP, DEFAULT_CONSUMER_PORT, FOLLOW_LIST_KEY, HTTP_SERVER_ADDRESS,
},
RequestBody,
},
constants::DEFAULT_CONSUMER_SECRET_DIR,
logger::CoreLogger,
modules::kuska_ssb_client::client::Client,
state_manager::GlobalState,
Expand All @@ -19,48 +25,44 @@ use dotenv::dotenv;
#[tokio::main]
async fn main() {
dotenv().ok();
let db = CoreStorage::new("runtime_db").unwrap();
let db = CoreStorage::new("runtime-db").unwrap();
let logger = CoreLogger::new(Some("./ssb-consumer.log"));
let state_manager = GlobalState::new(logger.clone());

logger.info("starting consumer...");

let context = Arc::new(Mutex::new(Context::new(logger.clone(), db, state_manager)));

let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| {
let home_dir = dirs::home_dir().unwrap();
std::format!("{}/.ssb/secret", home_dir.to_string_lossy())
});
let port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string());
let secret = std::env::var("CONSUMER_SECRET")
.unwrap_or_else(|_| DEFAULT_CONSUMER_SECRET_DIR.to_string());
let port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string());
let ip = std::env::var("CONSUMER_IP").unwrap_or(DEFAULT_CONSUMER_IP.to_string());
let mut file = async_std::fs::File::open(secret).await.unwrap();
let key = read_patchwork_config(&mut file).await.unwrap();

let ssb_context = context.clone();

let (channel_invite_tx, channel_invite_rx) = std::sync::mpsc::channel::<String>();
let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port)
.await
.unwrap();
let mut client = Client::new(Some(key), ip, port).await.unwrap();
// Spawn the SSB feed listener task
tokio::spawn(async move {
let logger = ssb_context.clone().lock().unwrap().get_logger().clone();
logger.info("consumer successfully started✅");
{
let invite = channel_invite_rx.recv().unwrap();
let res = client.accept_invite(&invite).await;
logger.info(&format!("Invite Accepted: {:?}", res));
}

client
.live_feed_with_execution_of_workflow(true, ssb_context)
.live_feed_with_execution_of_workflow(true, ssb_context.clone())
.await
.unwrap();

let logger = ssb_context.lock().unwrap().get_logger().clone();
logger.info("consumer successfully started✅");

let invite = channel_invite_rx.recv().unwrap();
let res = client.accept_invite(&invite).await;
logger.info(&format!("Invite Accepted: {:?}", res));
});

// Spawn the HTTP server task
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind to address");
logger.info("Listening on 127.0.0.1:8080...");
let listener = TcpListener::bind(HTTP_SERVER_ADDRESS).expect("Failed to bind to address");
logger.info(&format!("Listening on {HTTP_SERVER_ADDRESS}..."));
for stream in listener.incoming() {
match stream {
Ok(stream) => {
Expand Down
13 changes: 11 additions & 2 deletions runtime/lite/src/modules/common/constants.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@

pub const FOLLOW_LIST_KEY: &str = "follow_list";
pub const FOLLOW_LIST_KEY: &str = "follow_list";
pub const HTTP_SERVER_ADDRESS: &str = "127.0.0.1:8080";
pub const DEFAULT_CONSUMER_IP: &str = "0.0.0.0";
pub const DEFAULT_PRODUCER_IP: &str = "0.0.0.0";
pub const DEFAULT_PUB_IP: &str = "0.0.0.0";
pub const DEFAULT_PUB_PORT: &str = "8013";
pub const DEFAULT_CONSUMER_PORT: &str = "8014";
pub const DEFAULT_PRODUCER_PORT: &str = "8015";
pub const DEFAULT_CONSUMER_SECRET_DIR: &str = "scripts/secret/consumer_secret";
pub const DEFAULT_PRODUCER_SECRET_DIR: &str = "scripts/secret/producer_secret";
pub const DEFAULT_PUB_SECRET_DIR: &str = "scripts/secret/pubs_secret";
4 changes: 2 additions & 2 deletions runtime/lite/src/modules/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct RequestBody {
pub wasm: Vec<u8>,
pub invite: String,
pub pub_id: String,
pub allowed_hosts : Option<Vec<String>>,
pub allowed_hosts: Option<Vec<String>>,
pub input: Value,
}

Expand All @@ -25,4 +25,4 @@ pub fn combine_values(dest: &mut serde_json::Value, src: &serde_json::Value) {
}
(_, _) => panic!("update_with only works with two serde_json::Value::Object s"),
}
}
}
25 changes: 17 additions & 8 deletions runtime/lite/src/modules/kuska_ssb_client/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{Arc, Mutex};

use crate::{
modules::{logger::Logger, storage::Storage, wasmtime_wasi_module},
modules::{logger::Logger, storage::Storage, wasmtime_wasi_module, state_manager::traits::GlobalStateManager},
Ctx,
};

Expand Down Expand Up @@ -81,9 +81,9 @@ impl Client {

let server_pk =
ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key");
let server_ipport = format!("{}:{}", ip, port);
let server_ip_port = format!("{}:{}", ip, port);

let mut socket = TcpStream::connect(server_ipport).await?;
let mut socket = TcpStream::connect(server_ip_port).await?;

let handshake =
handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), server_pk).await?;
Expand Down Expand Up @@ -164,9 +164,9 @@ impl Client {
Ok(())
}

pub async fn create_invite(&mut self) -> Result<Vec<String>> {
pub async fn create_invite(&mut self) -> Result<String> {
let req_id = self.api.invite_create_req_send(1).await?;
let responses = self.get_responses(req_id, invite_create).await?;
let responses = self.get_async(req_id, invite_create).await?;
Ok(responses)
}

Expand Down Expand Up @@ -225,6 +225,7 @@ impl Client {
let ctx = ctx.lock().unwrap();
let db = ctx.get_db();
let logger = ctx.get_logger();
let mut state_manager = ctx.get_state_manager();

if id == req_no {
match msg {
Expand All @@ -243,19 +244,27 @@ impl Client {
Ok(mut event) => {
logger.info(&format!("Event: {:#?}", event));

match db.get(&x.mentions.unwrap()[0].link) {
match db.get_request_body(
&x.mentions.unwrap()[0].link,
) {
Ok(body) => {
let data = serde_json::json!({
"data" : crate::common::combine_values(&mut event, &body.input),
"allowed_hosts": body.allowed_hosts
});

let workflow_index = state_manager
.new_workflow(0, "hello");

let _ =
wasmtime_wasi_module::run_workflow(
state_manager,
logger,
serde_json::to_value(data)
.unwrap(),
body.wasm,
0,
"hello",
workflow_index,
false,
);
}
Err(e) => logger.error(&e.to_string()),
Expand Down
Loading

0 comments on commit ae4427e

Please sign in to comment.