Skip to content

Commit

Permalink
feat: get rid of PublishedMessage derive
Browse files Browse the repository at this point in the history
  • Loading branch information
hseeberger committed Oct 12, 2023
1 parent 840c38d commit 97e414f
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 141 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[workspace]
members = [
"pub-sub-client",
"pub-sub-client-derive",
"pub-sub-client-tests",
]
resolver = "2"
Expand Down
20 changes: 0 additions & 20 deletions pub-sub-client-derive/Cargo.toml

This file was deleted.

27 changes: 0 additions & 27 deletions pub-sub-client-derive/src/lib.rs

This file was deleted.

2 changes: 1 addition & 1 deletion pub-sub-client-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ documentation = { workspace = true }
publish = false

[dev-dependencies]
pub-sub-client = { version = "=0.11.1-alpha.0", path = "../pub-sub-client", features = [ "derive" ] }
pub-sub-client = { version = "=0.11.1-alpha.0", path = "../pub-sub-client" }
anyhow = { workspace = true }
base64 = { workspace = true }
reqwest = { workspace = true, features = [ "json" ] }
Expand Down
4 changes: 2 additions & 2 deletions pub-sub-client-tests/examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use pub_sub_client::{Error, PubSubClient, PublishedMessage};
use pub_sub_client::{Error, PubSubClient};
use serde::{Deserialize, Serialize};
use std::{env, error::Error as _, time::Duration};

const TOPIC_ID: &str = "test";
const SUBSCRIPTION_ID: &str = "test";

#[derive(Debug, Deserialize, Serialize, PublishedMessage)]
#[derive(Debug, Serialize, Deserialize)]
struct Message {
text: String,
}
Expand Down
5 changes: 2 additions & 3 deletions pub-sub-client-tests/examples/transform.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use anyhow::anyhow;
use base64::{engine::general_purpose::STANDARD, Engine};
use pub_sub_client::{
Error, PubSubClient, PublishedMessage, PulledMessage, RawPublishedMessage,
RawPulledMessageEnvelope,
Error, PubSubClient, PulledMessage, RawPublishedMessage, RawPulledMessageEnvelope,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
Expand All @@ -11,7 +10,7 @@ use std::{collections::HashMap, env, error::Error as _, time::Duration};
const TOPIC_ID: &str = "test";
const SUBSCRIPTION_ID: &str = "test";

#[derive(Debug, Deserialize, Serialize, PublishedMessage)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
#[allow(dead_code)]
enum Message {
Expand Down
6 changes: 3 additions & 3 deletions pub-sub-client-tests/tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use base64::{engine::general_purpose::STANDARD, Engine};
use pub_sub_client::{PubSubClient, PublishedMessage, RawPublishedMessage};
use pub_sub_client::{PubSubClient, RawPublishedMessage};
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand All @@ -12,7 +12,7 @@ const TOPIC_ID: &str = "test";
const SUBSCRIPTION_ID: &str = "test";
const TEXT: &str = "test-text";

#[derive(Debug, Deserialize, Serialize, PublishedMessage, PartialEq)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
enum Message {
Foo { text: String },
Bar { text: String },
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn test() {
HashMap::from([("version".to_string(), "v1".to_string())]),
)];
let result = pub_sub_client
.publish(TOPIC_ID, messages, None, Some(Duration::from_secs(10)))
.publish::<Message, _>(TOPIC_ID, messages, None, Some(Duration::from_secs(10)))
.await;
assert!(result.is_ok());
let result = result.unwrap();
Expand Down
22 changes: 9 additions & 13 deletions pub-sub-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,16 @@ documentation = { workspace = true }
publish = { workspace = true }
exclude = [ "tests" ]

[features]
derive = [ "pub-sub-client-derive" ]

[dependencies]
pub-sub-client-derive = { version = "=0.11.1-alpha.0", optional = true, path = "../pub-sub-client-derive" }
base64 = { workspace = true }
goauth = { workspace = true }
reqwest = { workspace = true, features = [ "json" ] }
serde = { workspace = true, features = [ "derive" ] }
serde_json = { workspace = true }
smpl_jwt = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true, features = [ "serde-well-known" ] }
tracing = { workspace = true }
base64 = { workspace = true }
goauth = { workspace = true }
reqwest = { workspace = true, features = [ "json" ] }
serde = { workspace = true, features = [ "derive" ] }
serde_json = { workspace = true }
smpl_jwt = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true, features = [ "serde-well-known" ] }
tracing = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
38 changes: 18 additions & 20 deletions pub-sub-client/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,32 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("Initialization error: {reason}")]
#[error("initialization error: {reason}")]
Initialization {
reason: String,
source: Box<dyn StdError + Send + Sync + 'static>,
},

