Skip to content

Commit

Permalink
fix: run clippy and fix all issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ar3s3ru committed Feb 23, 2024
1 parent 69c9806 commit 4e4480c
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 55 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"rust-analyzer.cargo.features": "all"
"rust-analyzer.cargo.features": "all",
"rust-analyzer.check.command": "clippy"
}
13 changes: 13 additions & 0 deletions eventually-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
[package]
name = "eventually-macros"
description = "Macros for eventually crate"
version = "0.1.0"
edition = "2021"
authors = ["Danilo Cianfrone <[email protected]>"]
license = "MIT"
readme = "../README.md"
repository = "https://github.com/get-eventually/eventually-rs"

categories = [
"rust-patterns",
"web-programming",
"asynchronous",
"data-structures",
]
keywords = ["architecture", "ddd", "event-sourcing", "cqrs", "es"]

[lib]
proc-macro = true
Expand Down
12 changes: 8 additions & 4 deletions eventually-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, AttributeArgs, Fields, ItemStruct, Meta, NestedMeta, Path};

/// Implements a newtype to use the [eventually::aggregate::Root] instance with
/// user-defined [eventually::aggregate::Aggregate] types.
/// Implements a newtype to use the [`eventually::aggregate::Root`] instance with
/// user-defined [`eventually::aggregate::Aggregate`] types.
///
/// # Context
///
/// The eventually API uses [aggregate::Root][eventually::aggregate::Root]
/// The eventually API uses [`aggregate::Root`][eventually::aggregate::Root]
/// to manage the versioning and list of events to commit for an `Aggregate` instance.
/// Domain commands are to be implemented on the `aggregate::Root<T>` instance, as it gives
/// access to use `Root<T>.record_that` or `Root<T>.record_new` to record Domain Events.
Expand All @@ -24,7 +24,11 @@ use syn::{parse_macro_input, AttributeArgs, Fields, ItemStruct, Meta, NestedMeta
///
/// This attribute macro makes the implementation of a newtype easy, as it Implements
/// conversion traits from and to `aggregate::Root<T>` and implements automatic deref
/// through [std::ops::Deref] and [std::ops::DerefMut].
/// through [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// # Panics
///
/// This method will panic if the Aggregate Root type is not provided as a macro parameter.
#[proc_macro_attribute]
pub fn aggregate_root(args: TokenStream, item: TokenStream) -> TokenStream {
let args = parse_macro_input!(args as AttributeArgs);
Expand Down
22 changes: 17 additions & 5 deletions eventually-postgres/src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! This module contains the implementation of the [eventually::aggregate::Repository] trait,
//! to work specifically with PostgreSQL databases.
//! This module contains the implementation of the [`eventually::aggregate::Repository`] trait,
//! to work specifically with `PostgreSQL` databases.
//!
//! Check out the [Repository] type for more information.
Expand All @@ -12,8 +12,8 @@ use eventually::version::Version;
use eventually::{aggregate, serde, version};
use sqlx::{PgPool, Postgres, Row};

/// Implements the [eventually::aggregate::Repository] trait for
/// PostgreSQL databases.
/// Implements the [`eventually::aggregate::Repository`] trait for
/// `PostgreSQL` databases.
#[derive(Debug, Clone)]
pub struct Repository<T, Serde, EvtSerde>
where
Expand All @@ -35,6 +35,12 @@ where
Serde: serde::Serde<T>,
EvtSerde: serde::Serde<T::Event>,
{
/// Runs the latest migrations necessary for the implementation to work,
/// then returns a new [`Repository`] instance.
///
/// # Errors
///
/// An error is returned if the migrations fail to run.
pub async fn new(
pool: PgPool,
aggregate_serde: Serde,
Expand Down Expand Up @@ -72,6 +78,7 @@ where
.serialize(out_state)
.map_err(|err| anyhow!("failed to serialize aggregate root state: {}", err))?;

#[allow(clippy::cast_possible_truncation)]
sqlx::query("CALL upsert_aggregate($1, $2, $3, $4, $5)")
.bind(aggregate_id)
.bind(T::type_name())
Expand All @@ -82,7 +89,10 @@ where
.await
.map_err(|err| match crate::check_for_conflict_error(&err) {
Some(err) => aggregate::repository::SaveError::Conflict(err),
None => match err.as_database_error().and_then(|err| err.code()) {
None => match err
.as_database_error()
.and_then(sqlx::error::DatabaseError::code)
{
Some(code) if code == "40001" => version::ConflictError {
expected: expected_version,
actual: root.version(),
Expand Down Expand Up @@ -139,6 +149,7 @@ where
)
})?;

#[allow(clippy::cast_sign_loss)]
Ok(aggregate::Root::rehydrate_from_state(
version as Version,
aggregate,
Expand Down Expand Up @@ -181,6 +192,7 @@ where
self.save_aggregate_state(&mut tx, &aggregate_id, expected_root_version, root)
.await?;

#[allow(clippy::cast_possible_truncation)]
crate::event::append_domain_events(
&mut tx,
&self.event_serde,
Expand Down
32 changes: 24 additions & 8 deletions eventually-postgres/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ pub(crate) async fn append_domain_events<Evt>(
where
Evt: Message,
{
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let current_event_stream_version = new_version - (events.len() as i32);

for (i, event) in events.into_iter().enumerate() {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let event_version = current_event_stream_version + (i as i32) + 1;

append_domain_event(
Expand Down Expand Up @@ -109,6 +111,12 @@ where
Id: ToString + Clone,
Serde: serde::Serde<Evt>,
{
/// Runs the latest migrations necessary for the implementation to work,
/// then returns a new [`Store`] instance.
///
/// # Errors
///
/// An error is returned if the migrations fail to run.
pub async fn new(pool: PgPool, serde: Serde) -> Result<Self, sqlx::migrate::MigrateError> {
// Make sure the latest migrations are used before using the Store instance.
crate::MIGRATIONS.run(&pool).await?;
Expand Down Expand Up @@ -139,17 +147,18 @@ where
fn event_row_to_persisted_event(
&self,
stream_id: Id,
row: PgRow,
row: &PgRow,
) -> Result<event::Persisted<Id, Evt>, StreamError> {
let version_column: i32 = try_get_column(&row, "version")?;
let event_column: Vec<u8> = try_get_column(&row, "event")?;
let metadata_column: sqlx::types::Json<Metadata> = try_get_column(&row, "metadata")?;
let version_column: i32 = try_get_column(row, "version")?;
let event_column: Vec<u8> = try_get_column(row, "event")?;
let metadata_column: sqlx::types::Json<Metadata> = try_get_column(row, "metadata")?;

let deserialized_event = self
.serde
.deserialize(&event_column)
.map_err(StreamError::DeserializeEvent)?;

#[allow(clippy::cast_sign_loss)]
Ok(event::Persisted {
stream_id,
version: version_column as Version,
Expand All @@ -170,16 +179,17 @@ where
type Error = StreamError;

fn stream(&self, id: &Id, select: event::VersionSelect) -> event::Stream<Id, Evt, Self::Error> {
#[allow(clippy::cast_possible_truncation)]
let from_version: i32 = match select {
event::VersionSelect::All => 0,
event::VersionSelect::From(v) => v as i32,
};

let query = sqlx::query(
r#"SELECT version, event, metadata
r"SELECT version, event, metadata
FROM events
WHERE event_stream_id = $1 AND version >= $2
ORDER BY version"#,
ORDER BY version",
);

let id = id.clone();
Expand All @@ -189,7 +199,7 @@ where
.bind(from_version)
.fetch(&self.pool)
.map_err(StreamError::Database)
.and_then(move |row| ready(self.event_row_to_persisted_event(id.clone(), row)))
.and_then(move |row| ready(self.event_row_to_persisted_event(id.clone(), &row)))
.boxed()
}
}
Expand Down Expand Up @@ -222,6 +232,7 @@ where

let new_version: i32 = match version_check {
version::Check::Any => {
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let events_len = events.len() as i32;

sqlx::query("SELECT * FROM upsert_event_stream_with_no_version_check($1, $2)")
Expand All @@ -235,6 +246,7 @@ where
version::Check::MustBe(v) => {
let new_version = v + (events.len() as Version);

#[allow(clippy::cast_possible_truncation)]
sqlx::query("CALL upsert_event_stream($1, $2, $3)")
.bind(&string_id)
.bind(v as i32)
Expand All @@ -243,7 +255,10 @@ where
.await
.map_err(|err| match crate::check_for_conflict_error(&err) {
Some(err) => event::store::AppendError::Conflict(err),
None => match err.as_database_error().and_then(|err| err.code()) {
None => match err
.as_database_error()
.and_then(sqlx::error::DatabaseError::code)
{
Some(code) if code == "40001" => {
event::store::AppendError::Conflict(version::ConflictError {
expected: v,
Expand All @@ -268,6 +283,7 @@ where
.await
.map_err(|err| anyhow!("failed to commit transaction: {}", err))?;

#[allow(clippy::cast_sign_loss)]
Ok(new_version as Version)
}
}
13 changes: 8 additions & 5 deletions eventually-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! `eventually-postgres` contains different implementations of traits
//! from the [eventually] crate that are specific for PostgreSQL databases.
//! from the [eventually] crate that are specific for `PostgreSQL` databases.
//!
//! Check out the [aggregate::Repository] and [event::Store] implementations
//! Check out the [`aggregate::Repository`] and [`event::Store`] implementations
//! to know more.
#![deny(unsafe_code, unused_qualifications, trivial_casts)]
Expand All @@ -11,15 +11,15 @@
pub mod aggregate;
pub mod event;

Check warning on line 12 in eventually-postgres/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test (stable)

missing documentation for a module

Check warning on line 12 in eventually-postgres/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

missing documentation for a module

pub static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");
pub(crate) static MIGRATIONS: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");

use eventually::version::{ConflictError, Version};
use lazy_static::lazy_static;
use regex::Regex;

lazy_static! {
static ref CONFLICT_ERROR_REGEX: Regex =
Regex::new(r#"version check failed, expected: (?P<expected>\d), got: (?P<got>\d)"#)
Regex::new(r"version check failed, expected: (?P<expected>\d), got: (?P<got>\d)")
.expect("regex compiles successfully");
}

Expand All @@ -32,7 +32,10 @@ pub(crate) fn check_for_conflict_error(err: &sqlx::Error) -> Option<ConflictErro
.parse::<i32>()
.expect("field should be a valid integer");

v as Version
#[allow(clippy::cast_sign_loss)]
{
v as Version
}
}

if let sqlx::Error::Database(ref pg_err) = err {
Expand Down
6 changes: 2 additions & 4 deletions eventually/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ where
}
}

/// List of possible errors that can be returned by [Root::rehydrate_async].
/// List of possible errors that can be returned by [`Root::rehydrate_async`].
#[derive(Debug, thiserror::Error)]
pub enum RehydrateError<T, I> {
/// Error returned during rehydration when the [Aggregate Root][Root]
Expand Down Expand Up @@ -422,9 +422,7 @@ pub(crate) mod test_user_domain {
return Err(UserError::EmptyPassword);
}

Ok(Self::record_new(
UserEvent::WasCreated { email, password }.into(),
)?)
Self::record_new(UserEvent::WasCreated { email, password }.into())
}

pub(crate) fn change_password(&mut self, password: String) -> Result<(), UserError> {
Expand Down
6 changes: 3 additions & 3 deletions eventually/src/aggregate/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! Aggregate Roots from a data store.
//!
//! If you are looking for the Event-sourced implementation of an Aggregate Repository,
//! take a look at [EventSourced].
//! take a look at [`EventSourced`].
use std::fmt::Debug;
use std::marker::PhantomData;
Expand All @@ -13,7 +13,7 @@ use futures::TryStreamExt;
use crate::aggregate::Aggregate;
use crate::{aggregate, event, version};

/// All possible errors returned by [Getter::get].
/// All possible errors returned by [`Getter::get`].
#[derive(Debug, thiserror::Error)]
pub enum GetError {
/// Error returned when the [Aggregate Root][aggregate::Root] could not be found in the data store.
Expand All @@ -36,7 +36,7 @@ where
async fn get(&self, id: &T::Id) -> Result<aggregate::Root<T>, GetError>;
}

/// All possible errors returned by [Saver::save].
/// All possible errors returned by [`Saver::save`].
#[derive(Debug, thiserror::Error)]
pub enum SaveError {
/// Error returned when [Saver::save] encounters a conflict error while saving the new Aggregate Root.
Expand Down
9 changes: 8 additions & 1 deletion eventually/src/aggregate/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ where
///
/// Use this branch when testing actions/mutations that modify the state
/// of an [Aggregate Root][Root] that already exists, by specifying its
/// current state using [Scenario::given].
/// current state using [`Scenario::given`].
///
/// # Panics
///
/// Please note: as this method expects that an [Aggregate Root][Root] instance
/// is available when executing the domain method, it will panic if a `Root` instance
/// could not be obtained by rehydrating the [`Aggregate`] state through the events
/// provided in [`Scenario::given`].
#[must_use]
pub fn when<R, F, Err>(self, f: F) -> ScenarioWhen<T, R, impl Fn() -> Result<R, Err>, Err>
where
Expand Down
Loading

0 comments on commit 4e4480c

Please sign in to comment.