feat: adjusted event bridge
paulobressan committed Jun 12, 2024
1 parent 42feb2c commit e9e1665
Showing 7 changed files with 83 additions and 60 deletions.
16 changes: 9 additions & 7 deletions src/domain/management/events.rs
@@ -1,16 +1,18 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone)]
pub enum Event {
NamespaceCreate(NamespaceCreate),
}

#[derive(Debug, Clone)]
pub struct NamespaceCreate {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace {
pub name: String,
pub slug: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Event {
NamespaceCreation(Namespace),
}

#[async_trait::async_trait]
pub trait EventBridge {
async fn dispatch(&self, event: Event) -> Result<()>;
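The move to #[serde(untagged)] changes the wire format: the enum variant name never appears in the JSON, only the inner struct's fields do. A small self-contained sketch of that behavior (the name and slug values are made up for illustration; the types mirror the new events.rs):

use anyhow::Result;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace {
    pub name: String,
    pub slug: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Event {
    NamespaceCreation(Namespace),
}

fn main() -> Result<()> {
    let event = Event::NamespaceCreation(Namespace {
        name: "New Project".into(),
        slug: "sonic-vegas".into(),
    });
    // With #[serde(untagged)] the payload is just the inner struct's fields,
    // not {"NamespaceCreation": {...}} as an externally tagged enum would give.
    let json = serde_json::to_string(&event)?;
    assert_eq!(json, r#"{"name":"New Project","slug":"sonic-vegas"}"#);
    Ok(())
}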
49 changes: 35 additions & 14 deletions src/domain/management/project.rs
@@ -1,7 +1,7 @@
use anyhow::{Error, Result};
use std::sync::Arc;

use super::events::{Event, EventBridge, NamespaceCreate};
use super::events::{Event, EventBridge, Namespace};

pub async fn create(
cache: Arc<dyn ProjectCache>,
@@ -12,25 +12,33 @@ pub async fn create(
return Err(Error::msg("invalid project slug"));
}

cache.create(&project).await?;
let namespace = Event::NamespaceCreation(Namespace {
name: project.name,
slug: project.slug,
});

event.dispatch(project.into()).await?;
event.dispatch(namespace).await?;

Ok(())
}

pub async fn create_cache(cache: Arc<dyn ProjectCache>, namespace: Namespace) -> Result<()> {
cache.create(&namespace.into()).await?;

Ok(())
}

#[derive(Debug, Clone)]
pub struct Project {
pub name: String,
pub description: String,
pub slug: String,
}
impl From<Project> for Event {
fn from(value: Project) -> Self {
Event::NamespaceCreate(NamespaceCreate {
slug: value.slug,
impl From<Namespace> for Project {
fn from(value: Namespace) -> Self {
Self {
name: value.name,
})
slug: value.slug,
}
}
}

@@ -69,7 +77,6 @@ mod tests {
fn default() -> Self {
Self {
name: "New Project".into(),
description: "Project to mock".into(),
slug: "sonic-vegas".into(),
}
}
@@ -81,7 +88,6 @@
project_cache
.expect_find_by_slug()
.return_once(|_| Ok(None));
project_cache.expect_create().return_once(|_| Ok(()));

let mut event_bridge = MockFakeEventBridge::new();
event_bridge.expect_dispatch().return_once(|_| Ok(()));
@@ -100,10 +106,8 @@
project_cache
.expect_find_by_slug()
.return_once(|_| Ok(Some(Project::default())));
project_cache.expect_create().return_once(|_| Ok(()));

let mut event_bridge = MockFakeEventBridge::new();
event_bridge.expect_dispatch().return_once(|_| Ok(()));
let event_bridge = MockFakeEventBridge::new();

let project = Project::default();

@@ -112,4 +116,21 @@
unreachable!("Fail to validate when the slug is duplicated")
}
}

#[tokio::test]
async fn it_should_create_project_cache() {
let mut project_cache = MockFakeProjectCache::new();
project_cache.expect_create().return_once(|_| Ok(()));

let project = Project::default();
let namespace = Namespace {
name: project.name,
slug: project.slug,
};

let result = create_cache(Arc::new(project_cache), namespace).await;
if let Err(err) = result {
unreachable!("{err}")
}
}
}
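Taken together, project.rs now splits the write path in two: create() only validates the slug and dispatches a NamespaceCreation event, while create_cache() (called by the event consumer) persists the projection, converting the Namespace payload back into a Project through the new From impl. A minimal sketch of that round trip, reusing the Project and Namespace types from this module (the function name is illustrative):

fn namespace_round_trip(project: Project) -> Project {
    // Command side: create() lifts the Project fields into the event payload.
    let namespace = Namespace {
        name: project.name,
        slug: project.slug,
    };
    // Consumer side: create_cache() turns the payload back into a Project via
    // the From<Namespace> impl added in this commit, then hands it to the cache.
    Project::from(namespace)
}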
1 change: 0 additions & 1 deletion src/driven/cache/migrations/20240606_tables.sql
@@ -1,5 +1,4 @@
CREATE TABLE IF NOT EXISTS projects (
slug TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL
);
9 changes: 3 additions & 6 deletions src/driven/cache/project.rs
@@ -18,12 +18,11 @@ impl ProjectCache for SqliteProjectCache {
async fn create(&self, project: &Project) -> Result<()> {
sqlx::query!(
r#"
INSERT INTO projects (slug, name, description)
VALUES ($1, $2, $3)
INSERT INTO projects (slug, name)
VALUES ($1, $2)
"#,
project.slug,
project.name,
project.description
)
.execute(&self.sqlite.db)
.await?;
@@ -33,7 +32,7 @@ impl ProjectCache for SqliteProjectCache {
async fn find_by_slug(&self, slug: &str) -> Result<Option<Project>> {
let result = sqlx::query!(
r#"
SELECT slug, name, description
SELECT slug, name
FROM projects WHERE slug = $1;
"#,
slug
@@ -45,8 +44,6 @@
return Ok(None);
}

//let x = result.unwrap();

Ok(None)
}
}
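Note that find_by_slug still returns Ok(None) unconditionally, even after a row is found; only the existence check matters for slug validation right now. If the row ever needs to be returned, a hedged sketch of the mapping could look like this (this is not what the commit does, and the record field access assumes the anonymous record generated by sqlx::query!):

// Hypothetical completion inside the ProjectCache impl; the committed version
// keeps returning Ok(None).
async fn find_by_slug(&self, slug: &str) -> Result<Option<Project>> {
    let row = sqlx::query!(
        r#"
        SELECT slug, name
        FROM projects WHERE slug = $1;
        "#,
        slug
    )
    .fetch_optional(&self.sqlite.db)
    .await?;

    Ok(row.map(|r| Project {
        name: r.name,
        slug: r.slug,
    }))
}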
11 changes: 3 additions & 8 deletions src/driven/kafka/mod.rs
@@ -3,7 +3,6 @@ use kafka::{
client::KafkaClient,
producer::{Producer, Record},
};
use serde_json::json;

use crate::domain::management::events::{Event, EventBridge};

@@ -28,13 +27,9 @@ impl KafkaEventBridge {
}
#[async_trait::async_trait]
impl EventBridge for KafkaEventBridge {
async fn dispatch(&self, _event: Event) -> Result<()> {
let json = json!({
"test": "123"
});
let value = serde_json::to_vec(&json)?;

let record = Record::from_key_value(&self.topic, "test".as_bytes(), value);
async fn dispatch(&self, event: Event) -> Result<()> {
let data = serde_json::to_vec(&event)?;
let record = Record::from_value(&self.topic, data);

let mut producer = Producer::from_hosts(self.hosts.clone()).create()?;
producer.send(&record)?;
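With this change dispatch serializes the actual Event instead of a hard-coded test payload, and publishes it as a keyless record via Record::from_value. A short usage sketch against the EventBridge trait (the wiring below is illustrative; only the trait and event types come from this commit):

use std::sync::Arc;

use anyhow::Result;

use crate::domain::management::events::{Event, EventBridge, Namespace};

// Any EventBridge implementation, such as the KafkaEventBridge above, can sit
// behind the Arc; the caller never touches Kafka types directly.
async fn announce_namespace(bridge: Arc<dyn EventBridge>) -> Result<()> {
    let event = Event::NamespaceCreation(Namespace {
        name: "New Project".into(),
        slug: "sonic-vegas".into(),
    });
    // For the Kafka implementation this becomes the JSON body of a keyless
    // record on the bridge's configured topic.
    bridge.dispatch(event).await
}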
50 changes: 31 additions & 19 deletions src/drivers/event/mod.rs
@@ -1,8 +1,22 @@
use anyhow::Result;
use kafka::{client::{FetchOffset, GroupOffsetStorage}, consumer::Consumer};
use kafka::{
client::{FetchOffset, GroupOffsetStorage},
consumer::Consumer,
};
use std::{path::Path, sync::Arc};
use tracing::info;

use crate::{
domain::management::{events::Event, project::create_cache},
driven::cache::{project::SqliteProjectCache, SqliteCache},
};

pub async fn subscribe() -> Result<()> {
let sqlite_cache = Arc::new(SqliteCache::new(Path::new("dev.db")).await?);
sqlite_cache.migrate().await?;

let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache));

let topic = "events".to_string();
let hosts = &["localhost:19092".into()];

@@ -16,24 +30,22 @@ pub async fn subscribe() -> Result<()> {
info!("Event Driver started listening");

loop {
let result = consumer.poll();
if let Err(err) = result {
dbg!(&err);
return Err(err.into());
let mss = consumer.poll()?;
if mss.is_empty() {
continue;
}

for ms in mss.iter() {
for m in ms.messages() {
let event: Event = serde_json::from_slice(m.value)?;
match event {
Event::NamespaceCreation(namespace) => {
create_cache(project_cache.clone(), namespace).await?;
}
};
}
consumer.consume_messageset(ms)?;
}
//let mss = consumer.poll()?;
//dbg!(&mss);
//if mss.is_empty() {
// println!("No messages available right now.");
// return Ok(());
//}
//
//for ms in mss.iter() {
// for m in ms.messages() {
// dbg!(m);
// }
// let _ = consumer.consume_messageset(ms);
//}
//consumer.commit_consumed()?;
consumer.commit_consumed()?;
}
}
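Because the events are untagged, the consumer recognizes them purely by shape: any JSON object carrying name and slug fields deserializes as Event::NamespaceCreation, something to keep in mind once more variants are added. A small sketch of the decode step the loop above performs (the helper function is illustrative):

use crate::domain::management::events::Event;

// Mirrors the serde_json::from_slice call in the consumer loop; with
// #[serde(untagged)] the match is by field shape, not by variant name.
fn decode(raw: &[u8]) -> anyhow::Result<Event> {
    Ok(serde_json::from_slice(raw)?)
}

// decode(br#"{"name":"New Project","slug":"sonic-vegas"}"#) yields
// Event::NamespaceCreation(Namespace { name, slug }).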
7 changes: 2 additions & 5 deletions src/drivers/grpc/mod.rs
@@ -10,9 +10,7 @@ use crate::driven::kafka::KafkaEventBridge;

pub async fn server() -> Result<()> {
let sqlite_cache = Arc::new(SqliteCache::new(Path::new("dev.db")).await?);
sqlite_cache.migrate().await?;

let project_state = Arc::new(SqliteProjectCache::new(sqlite_cache));
let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache));

let event_bridge = Arc::new(KafkaEventBridge::new(
&["localhost:19092".into()],
@@ -28,10 +26,9 @@
let project = Project {
name: "test name".into(),
slug,
description: "test description".into(),
};

management::project::create(project_state, event_bridge, project).await?;
management::project::create(project_cache, event_bridge, project).await?;

Ok(())
}
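The migration call now lives in the event driver's subscribe() rather than in the gRPC server, so the consumer owns the cache schema as well as the cache writes. A hedged sketch of how the two drivers might be run side by side (this wiring is an assumption, not part of this commit; the module paths mirror the src/drivers layout):

use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // gRPC driver: validates commands and dispatches events to Kafka.
    // Event driver: runs the migration, consumes events, writes the cache.
    // Note: subscribe() polls the blocking kafka consumer inline, so a
    // dedicated thread or spawn_blocking may be preferable in practice.
    tokio::try_join!(drivers::grpc::server(), drivers::event::subscribe())?;
    Ok(())
}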
