Skip to content

Commit

Permalink
refactor: split Serde trait into separate ones, implement serde::Conv…
Browse files Browse the repository at this point in the history
…ert and serde::ProtoJson
  • Loading branch information
ar3s3ru committed Feb 23, 2024
1 parent cfbcd4c commit cd24e87
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 231 deletions.
106 changes: 40 additions & 66 deletions eventually-postgres/src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
//! This module contains the implementation of the [eventually::aggregate::Repository] trait,
//! to work specifically with PostgreSQL databases.
//!
//! Check out the [Repository] type for more information.
use std::marker::PhantomData;

use anyhow::anyhow;
use async_trait::async_trait;
use eventually::aggregate::Aggregate;
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{aggregate, version};
use eventually::{aggregate, serde, version};
use sqlx::{PgPool, Postgres, Row};

/// Implements the [eventually::aggregate::Repository] trait for
/// PostgreSQL databases.
#[derive(Debug, Clone)]
pub struct Repository<T, OutT, OutEvt, TSerde, EvtSerde>
pub struct Repository<T, Serde, EvtSerde>
where
T: Aggregate,
<T as Aggregate>::Id: ToString,
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serde<OutEvt>,
Serde: serde::Serde<T>,
EvtSerde: serde::Serde<T::Event>,
{
pool: PgPool,
aggregate_serde: TSerde,
aggregate_serde: Serde,
event_serde: EvtSerde,
t: PhantomData<T>,
out_t: PhantomData<OutT>,
out_evt: PhantomData<OutEvt>,
}

impl<T, OutT, OutEvt, TSerde, EvtSerde> Repository<T, OutT, OutEvt, TSerde, EvtSerde>
impl<T, Serde, EvtSerde> Repository<T, Serde, EvtSerde>
where
T: Aggregate,
<T as Aggregate>::Id: ToString,
OutT: From<T>,
OutEvt: From<T::Event>,
TSerde: Serde<OutT>,
EvtSerde: Serde<OutEvt>,
Serde: serde::Serde<T>,
EvtSerde: serde::Serde<T::Event>,
{
pub async fn new(
pool: PgPool,
aggregate_serde: TSerde,
aggregate_serde: Serde,
event_serde: EvtSerde,
) -> Result<Self, sqlx::migrate::MigrateError> {
// Make sure the latest migrations are used before using the Repository instance.
Expand All @@ -48,20 +48,16 @@ where
aggregate_serde,
event_serde,
t: PhantomData,
out_t: PhantomData,
out_evt: PhantomData,
})
}
}

