From 8bc7c4da66ef013ce4d3d39f4b337f3ba8cb5928 Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Thu, 12 Oct 2023 14:57:01 +0200 Subject: [PATCH] feat: get rid of PublishedMessage derive (#18) * feat: get rid of PublishedMessage derive * fix * fixes --- .blackbox/blackbox-files.txt | 2 +- .github/workflows/ci.yaml | 2 +- .gitignore | 2 +- Cargo.toml | 44 ++++++-------- README.md | 6 +- .../examples => examples}/simple.rs | 4 +- .../examples => examples}/transform.rs | 5 +- pub-sub-client-derive/Cargo.toml | 20 ------- pub-sub-client-derive/src/lib.rs | 27 --------- pub-sub-client-tests/Cargo.toml | 24 -------- pub-sub-client/Cargo.toml | 31 ---------- pub-sub-client/src/error/mod.rs | 54 ------------------ release.toml | 4 +- .../active-road-365118-2eca6b7b8fd9.json.gpg | Bin src/error/mod.rs | 52 +++++++++++++++++ {pub-sub-client/src => src}/lib.rs | 53 ++++++++--------- {pub-sub-client/src => src}/publisher/mod.rs | 23 +++++--- {pub-sub-client/src => src}/subscriber/mod.rs | 31 +++++----- .../test.rs => tests/integration_test.rs | 6 +- .../tests => tests}/invalid_key.json | 0 20 files changed, 139 insertions(+), 251 deletions(-) rename {pub-sub-client-tests/examples => examples}/simple.rs (93%) rename {pub-sub-client-tests/examples => examples}/transform.rs (95%) delete mode 100644 pub-sub-client-derive/Cargo.toml delete mode 100644 pub-sub-client-derive/src/lib.rs delete mode 100644 pub-sub-client-tests/Cargo.toml delete mode 100644 pub-sub-client/Cargo.toml delete mode 100644 pub-sub-client/src/error/mod.rs rename {pub-sub-client-tests/secrets => secrets}/active-road-365118-2eca6b7b8fd9.json.gpg (100%) create mode 100644 src/error/mod.rs rename {pub-sub-client/src => src}/lib.rs (82%) rename {pub-sub-client/src => src}/publisher/mod.rs (89%) rename {pub-sub-client/src => src}/subscriber/mod.rs (92%) rename pub-sub-client-tests/tests/test.rs => tests/integration_test.rs (96%) rename {pub-sub-client/tests => tests}/invalid_key.json (100%) diff --git a/.blackbox/blackbox-files.txt b/.blackbox/blackbox-files.txt index b91f55a..9af42a3 100644 --- a/.blackbox/blackbox-files.txt +++ b/.blackbox/blackbox-files.txt @@ -1 +1 @@ -pub-sub-client-tests/secrets/active-road-365118-2eca6b7b8fd9.json +secrets/active-road-365118-2eca6b7b8fd9.json diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 096fc85..61e5cbb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -44,5 +44,5 @@ jobs: env: GCP_SERVICE_ACCOUNT: ${{ secrets.GCP_SERVICE_ACCOUNT }} run: | - printenv GCP_SERVICE_ACCOUNT > pub-sub-client-tests/secrets/active-road-365118-2eca6b7b8fd9.json + printenv GCP_SERVICE_ACCOUNT > secrets/active-road-365118-2eca6b7b8fd9.json just test diff --git a/.gitignore b/.gitignore index fd842a1..8ed8ebe 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ target /.blackbox/pubring.gpg~ /.blackbox/pubring.kbx~ /.blackbox/secring.gpg -/pub-sub-client-tests/secrets/active-road-365118-2eca6b7b8fd9.json +active-road-365118-2eca6b7b8fd9.json diff --git a/Cargo.toml b/Cargo.toml index 995e1a9..7c64515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,6 @@ -[workspace] -members = [ - "pub-sub-client", - "pub-sub-client-derive", - "pub-sub-client-tests", -] -resolver = "2" - -[workspace.package] -# No use to define version here, because cargo release ignores it! -# version = "0.10.1-alpha.0" +[package] +name = "pub-sub-client" +version = "0.11.1-alpha.0" edition = "2021" description = "Google Cloud Pub/Sub client library" authors = [ "Heiko Seeberger " ] @@ -17,23 +9,21 @@ readme = "README.md" homepage = "https://github.com/hseeberger/pub-sub-client" repository = "https://github.com/hseeberger/pub-sub-client" documentation = "https://github.com/hseeberger/pub-sub-client" -publish = true -[workspace.dependencies] +[dependencies] +base64 = { version = "0.21" } +goauth = { version = "0.13" } +reqwest = { version = "0.11", features = [ "json" ] } +serde = { version = "1.0", features = [ "derive" ] } +serde_json = { version = "1.0" } +smpl_jwt = { version = "0.7" } +thiserror = { version = "1.0" } +time = { version = "0.3", features = [ "serde-well-known" ] } +tracing = { version = "0.1" } + +[dev-dependencies] anyhow = { version = "1.0" } -base64 = { version = "0.21" } -goauth = { version = "0.13" } -proc-macro2 = { version = "1.0" } -quote = { version = "1.0" } -reqwest = { version = "0.11" } -serde = { version = "1.0" } -serde_json = { version = "1.0" } -smpl_jwt = { version = "0.7" } -syn = { version = "2.0" } testcontainers = { version = "0.15" } testcontainers-modules = { version = "0.1", features = [ "google_cloud_sdk_emulators" ] } -thiserror = { version = "1.0" } -time = { version = "0.3" } -tokio = { version = "1" } -tracing = { version = "0.1" } -tracing-subscriber = { version = "0.3" } +tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] } +tracing-subscriber = { version = "0.3", features = [ "env-filter", "fmt", "json" ] } diff --git a/README.md b/README.md index fd39014..3716ec5 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Google Cloud Pub/Sub client library in [Rust](https://www.rust-lang.org/). Currently publishing, pulling and acknowledging are supported, but no management tasks like creating topics or subscriptions. -Messages can either be published/pulled as raw or, if the payload is JSON data, serialized from/deserialized into domain messages (structs or enums) via [Serde](https://serde.rs/) and [Serde JSON](https://docs.serde.rs/serde_json). Both raw `ReceivedMessages` and "typed" `PulledMessages` expose metadata like message ID, acknowledge ID, attributes, etc. +Messages can either be published/pulled as raw or, if the payload is JSON data, serialized from/deserialized into domain messages (structs or enums) via [Serde](https://serde.rs/) and [Serde JSON](https://docs.serde.rs/serde_json). Both raw `RawPulledMessage`s and "typed" `PulledMessage`s expose metadata like message ID, acknowledge ID, attributes, etc. Aside from straight forward deserialization it is also possible to first transform the pulled JSON values before deserizlizing into domain messages which allows for generally adjusting the JSON structure as well as schema evolution. @@ -22,13 +22,13 @@ Aside from straight forward deserialization it is also possible to first transfo Typically we want to use domain message: ``` rust -#[derive(Debug, Deserialize, Serialize, PublishedMessage)] +#[derive(Debug, Serialize, Deserialize)] struct Message { text: String, } ``` -In order to publish `Message`, we need to derive `Serialize` and `PublishedMessage` and to pull it we need to derive `Deserialize`. +To publish `Message` we need to derive `Serialize` and to pull it we need to derive `Deserialize`. First create a `PubSubClient`, giving the path to a service account key file and the duration to refresh access tokens before they expire: diff --git a/pub-sub-client-tests/examples/simple.rs b/examples/simple.rs similarity index 93% rename from pub-sub-client-tests/examples/simple.rs rename to examples/simple.rs index 73c0d0b..8b10d87 100644 --- a/pub-sub-client-tests/examples/simple.rs +++ b/examples/simple.rs @@ -1,11 +1,11 @@ -use pub_sub_client::{Error, PubSubClient, PublishedMessage}; +use pub_sub_client::{Error, PubSubClient}; use serde::{Deserialize, Serialize}; use std::{env, error::Error as _, time::Duration}; const TOPIC_ID: &str = "test"; const SUBSCRIPTION_ID: &str = "test"; -#[derive(Debug, Deserialize, Serialize, PublishedMessage)] +#[derive(Debug, Serialize, Deserialize)] struct Message { text: String, } diff --git a/pub-sub-client-tests/examples/transform.rs b/examples/transform.rs similarity index 95% rename from pub-sub-client-tests/examples/transform.rs rename to examples/transform.rs index d2de8ea..52e3a8d 100644 --- a/pub-sub-client-tests/examples/transform.rs +++ b/examples/transform.rs @@ -1,8 +1,7 @@ use anyhow::anyhow; use base64::{engine::general_purpose::STANDARD, Engine}; use pub_sub_client::{ - Error, PubSubClient, PublishedMessage, PulledMessage, RawPublishedMessage, - RawPulledMessageEnvelope, + Error, PubSubClient, PulledMessage, RawPublishedMessage, RawPulledMessageEnvelope, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -11,7 +10,7 @@ use std::{collections::HashMap, env, error::Error as _, time::Duration}; const TOPIC_ID: &str = "test"; const SUBSCRIPTION_ID: &str = "test"; -#[derive(Debug, Deserialize, Serialize, PublishedMessage)] +#[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] #[allow(dead_code)] enum Message { diff --git a/pub-sub-client-derive/Cargo.toml b/pub-sub-client-derive/Cargo.toml deleted file mode 100644 index 56f8f4c..0000000 --- a/pub-sub-client-derive/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "pub-sub-client-derive" -version = "0.11.1-alpha.0" -description = { workspace = true } -edition = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -documentation = { workspace = true } -publish = { workspace = true } - -[lib] -proc-macro = true - -[dependencies] -proc-macro2 = { workspace = true } -quote = { workspace = true } -syn = { workspace = true } diff --git a/pub-sub-client-derive/src/lib.rs b/pub-sub-client-derive/src/lib.rs deleted file mode 100644 index 7f87c57..0000000 --- a/pub-sub-client-derive/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -use proc_macro::TokenStream; -use proc_macro2::Span; -use quote::quote; -use syn::{parse_macro_input, Data, DeriveInput, Error, Ident}; - -#[proc_macro_derive(PublishedMessage)] -pub fn message_macro_derive(input: TokenStream) -> TokenStream { - let derive_input = parse_macro_input!(input as DeriveInput); - - match derive_input.data { - Data::Union(_) => no_union(), - _ => impl_published_message(derive_input.ident), - } -} - -fn no_union() -> TokenStream { - let e = Error::new( - Span::call_site(), - "pub-sub-client and Serde do not support derive for unions", - ); - let e = e.to_compile_error(); - quote!(#e).into() -} - -fn impl_published_message(name: Ident) -> TokenStream { - quote!(impl PublishedMessage for #name {}).into() -} diff --git a/pub-sub-client-tests/Cargo.toml b/pub-sub-client-tests/Cargo.toml deleted file mode 100644 index 9d9fdc4..0000000 --- a/pub-sub-client-tests/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "pub-sub-client-tests" -version = "0.11.1-alpha.0" -description = { workspace = true } -edition = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -documentation = { workspace = true } -publish = false - -[dev-dependencies] -pub-sub-client = { version = "=0.11.1-alpha.0", path = "../pub-sub-client", features = [ "derive" ] } -anyhow = { workspace = true } -base64 = { workspace = true } -reqwest = { workspace = true, features = [ "json" ] } -serde = { workspace = true, features = [ "derive" ] } -serde_json = { workspace = true } -testcontainers = { workspace = true } -testcontainers-modules = { workspace = true } -tokio = { workspace = true, features = [ "full" ] } -tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt", "json", "tracing-log" ] } diff --git a/pub-sub-client/Cargo.toml b/pub-sub-client/Cargo.toml deleted file mode 100644 index 515da08..0000000 --- a/pub-sub-client/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "pub-sub-client" -version = "0.11.1-alpha.0" -description = { workspace = true } -edition = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -documentation = { workspace = true } -publish = { workspace = true } -exclude = [ "tests" ] - -[features] -derive = [ "pub-sub-client-derive" ] - -[dependencies] -pub-sub-client-derive = { version = "=0.11.1-alpha.0", optional = true, path = "../pub-sub-client-derive" } -base64 = { workspace = true } -goauth = { workspace = true } -reqwest = { workspace = true, features = [ "json" ] } -serde = { workspace = true, features = [ "derive" ] } -serde_json = { workspace = true } -smpl_jwt = { workspace = true } -thiserror = { workspace = true } -time = { workspace = true, features = [ "serde-well-known" ] } -tracing = { workspace = true } - -[dev-dependencies] -anyhow = { workspace = true } diff --git a/pub-sub-client/src/error/mod.rs b/pub-sub-client/src/error/mod.rs deleted file mode 100644 index d3c42b3..0000000 --- a/pub-sub-client/src/error/mod.rs +++ /dev/null @@ -1,54 +0,0 @@ -use reqwest::Response; -use serde_json::Value; -use std::{convert::identity, error::Error as StdError}; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum Error { - #[error("Initialization error: {reason}")] - Initialization { - reason: String, - source: Box, - }, - - #[error("Getting authentication token failed")] - TokenFetch { source: Box }, - - #[error("HTTP communication with Pub/Sub service failed")] - HttpServiceCommunication { source: reqwest::Error }, - #[error("Unexpected HTTP status code `{0}` from Pub/Sub service: {1}")] - UnexpectedHttpStatusCode(reqwest::StatusCode, String), - #[error("Unexpected HTTP response from Pub/Sub service")] - UnexpectedHttpResponse { source: reqwest::Error }, - - #[error("Decoding data of received message as Base64 failed")] - NoBase64 { source: base64::DecodeError }, - #[error("PubSubMessage contains no data")] - NoData, - #[error("Deserializing data of received message failed")] - Deserialize { source: serde_json::Error }, - #[error("Serializing of message to be published failed")] - Serialize { source: serde_json::Error }, - #[error("Failed to transform JSON value")] - Transform { - source: Box, - }, -} - -impl Error { - pub async fn unexpected_http_status_code(response: Response) -> Error { - Error::UnexpectedHttpStatusCode( - response.status(), - response - .text() - .await - .map_err(|e| format!("Failed to get response body as text: {e}")) - .and_then(|text| { - serde_json::from_str::(&text) - .map(|v| v["error"]["message"].to_string()) - .map_err(|e| format!("Failed to parse error response: {e}")) - }) - .unwrap_or_else(identity), - ) - } -} diff --git a/release.toml b/release.toml index dad2ece..0f3257c 100644 --- a/release.toml +++ b/release.toml @@ -1,3 +1 @@ -pre-release-commit-message = "release: released {{crate_name}} v{{version}}" -post-release-commit-message = "release: bumping {{crate_name}} to v{{next_version}}" -dev-version = true +pre-release-commit-message = "release: v{{version}}" diff --git a/pub-sub-client-tests/secrets/active-road-365118-2eca6b7b8fd9.json.gpg b/secrets/active-road-365118-2eca6b7b8fd9.json.gpg similarity index 100% rename from pub-sub-client-tests/secrets/active-road-365118-2eca6b7b8fd9.json.gpg rename to secrets/active-road-365118-2eca6b7b8fd9.json.gpg diff --git a/src/error/mod.rs b/src/error/mod.rs new file mode 100644 index 0000000..b25f338 --- /dev/null +++ b/src/error/mod.rs @@ -0,0 +1,52 @@ +use reqwest::Response; +use serde_json::Value; +use std::{convert::identity, error::Error as StdError}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("initialization error: {reason}")] + Initialization { + reason: String, + source: Box, + }, + + #[error("getting authentication token failed")] + TokenFetch(#[from] Box), + + #[error("HTTP communication with Pub/Sub service failed")] + HttpServiceCommunication(#[source] reqwest::Error), + #[error("unexpected HTTP status code `{0}` from Pub/Sub service: {1}")] + UnexpectedHttpStatusCode(reqwest::StatusCode, String), + #[error("unexpected HTTP response from Pub/Sub service")] + UnexpectedHttpResponse(#[source] reqwest::Error), + + #[error("decoding data of received message as Base64 failed")] + DecodeBase64(#[source] base64::DecodeError), + #[error("message contains no data")] + NoData, + #[error("deserializing data of received message failed")] + Deserialize(#[source] serde_json::Error), + #[error("serializing of message to be published failed")] + Serialize(#[source] serde_json::Error), + #[error("failed to transform JSON value")] + Transform(#[source] Box), +} + +impl Error { + pub async fn unexpected_http_status_code(response: Response) -> Error { + Error::UnexpectedHttpStatusCode( + response.status(), + response + .text() + .await + .map_err(|e| format!("failed to get response body as text: {e}")) + .and_then(|text| { + serde_json::from_str::(&text) + .map_err(|e| format!("failed to parse error response: {e}")) + .map(|v| v["error"]["message"].to_string()) + }) + .unwrap_or_else(identity), + ) + } +} diff --git a/pub-sub-client/src/lib.rs b/src/lib.rs similarity index 82% rename from pub-sub-client/src/lib.rs rename to src/lib.rs index 8ffe9fc..0c48d6f 100644 --- a/pub-sub-client/src/lib.rs +++ b/src/lib.rs @@ -6,14 +6,15 @@ pub use error::*; pub use publisher::*; pub use subscriber::*; -#[cfg(feature = "derive")] -pub use pub_sub_client_derive::*; - use goauth::{auth::JwtClaims, credentials::Credentials, fetcher::TokenFetcher, scopes::Scope}; use reqwest::Response; use serde::Serialize; use smpl_jwt::Jwt; -use std::{env, time::Duration}; +use std::{ + env, + fmt::{self, Debug, Formatter}, + time::Duration, +}; const BASE_URL_ENV_VAR: &str = "PUB_SUB_BASE_URL"; const DEFAULT_BASE_URL: &str = "https://pubsub.googleapis.com"; @@ -24,20 +25,15 @@ pub struct PubSubClient { reqwest_client: reqwest::Client, } -impl std::fmt::Debug for PubSubClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PubSubClient") - .field("project_url", &self.project_url) - .finish() - } -} - impl PubSubClient { - pub fn new>(key_path: T, refresh_buffer: Duration) -> Result { + pub fn new(key_path: T, refresh_buffer: Duration) -> Result + where + T: AsRef, + { let key_path = key_path.as_ref(); let credentials = Credentials::from_file(key_path).map_err(|source| Error::Initialization { - reason: format!("Missing or malformed service account key at `{key_path}`"), + reason: format!("missing or malformed service account key at `{key_path}`"), source: source.into(), })?; @@ -56,7 +52,7 @@ impl PubSubClient { credentials .rsa_key() .map_err(|source| Error::Initialization { - reason: format!("Malformed private key in service account key at `{key_path}`"), + reason: format!("malformed private key in service account key at `{key_path}`"), source: source.into(), })?, None, @@ -65,7 +61,7 @@ impl PubSubClient { let refresh_buffer = refresh_buffer .try_into() .map_err(|source| Error::Initialization { - reason: format!("Invalid refresh_buffer `{refresh_buffer:?}`"), + reason: format!("invalid refresh_buffer `{refresh_buffer:?}`"), source: Box::new(source), })?; @@ -76,19 +72,16 @@ impl PubSubClient { }) } - async fn send_request( + async fn send_request( &self, url: &str, request: &R, timeout: Option, - ) -> Result { - let token = self - .token_fetcher - .fetch_token() - .await - .map_err(|source| Error::TokenFetch { - source: Box::new(source), - })?; + ) -> Result + where + R: Serialize, + { + let token = self.token_fetcher.fetch_token().await.map_err(Box::new)?; let request = self .reqwest_client @@ -100,7 +93,15 @@ impl PubSubClient { request .send() .await - .map_err(|source| Error::HttpServiceCommunication { source }) + .map_err(Error::HttpServiceCommunication) + } +} + +impl Debug for PubSubClient { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("PubSubClient") + .field("project_url", &self.project_url) + .finish() } } diff --git a/pub-sub-client/src/publisher/mod.rs b/src/publisher/mod.rs similarity index 89% rename from pub-sub-client/src/publisher/mod.rs rename to src/publisher/mod.rs index c988bf6..f2c64b2 100644 --- a/pub-sub-client/src/publisher/mod.rs +++ b/src/publisher/mod.rs @@ -4,14 +4,18 @@ use serde::{Deserialize, Serialize}; use std::{collections::HashMap, fmt::Debug, time::Duration}; use tracing::debug; -pub trait PublishedMessage: Serialize {} - -pub struct PublishedMessageEnvelope { +pub struct PublishedMessageEnvelope +where + M: Serialize, +{ message: M, attributes: Option>, } -impl From for PublishedMessageEnvelope { +impl From for PublishedMessageEnvelope +where + M: Serialize, +{ fn from(message: M) -> Self { Self { message, @@ -20,7 +24,10 @@ impl From for PublishedMessageEnvelope { } } -impl From<(M, HashMap)> for PublishedMessageEnvelope { +impl From<(M, HashMap)> for PublishedMessageEnvelope +where + M: Serialize, +{ fn from((message, attributes): (M, HashMap)) -> Self { Self { message, @@ -84,7 +91,7 @@ impl PubSubClient { timeout: Option, ) -> Result, Error> where - M: PublishedMessage, + M: Serialize, E: Into> + Debug, { let bytes = envelopes @@ -99,7 +106,7 @@ impl PubSubClient { .collect::, _>>(); let messages = bytes - .map_err(|source| Error::Serialize { source })? + .map_err(Error::Serialize)? .into_iter() .map(|(bytes, attributes)| RawPublishedMessage { data: Some(STANDARD.encode(bytes)), @@ -130,7 +137,7 @@ impl PubSubClient { let message_ids = response .json::() .await - .map_err(|source| Error::UnexpectedHttpResponse { source })? + .map_err(Error::UnexpectedHttpResponse)? .message_ids; debug!( message = "Request was successful", diff --git a/pub-sub-client/src/subscriber/mod.rs b/src/subscriber/mod.rs similarity index 92% rename from pub-sub-client/src/subscriber/mod.rs rename to src/subscriber/mod.rs index bb5f144..6701732 100644 --- a/pub-sub-client/src/subscriber/mod.rs +++ b/src/subscriber/mod.rs @@ -7,7 +7,10 @@ use time::OffsetDateTime; use tracing::debug; #[derive(Debug)] -pub struct PulledMessage { +pub struct PulledMessage +where + M: DeserializeOwned, +{ pub ack_id: String, pub message: Result, pub attributes: Option>, @@ -59,12 +62,15 @@ struct AcknowledgeRequest<'a> { impl PubSubClient { #[tracing::instrument] - pub async fn pull( + pub async fn pull( &self, subscription_id: &str, max_messages: u32, timeout: Option, - ) -> Result>, Error> { + ) -> Result>, Error> + where + M: DeserializeOwned + Debug, + { self.pull_with_transform(subscription_id, max_messages, timeout, |_, value| Ok(value)) .await } @@ -110,7 +116,7 @@ impl PubSubClient { let envelopes = response .json::() .await - .map_err(|source| Error::UnexpectedHttpResponse { source })? + .map_err(Error::UnexpectedHttpResponse)? .envelopes; Ok(envelopes) @@ -165,21 +171,12 @@ where .data .as_ref() .ok_or(Error::NoData) - .and_then(|data| { - STANDARD - .decode(data) - .map_err(|source| Error::NoBase64 { source }) - }) + .and_then(|data| STANDARD.decode(data).map_err(Error::DecodeBase64)) .and_then(|bytes| { - serde_json::from_slice::(&bytes) - .map_err(|source| Error::Deserialize { source }) - }) - .and_then(|value| { - transform(&envelope, value).map_err(|source| Error::Transform { source }) + serde_json::from_slice::(&bytes).map_err(Error::Deserialize) }) - .and_then(|value| { - serde_json::from_value(value).map_err(|source| Error::Deserialize { source }) - }); + .and_then(|value| transform(&envelope, value).map_err(Error::Transform)) + .and_then(|value| serde_json::from_value(value).map_err(Error::Deserialize)); let RawPulledMessageEnvelope { ack_id, message: diff --git a/pub-sub-client-tests/tests/test.rs b/tests/integration_test.rs similarity index 96% rename from pub-sub-client-tests/tests/test.rs rename to tests/integration_test.rs index 3c4242f..f3020a7 100644 --- a/pub-sub-client-tests/tests/test.rs +++ b/tests/integration_test.rs @@ -1,5 +1,5 @@ use base64::{engine::general_purpose::STANDARD, Engine}; -use pub_sub_client::{PubSubClient, PublishedMessage, RawPublishedMessage}; +use pub_sub_client::{PubSubClient, RawPublishedMessage}; use reqwest::{Client, StatusCode}; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -12,7 +12,7 @@ const TOPIC_ID: &str = "test"; const SUBSCRIPTION_ID: &str = "test"; const TEXT: &str = "test-text"; -#[derive(Debug, Deserialize, Serialize, PublishedMessage, PartialEq)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] enum Message { Foo { text: String }, Bar { text: String }, @@ -198,7 +198,7 @@ async fn test() { HashMap::from([("version".to_string(), "v1".to_string())]), )]; let result = pub_sub_client - .publish(TOPIC_ID, messages, None, Some(Duration::from_secs(10))) + .publish::(TOPIC_ID, messages, None, Some(Duration::from_secs(10))) .await; assert!(result.is_ok()); let result = result.unwrap(); diff --git a/pub-sub-client/tests/invalid_key.json b/tests/invalid_key.json similarity index 100% rename from pub-sub-client/tests/invalid_key.json rename to tests/invalid_key.json