Skip to content

Commit

Permalink
feat: add functionality for consumer and producer to accept invitation (
Browse files Browse the repository at this point in the history
#114)

* chore: Add functionality to accept invitation in consumer

* chore: add code for producer to accept invitation
  • Loading branch information
shanithkk authored Mar 29, 2024
1 parent af6010c commit 4539344
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 29 deletions.
2 changes: 1 addition & 1 deletion runtime/lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ kuska-sodiumoxide = '0.2.5-0'
base64 = '0.11.0'
log = '0.4.8'
dirs = '2.0'
subxt-signer = '0.34.0'
slog = '2.7.0'
slog-async = '2.8.0'
slog-term = '2.9.1'
Expand Down Expand Up @@ -72,3 +71,4 @@ path = '../../workflow/test_util'

[dev-dependencies]
subxt = '0.34.0'
subxt-signer = '0.34.0'
7 changes: 4 additions & 3 deletions runtime/lite/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ This runtime harnesses the capabilities of Secure Scuttlebutt, a decentralized m
5. Move to lite directort and create a .env file and export the secret path port for consumer.

SECRET=../aurras/runtime/lite/scripts/secret/consumer_secret
PORT=8015
PORT=8014

6. Run the consumer client

Expand All @@ -48,7 +48,7 @@ This runtime harnesses the capabilities of Secure Scuttlebutt, a decentralized m

let mut stream = TcpStream::connect("127.0.0.1:8080").unwrap();
let wasm = fs::read("<wasm-path>").unwrap();
let buf = RequestBody{wasm: wasm,invite: "text".to_string(),pub_id: "<pub-id>".to_string(),input: serde_json::json!({"test": "test"}),allowed_hosts: None,};
let buf = RequestBody{wasm: wasm,invite: "<invite_code_of_pub>",pub_id: "<pub-id>".to_string(),input: serde_json::json!({"test": "test"}),allowed_hosts: None,};
let data = serde_json::to_vec(&buf).unwrap();stream.write_all(&data).unwrap();`
stream.shutdown(std::net::Shutdown::Write).unwrap();

Expand All @@ -62,7 +62,8 @@ This runtime harnesses the capabilities of Secure Scuttlebutt, a decentralized m

PUB_ADDRESS="@pjrBmtifFU9P9NhoHRiPbn3O3xGUXtsLWJXhxLEpkug=.ed25519"
PRODUCER_SECRET=./runtime/lite/scripts/secret/consumer_secret
PRODUCER_PORT=8014
PRODUCER_PORT=8015
PRODUCER_INVITE=<PRODUCER_INVITE>
*Note:* Get the pub address using script `./ssb-up.sh pub-whoami`

cargo run --example basic-producer
Expand Down
3 changes: 3 additions & 0 deletions runtime/lite/examples/basic-producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async fn main() {

let port = std::env::var("PRODUCER_PORT").unwrap_or("8014".to_string());
let pub_address = std::env::var("PUB_ADDRESS").expect("Pub address must be provided");
let invite = std::env::var("PRODUCER_INVITE").expect("invite address must be provided");

let mut file = async_std::fs::File::open(secret).await.unwrap();
let key = read_patchwork_config(&mut file).await.unwrap();
Expand All @@ -25,6 +26,8 @@ async fn main() {
.await
.unwrap();

let invite_res = client.accept_invite(&invite).await;
logger.info(&format!("{:?}", invite_res));
logger.info("producer successfully started✅");

use subxt::{OnlineClient, PolkadotConfig};
Expand Down
4 changes: 2 additions & 2 deletions runtime/lite/scripts/ssb-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ copy_secrets_to(){

mkdir -p ./secret

docker cp $consumer_container:/root/.ssb/secret ./secret/producer_secret
docker cp $producer_container:/root/.ssb/secret ./secret/consumer_secret
docker cp $consumer_container:/root/.ssb/secret ./secret/consumer_secret
docker cp $producer_container:/root/.ssb/secret ./secret/producer_secret
docker cp $pubs_container:/home/node/.ssb/secret ./secret/pubs_secret
}
start_specific_service() {
Expand Down
57 changes: 41 additions & 16 deletions runtime/lite/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use kuska_ssb::keystore::read_patchwork_config;
use runtime::{
common::RequestBody,
common::{constants::FOLLOW_LIST_KEY, RequestBody},
logger::CoreLogger,
state_manager::GlobalState,
modules::kuska_ssb_client::client::Client,
state_manager::GlobalState,
storage::{CoreStorage, Storage},
Ctx, Logger,
};
Expand All @@ -19,17 +19,13 @@ use dotenv::dotenv;
#[tokio::main]
async fn main() {
dotenv().ok();
let db = CoreStorage::new("runtime").unwrap();
let db = CoreStorage::new("runtime_db").unwrap();
let logger = CoreLogger::new(Some("./ssb-consumer.log"));
let state_manager = GlobalState::new(logger.clone());

logger.info("starting consumer...");

let context = Arc::new(Mutex::new(Context::new(
logger.clone(),
db,
state_manager
)));
let context = Arc::new(Mutex::new(Context::new(logger.clone(), db, state_manager)));

let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| {
let home_dir = dirs::home_dir().unwrap();
Expand All @@ -41,14 +37,19 @@ async fn main() {

let ssb_context = context.clone();

let (channel_invite_tx, channel_invite_rx) = std::sync::mpsc::channel::<String>();
let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port)
.await
.unwrap();
// Spawn the SSB feed listener task
tokio::spawn(async move{
let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port)
.await
.unwrap();

tokio::spawn(async move {
let logger = ssb_context.clone().lock().unwrap().get_logger().clone();
logger.info("consumer successfully started✅");
{
let invite = channel_invite_rx.recv().unwrap();
let res = client.accept_invite(&invite).await;
logger.info(&format!("Invite Accepted: {:?}", res));
}

client
.live_feed_with_execution_of_workflow(true, ssb_context)
Expand All @@ -63,7 +64,7 @@ async fn main() {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
handle_client(stream, context.clone());
handle_client(stream, context.clone(), channel_invite_tx.clone());
}
Err(e) => {
logger.error(&format!("Error accepting connection: {}", e));
Expand All @@ -78,7 +79,11 @@ async fn main() {

use std::io::{Read, Write};

fn handle_client(mut stream: TcpStream, ctx: Arc<Mutex<dyn Ctx>>) {
fn handle_client(
mut stream: TcpStream,
ctx: Arc<Mutex<dyn Ctx>>,
channel_invite_tx: std::sync::mpsc::Sender<String>,
) {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).expect("Error reading data");

Expand All @@ -89,7 +94,27 @@ fn handle_client(mut stream: TcpStream, ctx: Arc<Mutex<dyn Ctx>>) {
logger.info("Data Deserialized");
let db = ctx.get_db();

db.insert(&body.pub_id.clone(), body).unwrap();
let get_follow_list = db.get_value::<Vec<String>>(FOLLOW_LIST_KEY).unwrap();
match get_follow_list {
Some(mut list) => {
if list.contains(&body.pub_id) {
logger.info("Already Following the publisher");
} else {
list.push(body.pub_id.clone());
let _res = channel_invite_tx.send(body.invite.clone());
db.insert_value(FOLLOW_LIST_KEY, list).unwrap();
logger.info(&format!("Following the publisher {}", body.pub_id));
}
}
None => {
let _res = channel_invite_tx.send(body.invite.clone());
db.insert_value(FOLLOW_LIST_KEY, vec![body.pub_id.clone()])
.unwrap();
logger.info(&format!("Following the publisher {}", body.pub_id));
}
}

db.insert_value(&body.pub_id.clone(), body).unwrap();
logger.info("Data inserted successfully");

// Respond to the client (optional)
Expand Down
2 changes: 2 additions & 0 deletions runtime/lite/src/modules/common/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

pub const FOLLOW_LIST_KEY: &str = "follow_list";
3 changes: 2 additions & 1 deletion runtime/lite/src/modules/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod constants;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct RequestBody {
pub wasm: Vec<u8>,
pub invite: String,
Expand Down
13 changes: 9 additions & 4 deletions runtime/lite/src/modules/kuska_ssb_client/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::{Arc, Mutex};

use crate::{
modules::logger::Logger, modules::storage::Storage, modules::wasmtime_wasi_module, Ctx,
modules::{logger::Logger, storage::Storage, wasmtime_wasi_module},
Ctx,
};

use super::*;
Expand Down Expand Up @@ -187,7 +188,13 @@ impl Client {
Ok(())
}

pub async fn publish_event(&mut self, event: &str, section: &str, content: &str, mentions: Option<Vec<Mention>>) -> Result<()> {
pub async fn publish_event(
&mut self,
event: &str,
section: &str,
content: &str,
mentions: Option<Vec<Mention>>,
) -> Result<()> {
let _req_id = self
.api
.publish_req_send(TypedMessage::Event {
Expand Down Expand Up @@ -234,8 +241,6 @@ impl Client {
match serde_json::from_str::<serde_json::Value>(&x.text)
{
Ok(mut event) => {


logger.info(&format!("Event: {:#?}", event));

match db.get(&x.mentions.unwrap()[0].link) {
Expand Down
27 changes: 26 additions & 1 deletion runtime/lite/src/modules/storage/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use super::traits::Storage;
use rocksdb::Error as RocksDBError;
use rocksdb::DB;
use serde::Serialize;
use std::{fmt, io};

#[derive(Debug)]
Expand All @@ -36,7 +37,7 @@ impl From<io::Error> for CustomError {

impl From<CustomError> for io::Error {
fn from(error: CustomError) -> Self {
match error {
match error {
CustomError::RocksDB(e) => io::Error::new(io::ErrorKind::Other, e),
CustomError::Io(e) => e,
CustomError::Custom(e) => io::Error::new(io::ErrorKind::Other, e),
Expand Down Expand Up @@ -199,4 +200,28 @@ impl Storage for CoreStorage {
None => Err(CustomError::Custom("Data not found".to_string())),
}
}

fn get_value<T: serde::de::DeserializeOwned>(
&self,
key: &str,
) -> Result<Option<T>, CustomError> {
let res = self
.db
.get(key)
.map_err(|e| CustomError::Custom(e.to_string()))?;
match res {
Some(bytes) => {
let value = serde_json::from_slice(&bytes)
.map_err(|e| CustomError::Custom(e.to_string()))?;
Ok(Some(value))
}
None => Ok(None),
}
}
fn insert_value<T: Serialize>(&self, key: &str, value: T) -> Result<(), CustomError> {
let bytes = serde_json::to_vec(&value).map_err(|e| CustomError::Custom(e.to_string()))?;
self.db
.put(key, bytes)
.map_err(|e| CustomError::Custom(e.to_string()))
}
}
19 changes: 19 additions & 0 deletions runtime/lite/src/modules/storage/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(test)]
mod tests {
use crate::common::RequestBody;
use crate::modules::storage::CoreStorage;
use crate::modules::storage::Storage;
pub use rocksdb::DB;
Expand Down Expand Up @@ -123,4 +124,22 @@ mod tests {
fs::remove_dir_all(std::path::Path::new("test9")).unwrap();
result.unwrap();
}
#[test]
fn test_get_value() {
let core_storage = CoreStorage::new("test10").unwrap();
let req = RequestBody {
wasm: vec![0, 1, 3],
invite: "todo!()".to_string(),
pub_id: "todo!()".to_string(),
allowed_hosts: Some(vec!["todo!()".to_string()]),
input: serde_json::json!({ "test": "test"}),
};

let key = "test";
core_storage.insert_value(key, &req).unwrap();

let result = core_storage.get_value::<RequestBody>(key).unwrap();
fs::remove_dir_all(std::path::Path::new("test10")).unwrap();
assert_eq!(result, Some(req));
}
}
6 changes: 5 additions & 1 deletion runtime/lite/src/modules/storage/traits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use serde::{de::DeserializeOwned, Serialize};

use crate::{common::RequestBody, modules::storage::CustomError};

pub trait Storage {
fn get_data(&self, key: &str) -> Result<Vec<u8>, CustomError>;
fn set_data(&self, key: &str, value: Vec<u8>) -> Result<(), CustomError>;
fn modify_data(&self, key: &str, value: Vec<u8>) -> Result<(), CustomError>;
fn delete_data(&self, key: &str) -> Result<(), CustomError>;
fn store_wasm(&self, key: &str, wasm : &[u8]) -> Result<(), CustomError>;
fn store_wasm(&self, key: &str, wasm: &[u8]) -> Result<(), CustomError>;
fn get_wasm(&self, key: &str) -> Result<Vec<u8>, CustomError>;
fn insert(&self, key: &str, value: RequestBody) -> Result<(), CustomError>;
fn get(&self, key: &str) -> Result<RequestBody, CustomError>;
fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CustomError>;
fn insert_value<T: Serialize>(&self, key: &str, value: T) -> Result<(), CustomError>;
}

0 comments on commit 4539344

Please sign in to comment.