impl<T, OutT, OutEvt, TSerde, EvtSerde> Repository<T, OutT, OutEvt, TSerde, EvtSerde>
impl<T, Serde, EvtSerde> Repository<T, Serde, EvtSerde>
where
T: Aggregate + Send + Sync,
<T as Aggregate>::Id: ToString,
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event>,
TSerde: Serde<OutT> + Send + Sync,
EvtSerde: Serde<OutEvt>,
Serde: serde::Serde<T> + Send + Sync,
EvtSerde: serde::Serde<T::Event> + Send + Sync,
{
async fn save_aggregate_state(
&self,
Expand All @@ -70,8 +66,11 @@ where
expected_version: Version,
root: &mut aggregate::Root<T>,
) -> Result<(), aggregate::repository::SaveError> {
let out_state = root.to_aggregate_type::<OutT>();
let bytes_state = self.aggregate_serde.serialize(out_state);
let out_state = root.to_aggregate_type::<T>();
let bytes_state = self
.aggregate_serde
.serialize(out_state)
.map_err(|err| anyhow!("failed to serialize aggregate root state: {}", err))?;

sqlx::query("CALL upsert_aggregate($1, $2, $3, $4, $5)")
.bind(aggregate_id)
Expand All @@ -84,16 +83,12 @@ where
.map_err(|err| match crate::check_for_conflict_error(&err) {
Some(err) => aggregate::repository::SaveError::Conflict(err),
None => match err.as_database_error().and_then(|err| err.code()) {
Some(code) if code == "40001" => {
aggregate::repository::SaveError::Conflict(version::ConflictError {
expected: expected_version,
actual: root.version(),
})
},
_ => aggregate::repository::SaveError::Internal(anyhow!(
"failed to save aggregate state: {}",
err
)),
Some(code) if code == "40001" => version::ConflictError {
expected: expected_version,
actual: root.version(),
}
.into(),
_ => anyhow!("failed to save aggregate state: {}", err).into(),
},
})?;

Expand All @@ -102,17 +97,12 @@ where
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Getter<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
impl<T, Serde, EvtSerde> aggregate::repository::Getter<T> for Repository<T, Serde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
T: Aggregate + Send + Sync,
<T as Aggregate>::Id: ToString,
<T as TryFrom<OutT>>::Error: std::error::Error + Send + Sync + 'static,
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Serde<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serde<OutEvt> + Send + Sync,
Serde: serde::Serde<T> + Send + Sync,
EvtSerde: serde::Serde<T::Event> + Send + Sync,
{
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, aggregate::repository::GetError> {
let aggregate_id = id.to_string();
Expand All @@ -128,10 +118,7 @@ where
.await
.map_err(|err| match err {
sqlx::Error::RowNotFound => aggregate::repository::GetError::NotFound,
_ => aggregate::repository::GetError::Internal(anyhow!(
"failed to fetch the aggregate state row: {}",
err
)),
_ => anyhow!("failed to fetch the aggregate state row: {}", err).into(),
})?;

let version: i32 = row
Expand All @@ -144,20 +131,12 @@ where

let aggregate: T = self
.aggregate_serde
.deserialize(bytes_state)
.deserialize(&bytes_state)
.map_err(|err| {
anyhow!(
"failed to deserialize the aggregate state from the database row: {}",
err
)
})
.and_then(|out_t| {
T::try_from(out_t).map_err(|err| {
anyhow!(
"failed to convert the aggregate state into its domain type: {}",
err
)
})
})?;

Ok(aggregate::Root::rehydrate_from_state(
Expand All @@ -168,17 +147,12 @@ where
}

#[async_trait]
impl<T, OutT, OutEvt, TSerde, EvtSerde> aggregate::repository::Saver<T>
for Repository<T, OutT, OutEvt, TSerde, EvtSerde>
impl<T, Serde, EvtSerde> aggregate::repository::Saver<T> for Repository<T, Serde, EvtSerde>
where
T: Aggregate + TryFrom<OutT> + Send + Sync,
T: Aggregate + Send + Sync,
<T as Aggregate>::Id: ToString,
<T as TryFrom<OutT>>::Error: std::error::Error + Send + Sync + 'static,
OutT: From<T> + Send + Sync,
OutEvt: From<T::Event> + Send + Sync,
TSerde: Serde<OutT> + Send + Sync,
<TSerde as Serde<OutT>>::Error: std::error::Error + Send + Sync + 'static,
EvtSerde: Serde<OutEvt> + Send + Sync,
Serde: serde::Serde<T> + Send + Sync,
EvtSerde: serde::Serde<T::Event> + Send + Sync,
{
async fn save(
&self,
Expand Down
80 changes: 29 additions & 51 deletions eventually-postgres/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,17 @@ use anyhow::anyhow;
use async_trait::async_trait;
use chrono::Utc;
use eventually::message::{Message, Metadata};
use eventually::serde::Serde;
use eventually::version::Version;
use eventually::{event, version};
use eventually::{event, serde, version};
use futures::future::ready;
use futures::{StreamExt, TryStreamExt};
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Postgres, Row, Transaction};

#[derive(Debug, thiserror::Error)]
pub enum StreamError {
#[error("failed to convert domain event from its serialization type: {0}")]
ConvertEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("failed to deserialize event from database: {0}")]
DeserializeEvent(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
DeserializeEvent(#[source] anyhow::Error),
#[error("failed to get column '{name}' from result row: {error}")]
ReadColumn {
name: &'static str,
Expand All @@ -29,22 +26,22 @@ pub enum StreamError {
Database(#[source] sqlx::Error),
}

pub(crate) async fn append_domain_event<Evt, OutEvt>(
pub(crate) async fn append_domain_event<Evt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serde<OutEvt>,
serde: &impl serde::Serializer<Evt>,
event_stream_id: &str,
event_version: i32,
new_event_stream_version: i32,
event: event::Envelope<Evt>,
) -> Result<(), sqlx::Error>
) -> anyhow::Result<()>
where
Evt: Message,
OutEvt: From<Evt>,
{
let event_type = event.message.name();
let out_event = OutEvt::from(event.message);
let serialized_event = serde.serialize(out_event);
let mut metadata = event.metadata;
let serialized_event = serde
.serialize(event.message)
.map_err(|err| anyhow!("failed to serialize event message: {}", err))?;

metadata.insert("Recorded-At".to_owned(), Utc::now().to_rfc3339());
metadata.insert(
Expand All @@ -66,16 +63,15 @@ where
Ok(())
}

pub(crate) async fn append_domain_events<Evt, OutEvt>(
pub(crate) async fn append_domain_events<Evt>(
tx: &mut Transaction<'_, Postgres>,
serde: &impl Serde<OutEvt>,
serde: &impl serde::Serializer<Evt>,
event_stream_id: &str,
new_version: i32,
events: Vec<event::Envelope<Evt>>,
) -> Result<(), sqlx::Error>
) -> anyhow::Result<()>
where
Evt: Message,
OutEvt: From<Evt>,
{
let current_event_stream_version = new_version - (events.len() as i32);

Expand All @@ -97,28 +93,23 @@ where
}

#[derive(Debug, Clone)]
pub struct Store<Id, Evt, OutEvt, S>
pub struct Store<Id, Evt, Serde>
where
Id: ToString + Clone,
Evt: TryFrom<OutEvt>,
OutEvt: From<Evt>,
S: Serde<OutEvt>,
Serde: serde::Serde<Evt>,
{
pool: PgPool,
serde: S,
serde: Serde,
id_type: PhantomData<Id>,
evt_type: PhantomData<Evt>,
out_evt_type: PhantomData<OutEvt>,
}

impl<Id, Evt, OutEvt, S> Store<Id, Evt, OutEvt, S>
impl<Id, Evt, Serde> Store<Id, Evt, Serde>
where
Id: ToString + Clone,
Evt: TryFrom<OutEvt>,
OutEvt: From<Evt>,
S: Serde<OutEvt>,
Serde: serde::Serde<Evt>,
{
pub async fn new(pool: PgPool, serde: S) -> Result<Self, sqlx::migrate::MigrateError> {
pub async fn new(pool: PgPool, serde: Serde) -> Result<Self, sqlx::migrate::MigrateError> {
// Make sure the latest migrations are used before using the Store instance.
crate::MIGRATIONS.run(&pool).await?;

Expand All @@ -127,7 +118,6 @@ where
serde,
id_type: PhantomData,
evt_type: PhantomData,
out_evt_type: PhantomData,
})
}
}
Expand All @@ -140,14 +130,11 @@ where
.map_err(|err| StreamError::ReadColumn { name, error: err })
}

impl<Id, Evt, OutEvt, S> Store<Id, Evt, OutEvt, S>
impl<Id, Evt, Serde> Store<Id, Evt, Serde>
where
Id: ToString + Clone + Send + Sync,
Evt: TryFrom<OutEvt> + Message + Send + Sync,
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
Evt: Message + Send + Sync,
Serde: serde::Serde<Evt> + Send + Sync,
{
fn event_row_to_persisted_event(
&self,
Expand All @@ -160,31 +147,25 @@ where

let deserialized_event = self
.serde
.deserialize(event_column)
.map_err(|err| StreamError::DeserializeEvent(Box::new(err)))?;

let converted_event = Evt::try_from(deserialized_event)
.map_err(|err| StreamError::ConvertEvent(Box::new(err)))?;
.deserialize(&event_column)
.map_err(StreamError::DeserializeEvent)?;

Ok(event::Persisted {
stream_id,
version: version_column as Version,
event: event::Envelope {
message: converted_event,
message: deserialized_event,
metadata: metadata_column.0,
},
})
}
}

impl<Id, Evt, OutEvt, S> event::store::Streamer<Id, Evt> for Store<Id, Evt, OutEvt, S>
impl<Id, Evt, Serde> event::store::Streamer<Id, Evt> for Store<Id, Evt, Serde>
where
Id: ToString + Clone + Send + Sync,
Evt: TryFrom<OutEvt> + Message + std::fmt::Debug + Send + Sync,
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
Evt: Message + Send + Sync,
Serde: serde::Serde<Evt> + Send + Sync,
{
type Error = StreamError;

Expand Down Expand Up @@ -214,14 +195,11 @@ where
}

#[async_trait]
impl<Id, Evt, OutEvt, S> event::store::Appender<Id, Evt> for Store<Id, Evt, OutEvt, S>
impl<Id, Evt, Serde> event::store::Appender<Id, Evt> for Store<Id, Evt, Serde>
where
Id: ToString + Clone + Send + Sync,
Evt: TryFrom<OutEvt> + Message + std::fmt::Debug + Send + Sync,
<Evt as TryFrom<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
OutEvt: From<Evt> + Send + Sync,
S: Serde<OutEvt> + Send + Sync,
<S as Serde<OutEvt>>::Error: std::error::Error + Send + Sync + 'static,
Evt: Message + Send + Sync,
Serde: serde::Serde<Evt> + Send + Sync,
{
async fn append(
&self,
Expand Down
Loading

0 comments on commit cd24e87

Please sign in to comment.