#[error("Getting authentication token failed")]
TokenFetch { source: Box<goauth::GoErr> },
#[error("getting authentication token failed")]
TokenFetch(#[from] Box<goauth::GoErr>),

#[error("HTTP communication with Pub/Sub service failed")]
HttpServiceCommunication { source: reqwest::Error },
#[error("Unexpected HTTP status code `{0}` from Pub/Sub service: {1}")]
HttpServiceCommunication(#[source] reqwest::Error),
#[error("unexpected HTTP status code `{0}` from Pub/Sub service: {1}")]
UnexpectedHttpStatusCode(reqwest::StatusCode, String),
#[error("Unexpected HTTP response from Pub/Sub service")]
UnexpectedHttpResponse { source: reqwest::Error },
#[error("unexpected HTTP response from Pub/Sub service")]
UnexpectedHttpResponse(#[source] reqwest::Error),

#[error("Decoding data of received message as Base64 failed")]
NoBase64 { source: base64::DecodeError },
#[error("PubSubMessage contains no data")]
#[error("decoding data of received message as Base64 failed")]
DecodeBase64(#[source] base64::DecodeError),
#[error("message contains no data")]
NoData,
#[error("Deserializing data of received message failed")]
Deserialize { source: serde_json::Error },
#[error("Serializing of message to be published failed")]
Serialize { source: serde_json::Error },
#[error("Failed to transform JSON value")]
Transform {
source: Box<dyn StdError + Send + Sync + 'static>,
},
#[error("deserializing data of received message failed")]
Deserialize(#[source] serde_json::Error),
#[error("serializing of message to be published failed")]
Serialize(#[source] serde_json::Error),
#[error("failed to transform JSON value")]
Transform(#[source] Box<dyn StdError + Send + Sync + 'static>),
}

impl Error {
Expand All @@ -42,11 +40,11 @@ impl Error {
response
.text()
.await
.map_err(|e| format!("Failed to get response body as text: {e}"))
.map_err(|e| format!("failed to get response body as text: {e}"))
.and_then(|text| {
serde_json::from_str::<Value>(&text)
.map_err(|e| format!("failed to parse error response: {e}"))
.map(|v| v["error"]["message"].to_string())
.map_err(|e| format!("Failed to parse error response: {e}"))
})
.unwrap_or_else(identity),
)
Expand Down
53 changes: 27 additions & 26 deletions pub-sub-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ pub use error::*;
pub use publisher::*;
pub use subscriber::*;

#[cfg(feature = "derive")]
pub use pub_sub_client_derive::*;

use goauth::{auth::JwtClaims, credentials::Credentials, fetcher::TokenFetcher, scopes::Scope};
use reqwest::Response;
use serde::Serialize;
use smpl_jwt::Jwt;
use std::{env, time::Duration};
use std::{
env,
fmt::{self, Debug, Formatter},
time::Duration,
};

const BASE_URL_ENV_VAR: &str = "PUB_SUB_BASE_URL";
const DEFAULT_BASE_URL: &str = "https://pubsub.googleapis.com";
Expand All @@ -24,20 +25,15 @@ pub struct PubSubClient {
reqwest_client: reqwest::Client,
}

impl std::fmt::Debug for PubSubClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PubSubClient")
.field("project_url", &self.project_url)
.finish()
}
}

impl PubSubClient {
pub fn new<T: AsRef<str>>(key_path: T, refresh_buffer: Duration) -> Result<Self, Error> {
pub fn new<T>(key_path: T, refresh_buffer: Duration) -> Result<Self, Error>
where
T: AsRef<str>,
{
let key_path = key_path.as_ref();
let credentials =
Credentials::from_file(key_path).map_err(|source| Error::Initialization {
reason: format!("Missing or malformed service account key at `{key_path}`"),
reason: format!("missing or malformed service account key at `{key_path}`"),
source: source.into(),
})?;

Expand All @@ -56,7 +52,7 @@ impl PubSubClient {
credentials
.rsa_key()
.map_err(|source| Error::Initialization {
reason: format!("Malformed private key in service account key at `{key_path}`"),
reason: format!("malformed private key in service account key at `{key_path}`"),
source: source.into(),
})?,
None,
Expand All @@ -65,7 +61,7 @@ impl PubSubClient {
let refresh_buffer = refresh_buffer
.try_into()
.map_err(|source| Error::Initialization {
reason: format!("Invalid refresh_buffer `{refresh_buffer:?}`"),
reason: format!("invalid refresh_buffer `{refresh_buffer:?}`"),
source: Box::new(source),
})?;

Expand All @@ -76,19 +72,16 @@ impl PubSubClient {
})
}

async fn send_request<R: Serialize>(
async fn send_request<R>(
&self,
url: &str,
request: &R,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let token = self
.token_fetcher
.fetch_token()
.await
.map_err(|source| Error::TokenFetch {
source: Box::new(source),
})?;
) -> Result<Response, Error>
where
R: Serialize,
{
let token = self.token_fetcher.fetch_token().await.map_err(Box::new)?;

let request = self
.reqwest_client
Expand All @@ -100,7 +93,15 @@ impl PubSubClient {
request
.send()
.await
.map_err(|source| Error::HttpServiceCommunication { source })
.map_err(Error::HttpServiceCommunication)
}
}

impl Debug for PubSubClient {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("PubSubClient")
.field("project_url", &self.project_url)
.finish()
}
}

Expand Down
23 changes: 15 additions & 8 deletions pub-sub-client/src/publisher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Debug, time::Duration};
use tracing::debug;

pub trait PublishedMessage: Serialize {}

pub struct PublishedMessageEnvelope<M: PublishedMessage> {
pub struct PublishedMessageEnvelope<M>
where
M: Serialize,
{
message: M,
attributes: Option<HashMap<String, String>>,
}

impl<M: PublishedMessage> From<M> for PublishedMessageEnvelope<M> {
impl<M> From<M> for PublishedMessageEnvelope<M>
where
M: Serialize,
{
fn from(message: M) -> Self {
Self {
message,
Expand All @@ -20,7 +24,10 @@ impl<M: PublishedMessage> From<M> for PublishedMessageEnvelope<M> {
}
}

impl<M: PublishedMessage> From<(M, HashMap<String, String>)> for PublishedMessageEnvelope<M> {
impl<M> From<(M, HashMap<String, String>)> for PublishedMessageEnvelope<M>
where
M: Serialize,
{
fn from((message, attributes): (M, HashMap<String, String>)) -> Self {
Self {
message,
Expand Down Expand Up @@ -84,7 +91,7 @@ impl PubSubClient {
timeout: Option<Duration>,
) -> Result<Vec<String>, Error>
where
M: PublishedMessage,
M: Serialize,
E: Into<PublishedMessageEnvelope<M>> + Debug,
{
let bytes = envelopes
Expand All @@ -99,7 +106,7 @@ impl PubSubClient {
.collect::<Result<Vec<_>, _>>();

let messages = bytes
.map_err(|source| Error::Serialize { source })?
.map_err(Error::Serialize)?
.into_iter()
.map(|(bytes, attributes)| RawPublishedMessage {
data: Some(STANDARD.encode(bytes)),
Expand Down Expand Up @@ -130,7 +137,7 @@ impl PubSubClient {
let message_ids = response
.json::<PublishResponse>()
.await
.map_err(|source| Error::UnexpectedHttpResponse { source })?
.map_err(Error::UnexpectedHttpResponse)?
.message_ids;
debug!(
message = "Request was successful",
Expand Down
Loading

0 comments on commit 97e414f

Please sign in to comment.