Skip to content

Commit

Permalink
Merge branch 'AV-205-Optimising-SSB-client-module' into AV-206-Optimi…
Browse files Browse the repository at this point in the history
…sing-state-manager-and-wasm-runtime-module
  • Loading branch information
ajaykumargdr committed Mar 28, 2024
2 parents 23fb88c + d2d57f6 commit 98571bc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
21 changes: 14 additions & 7 deletions runtime/lite/examples/basic-producer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use dotenv::dotenv;
use kuska_ssb::{api::dto::content::Mention, keystore::read_patchwork_config};
use runtime::kuska_ssb_client::client::Client;
use runtime::logger::{CoreLogger, Logger};

#[tokio::main]
async fn main() {
dotenv().ok();
println!("start");

let logger = CoreLogger::new(Some("./ssb-producer.log"));

logger.info("starting producer...");
let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| {
let home_dir = dirs::home_dir().unwrap();
std::format!("{}/.ssb/secret", home_dir.to_string_lossy())
Expand All @@ -21,6 +25,8 @@ async fn main() {
.await
.unwrap();

logger.info("producer successfully started✅");

use subxt::{OnlineClient, PolkadotConfig};

#[subxt::subxt(runtime_metadata_path = "./src/modules/utils/polkadot_metadata_small.scale")]
Expand All @@ -38,9 +44,9 @@ async fn main() {
let block_number = block.header().number;
let block_hash = block.hash();

println!("Block #{block_number}:");
println!(" Hash: {block_hash}");
println!(" Extrinsics:");
logger.info(&format!(
"Block #{block_number} Hash: {block_hash} Extrinsics:"
));

// Log each of the extrinsic with it's associated events:
let extrinsics = block.extrinsics().await.unwrap();
Expand All @@ -56,20 +62,21 @@ async fn main() {
let from_addr = transfer.from.to_string();
let to_addr = transfer.from.to_string();
let amount = transfer.amount;
println!("{from_addr:?}");

logger.info(&format!("Transfer: {from_addr} -> {to_addr} {amount}"));

let value = format!(
"{{\"from\":\"{}\",\"to\":\"{}\",\"amount\":\"{}\"}}",
from_addr, to_addr, amount
);

let menttion = Mention {
let mention = Mention {
link: pub_address.clone(),
name: None,
};

let _ = client
.publish(&value.to_string(), Some(vec![menttion]))
.publish(&value.to_string(), Some(vec![mention]))
.await;
}
None => (),
Expand Down
20 changes: 14 additions & 6 deletions runtime/lite/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ use dotenv::dotenv;
async fn main() {
dotenv().ok();
let db = CoreStorage::new("runtime").unwrap();
let logger = CoreLogger::new(Some("./workflow"));
let logger = CoreLogger::new(Some("./ssb-consumer.log"));
let state_manager = GlobalState::new(logger.clone());

let context = Arc::new(Mutex::new(Context::new(logger, db, state_manager)));
logger.info("starting consumer...");

let context = Arc::new(Mutex::new(Context::new(
logger.clone(),
db,
state_manager
)));

let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| {
let home_dir = dirs::home_dir().unwrap();
Expand All @@ -34,11 +40,13 @@ async fn main() {
let key = read_patchwork_config(&mut file).await.unwrap();

let ssb_context = context.clone();

// Spawn the SSB feed listener task
tokio::spawn(async {
tokio::spawn(async move{
let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port)
.await
.unwrap();

client
.live_feed_with_execution_of_workflow(true, ssb_context)
.await
Expand All @@ -48,14 +56,15 @@ async fn main() {
// Spawn the HTTP server task
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind to address");
println!("Listening on 127.0.0.1:8080...");
logger.info("Listening on 127.0.0.1:8080...");

for stream in listener.incoming() {
match stream {
Ok(stream) => {
handle_client(stream, context.clone());
}
Err(e) => {
eprintln!("Error accepting connection: {}", e);
logger.error(&format!("Error accepting connection: {}", e));
}
}
}
Expand All @@ -81,7 +90,6 @@ fn handle_client(mut stream: TcpStream, ctx: Arc<Mutex<dyn Ctx>>) {
db.insert_request_body(&body.pub_id.clone(), body).unwrap();
logger.info("Data inserted successfully");

// println!("Received data: {:?}", body);
// Respond to the client (optional)
let response = "Data received!";
stream
Expand Down
17 changes: 9 additions & 8 deletions runtime/lite/src/modules/kuska_ssb_client/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ impl Client {

loop {
let (id, msg) = self.rpc_reader.recv().await?;
let ctx = ctx.lock().unwrap();
let db = ctx.get_db();
let logger = ctx.get_logger();
let state_manager = ctx.get_state_manager();

if id == req_no {
match msg {
Expand All @@ -234,12 +238,9 @@ impl Client {
match serde_json::from_str::<serde_json::Value>(&x.text)
{
Ok(mut event) => {
println!("{:#?}", event);

let ctx = ctx.lock().unwrap();
let db = ctx.get_db();
let logger = ctx.get_logger();
let state_manager = ctx.get_state_manager();

logger.info(&format!("Event: {:#?}", event));

match db.get_request_body(
&x.mentions.unwrap()[0].link,
Expand Down Expand Up @@ -268,18 +269,18 @@ impl Client {
Err(e) => logger.error(&e.to_string()),
}
}
Err(e) => println!("{:#?}", e),
Err(e) => logger.error(&e.to_string()),
}
}
Err(e) => println!("{:#?}", e),
Err(e) => logger.error(&e.to_string()),
}
}
}
Err(err) => {
let body = std::str::from_utf8(&body).unwrap();

if body == "{\"sync\":true}" {
println!("Syncing Successful");
logger.info("Syncing Successful");
is_synced = true;
} else {
return std::result::Result::Err(err);
Expand Down
6 changes: 0 additions & 6 deletions runtime/lite/src/modules/kuska_ssb_client/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,6 @@ mod tests {
let block = block.unwrap();

let block_number = block.header().number;
let block_hash = block.hash();

println!("Block #{block_number}:");
println!(" Hash: {block_hash}");
println!(" Extrinsics:");

if block_number == 10 {
let dest = dev::bob().public_key().into();
Expand All @@ -300,7 +295,6 @@ mod tests {
let from_addr = transfer.from.to_string();
let to_addr = transfer.from.to_string();
let amount = transfer.amount;
println!("{from_addr:?}");

let value = format!(
"{{\"from\":\"{}\",\"to\":\"{}\",\"amount\":\"{}\"}}",
Expand Down
2 changes: 0 additions & 2 deletions runtime/lite/src/modules/storage/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ mod tests {

while retries < 3 {
if let Err(_) = std::fs::remove_file(lock_file_path) {
println!("Failed to remove lock file: {}", lock_file_path);
retries += 1;

// Wait for 1 second before retrying
Expand All @@ -41,7 +40,6 @@ mod tests {
.set_data("test_key", b"test_value".to_vec())
.unwrap();
let result = core_storage.get_data("test_key").unwrap();
println!("{:?}", result);
let deserialized_value: Vec<u8> = result;
fs::remove_dir_all(std::path::Path::new("test2")).unwrap();
assert_eq!(deserialized_value, b"test_value");
Expand Down

0 comments on commit 98571bc

Please sign in to comment.