Skip to content

Commit

Permalink
refactor: simplify Repository and Serde traits, introduce AnyReposito…
Browse files Browse the repository at this point in the history
…ry, remove ProtobufSerde (#270)
  • Loading branch information
ar3s3ru authored Dec 11, 2023
1 parent ba65aed commit 406edf8
Show file tree
Hide file tree
Showing 24 changed files with 519 additions and 592 deletions.
37 changes: 11 additions & 26 deletions eventually-postgres/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;

use async_trait::async_trait;
use eventually::aggregate::Aggregate;
use eventually::serde::{Deserializer, Serde, Serializer};
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{aggregate, version};
use sqlx::{PgPool, Postgres, Row};
Expand All @@ -15,7 +15,7 @@ where
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
pool: PgPool,
aggregate_serde: TSerde,
Expand All @@ -32,7 +32,7 @@ where
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
pub async fn new(
pool: PgPool,
Expand Down Expand Up @@ -100,7 +100,7 @@ where
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event>,
TSerde: Serde<OutT> + Send + Sync,
EvtSerde: Serializer<OutEvt>,
EvtSerde: Serde<OutEvt>,
{
async fn save_aggregate_state(
&self,
Expand Down Expand Up @@ -138,7 +138,7 @@ where
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Getter<T>
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::Repository<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
Expand All @@ -147,15 +147,16 @@ where
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serializer<OutEvt> + Send + Sync,
<TSerde as Serde<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serde<OutEvt> + Send + Sync,
{
type Error = GetError;
type GetError = GetError;
type SaveError = SaveError;

async fn get(
&self,
id: &T::Id,
) -> Result<aggregate::Root<T>, aggregate::repository::GetError<Self::Error>> {
) -> Result<aggregate::Root<T>, aggregate::repository::GetError<Self::GetError>> {
let aggregate_id = id.to_string();

let row = sqlx::query(
Expand Down Expand Up @@ -188,24 +189,8 @@ where
aggregate,
))
}
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Saver<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
<T as Aggregate>::Id: ToString,
<T as TryFrom<OutT>>::Error: std::error::Error + Send + Sync + 'static,
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Deserializer<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serializer<OutEvt> + Send + Sync,
{
type Error = SaveError;

async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::Error> {
async fn save(&self, root: &mut aggregate::Root<T>) -> Result<(), Self::SaveError> {
let events_to_commit = root.take_uncommitted_events();

if events_to_commit.is_empty() {
Expand Down
13 changes: 6 additions & 7 deletions eventually-postgres/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::string::ToString;
use async_trait::async_trait;
use chrono::Utc;
use eventually::message::{Message, Metadata};
use eventually::serde::{Deserializer, Serde, Serializer};
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{event, version};
use futures::future::ready;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use regex::Regex;

Check warning on line 12 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `regex::Regex`

warning: unused import: `regex::Regex` --> eventually-postgres/src/event.rs:12:5 | 12 | use regex::Regex; | ^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use sqlx::postgres::{PgDatabaseError, PgRow};

Check warning on line 13 in eventually-postgres/src/event.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `PgDatabaseError`

warning: unused import: `PgDatabaseError` --> eventually-postgres/src/event.rs:13:22 | 13 | use sqlx::postgres::{PgDatabaseError, PgRow}; | ^^^^^^^^^^^^^^^
use sqlx::{PgPool, Postgres, Row, Transaction};
Expand Down Expand Up @@ -60,7 +59,7 @@ impl From<AppendError> for Option<version::ConflictError> {

pub(crate) async fn append_domain_event<Evt, OutEvt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serializer<OutEvt>,
serde: &impl Serde<OutEvt>,
event_stream_id: &str,
event_version: i32,
new_event_stream_version: i32,
Expand Down Expand Up @@ -97,7 +96,7 @@ where

pub(crate) async fn append_domain_events<Evt, OutEvt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serializer<OutEvt>,
serde: &impl Serde<OutEvt>,
event_stream_id: &str,
new_version: i32,
events: Vec<event::Envelope<Evt>>,
Expand Down Expand Up @@ -176,7 +175,7 @@ where
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
fn event_row_to_persisted_event(
&self,
Expand Down Expand Up @@ -213,7 +212,7 @@ where
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
type Error = StreamError;

Expand Down Expand Up @@ -250,7 +249,7 @@ where
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Deserializer<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
{
type Error = AppendError;

Expand Down
13 changes: 7 additions & 6 deletions eventually-postgres/tests/aggregate_repository.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use eventually::aggregate::repository::{GetError, Getter, Saver};
use eventually::serde::json::Json;
use eventually::aggregate::repository::GetError;
use eventually::aggregate::Repository;
use eventually::serde::json::JsonSerde;
use eventually::version;
use eventually_postgres::aggregate;
use futures::TryFutureExt;
Expand All @@ -15,8 +16,8 @@ async fn it_works() {

let aggregate_repository = aggregate::Repository::new(
pool,
Json::<setup::TestAggregate>::default(),
Json::<setup::TestDomainEvent>::default(),
JsonSerde::<setup::TestAggregate>::default(),
JsonSerde::<setup::TestDomainEvent>::default(),
)
.await
.unwrap();
Expand Down Expand Up @@ -64,8 +65,8 @@ async fn it_detects_data_races_and_returns_conflict_error() {

let aggregate_repository = aggregate::Repository::new(
pool,
Json::<setup::TestAggregate>::default(),
Json::<setup::TestDomainEvent>::default(),
JsonSerde::<setup::TestAggregate>::default(),
JsonSerde::<setup::TestDomainEvent>::default(),
)
.await
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions eventually-postgres/tests/event_store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::time::{SystemTime, UNIX_EPOCH};

use eventually::event::{Appender, Persisted, StreamVersionExpected, Streamer, VersionSelect};
use eventually::serde::json::Json;
use eventually::serde::json::JsonSerde;
use eventually::version;
use eventually::version::Version;
use eventually_postgres::event;
use futures::{TryFutureExt, TryStreamExt};
use futures::TryStreamExt;
use rand::Rng;

mod setup;
Expand All @@ -16,7 +16,7 @@ async fn append_with_no_version_check_works() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down Expand Up @@ -72,7 +72,7 @@ async fn it_works_with_version_check_for_conflict() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down Expand Up @@ -148,7 +148,7 @@ async fn it_handles_concurrent_writes_to_the_same_stream() {
.await
.expect("connection to the database should work");

let event_store = event::Store::new(pool, Json::<setup::TestDomainEvent>::default())
let event_store = event::Store::new(pool, JsonSerde::<setup::TestDomainEvent>::default())
.await
.unwrap();

Expand Down
6 changes: 2 additions & 4 deletions eventually/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ default = []
tracing = ["dep:tracing"]
serde-prost = ["dep:prost"]
serde-json = ["dep:serde_json"]
serde-protobuf = ["dep:protobuf", "dep:protobuf-json-mapping"]
full = ["serde-prost", "serde-json", "serde-protobuf", "tracing"]
full = ["serde-prost", "serde-json", "tracing"]

[dependencies]
async-trait = "0.1.74"
Expand All @@ -31,9 +30,8 @@ thiserror = "1.0.50"
prost = { version = "0.12.1", optional = true }
serde_json = { version = "1.0.108", optional = true }
serde = { version = "1.0.192", features = ["derive"] }
protobuf = { version = "3.3.0", optional = true }
protobuf-json-mapping = { version = "3.3.0", optional = true }
tracing = { version = "0.1.40", features = ["async-await"], optional = true }
anyhow = "1.0.75"

[dev-dependencies]
# NOTE: this is only used for test components and assertions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ pub(crate) mod test_user_domain {
mod test {
use std::error::Error;

use crate::aggregate::repository::{Getter, Saver};
use crate::aggregate::test_user_domain::{User, UserEvent};
use crate::aggregate::Repository;
use crate::event::store::EventStoreExt;
use crate::{aggregate, event, version};

Expand Down
Loading

0 comments on commit 406edf8

Please sign in to comment.