Skip to content

Commit

Permalink
Improve URI connection error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
grtlr committed Feb 14, 2025
1 parent dd7f1ef commit bd72529
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 66 deletions.
4 changes: 4 additions & 0 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pub mod message_proxy;
pub use message_proxy::MessageProxyUrl;
use redap::ConnectionError;

#[cfg(feature = "redap")]
pub mod redap;
Expand Down Expand Up @@ -41,6 +42,9 @@ pub enum StreamError {
#[error(transparent)]
Transport(#[from] tonic::transport::Error),

#[error(transparent)]
ConnectionError(#[from] ConnectionError),

#[error(transparent)]
TonicStatus(#[from] TonicStatusError),

Expand Down
114 changes: 84 additions & 30 deletions crates/store/re_grpc_client/src/redap/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,35 @@
//! `rerun://`, which is an alias for `rerun+https://`. These schemes are then
//! converted on the fly to either `http://` or `https://`.
use re_protos::remote_store::v0::storage_node_client::StorageNodeClient;
use std::net::Ipv4Addr;

/// The given url is not a valid Rerun storage node URL.
#[derive(thiserror::Error, Debug)]
#[error("URL {url:?} should follow rerun://host:port/recording/12345 for recording or rerun://host:port/catalog for catalog")]
pub struct InvalidRedapAddress {
url: String,
msg: String,
pub enum ConnectionError {
#[error("Connection error: {0}")]
Tonic(#[from] tonic::transport::Error),

#[error(transparent)]
ParseError(#[from] url::ParseError),

#[error("server is expecting an unencrypted connection (try `rerun+http://` if you are sure)")]
UnencryptedServer,

#[error("invalid or missing scheme (expected `rerun(+http|+https)://`)")]
InvalidScheme,

#[error("unexpected endpoint: {0}")]
UnexpectedEndpoint(String),

#[error("unexpected opaque origin: {0}")]
UnexpectedOpaqueOrigin(String),

#[error("unexpected base URL: {0}")]
UnexpectedBaseUrl(String),

/// The given url is not a valid Rerun storage node URL.
#[error("URL {url:?} should follow rerun://host:port/recording/12345 for recording or rerun://host:port/catalog for catalog")]
InvalidAddress { url: String, msg: String },
}

/// The different schemes supported by Rerun.
Expand Down Expand Up @@ -52,15 +73,65 @@ pub struct Origin {
}

impl Origin {
// TODO(#8411): figure out the right size for this
const MAX_DECODING_MESSAGE_SIZE: usize = usize::MAX;

// Converts an entire [`Origin`] to a `http` or `https` URL.
pub fn to_http_scheme(&self) -> String {
fn to_http_scheme(&self) -> String {
format!(
"{}://{}:{}",
self.scheme.to_http_scheme(),
self.host,
self.port
)
}

fn coerce_http_scheme(&self) -> String {
format!("http://{}:{}", self.host, self.port)
}

#[cfg(target_arch = "wasm32")]
pub fn client(
&self,
) -> Result<StorageNodeClient<tonic_web_wasm_client::Client>, ConnectionError> {
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
self.to_http_scheme(),
tonic_web_wasm_client::options::FetchOptions::new(),
);

Ok(StorageNodeClient::new(tonic_client)
.max_decoding_message_size(Self::MAX_DECODING_MESSAGE_SIZE))
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn client(
&self,
) -> Result<StorageNodeClient<tonic::transport::Channel>, ConnectionError> {
use tonic::transport::Endpoint;

match Endpoint::new(self.to_http_scheme())?
.tls_config(tonic::transport::ClientTlsConfig::new().with_enabled_roots())?
.connect()
.await
{
Ok(client) => Ok(StorageNodeClient::new(client)
.max_decoding_message_size(Self::MAX_DECODING_MESSAGE_SIZE)),
Err(original_error) => {
// If we can't establish a connection, we probe if the server is
// expecting unencrypted traffic. If that is the case, we return
// a more meaningful error message.
let Ok(endpoint) = Endpoint::new(self.coerce_http_scheme()) else {
return Err(ConnectionError::Tonic(original_error));
};

if endpoint.connect().await.is_ok() {
Err(ConnectionError::UnencryptedServer)
} else {
Err(ConnectionError::Tonic(original_error))
}
}
}
}
}

impl std::fmt::Display for Origin {
Expand Down Expand Up @@ -94,7 +165,7 @@ impl std::fmt::Display for RedapAddress {
}

impl TryFrom<&str> for RedapAddress {
type Error = InvalidRedapAddress;
type Error = ConnectionError;

fn try_from(value: &str) -> Result<Self, Self::Error> {
let (scheme, rewritten) = if value.starts_with("rerun://") {
Expand All @@ -107,26 +178,16 @@ impl TryFrom<&str> for RedapAddress {
value.replace("rerun+https://", "https://"),
))
} else {
Err(InvalidRedapAddress {
url: value.to_owned(),
msg: "Invalid scheme, expected `rerun://`,`rerun+http://`, or `rerun+https://`"
.to_owned(),
})
Err(ConnectionError::InvalidScheme)
}?;

// We have to first rewrite the endpoint, because `Url` does not allow
// `.set_scheme()` for non-opaque origins, nor does it return a proper
// `Origin` in that case.
let redap_endpoint = url::Url::parse(&rewritten).map_err(|err| InvalidRedapAddress {
url: value.to_owned(),
msg: err.to_string(),
})?;
let redap_endpoint = url::Url::parse(&rewritten)?;

let url::Origin::Tuple(_, host, port) = redap_endpoint.origin() else {
return Err(InvalidRedapAddress {
url: value.to_owned(),
msg: "Opaque origin".to_owned(),
});
return Err(ConnectionError::UnexpectedOpaqueOrigin(value.to_owned()));
};

if host == url::Host::<String>::Ipv4(Ipv4Addr::UNSPECIFIED) {
Expand All @@ -139,10 +200,7 @@ impl TryFrom<&str> for RedapAddress {
// adjusted when adding additional resources.
let segments = redap_endpoint
.path_segments()
.ok_or_else(|| InvalidRedapAddress {
url: value.to_owned(),
msg: "Cannot be a base URL".to_owned(),
})?
.ok_or_else(|| ConnectionError::UnexpectedBaseUrl(value.to_owned()))?
.take(2)
.collect::<Vec<_>>();

Expand All @@ -151,12 +209,8 @@ impl TryFrom<&str> for RedapAddress {
origin,
recording_id: (*recording_id).to_owned(),
}),
["catalog"] => Ok(Self::Catalog { origin }),

_ => Err(InvalidRedapAddress {
url: value.to_owned(),
msg: "Missing path'".to_owned(),
}),
["catalog"] | [] => Ok(Self::Catalog { origin }),
[unknown, ..] => Err(ConnectionError::UnexpectedEndpoint(format!("{unknown}/"))),
}
}
}
Expand Down
42 changes: 6 additions & 36 deletions crates/store/re_grpc_client/src/redap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use re_log_types::{
use re_protos::{
common::v0::RecordingId,
remote_store::v0::{
storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest,
QueryCatalogRequest, CATALOG_APP_ID_FIELD_NAME, CATALOG_ID_FIELD_NAME,
CATALOG_START_TIME_FIELD_NAME,
CatalogFilter, FetchRecordingRequest, QueryCatalogRequest, CATALOG_APP_ID_FIELD_NAME,
CATALOG_ID_FIELD_NAME, CATALOG_START_TIME_FIELD_NAME,
},
};
use re_types::{
Expand All @@ -36,7 +35,7 @@ use re_types::{

mod address;

pub use address::{InvalidRedapAddress, RedapAddress};
pub use address::{ConnectionError, RedapAddress};

use crate::spawn_future;
use crate::StreamError;
Expand All @@ -54,7 +53,7 @@ const CATALOG_APPLICATION_ID: &str = "redap_catalog";
pub fn stream_from_redap(
url: String,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<re_smart_channel::Receiver<LogMsg>, InvalidRedapAddress> {
) -> Result<re_smart_channel::Receiver<LogMsg>, ConnectionError> {
re_log::debug!("Loading {url}…");

let address = url.as_str().try_into()?;
Expand Down Expand Up @@ -100,22 +99,7 @@ async fn stream_recording_async(
use tokio_stream::StreamExt as _;

re_log::debug!("Connecting to {origin}…");
let mut client = {
#[cfg(target_arch = "wasm32")]
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
origin.to_http_scheme(),
tonic_web_wasm_client::options::FetchOptions::new(),
);

#[cfg(not(target_arch = "wasm32"))]
let tonic_client = tonic::transport::Endpoint::new(origin.to_http_scheme())?
.tls_config(tonic::transport::ClientTlsConfig::new().with_enabled_roots())?
.connect()
.await?;

// TODO(#8411): figure out the right size for this
StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX)
};
let mut client = origin.client().await?;

re_log::debug!("Fetching catalog data for {recording_id}…");

Expand Down Expand Up @@ -261,21 +245,7 @@ async fn stream_catalog_async(
use tokio_stream::StreamExt as _;

re_log::debug!("Connecting to {origin}…");
let mut client = {
#[cfg(target_arch = "wasm32")]
let tonic_client = tonic_web_wasm_client::Client::new_with_options(
origin.to_http_scheme(),
tonic_web_wasm_client::options::FetchOptions::new(),
);

#[cfg(not(target_arch = "wasm32"))]
let tonic_client = tonic::transport::Endpoint::new(origin.to_http_scheme())?
.tls_config(tonic::transport::ClientTlsConfig::new().with_enabled_roots())?
.connect()
.await?;

StorageNodeClient::new(tonic_client)
};
let mut client = origin.client().await?;

re_log::debug!("Fetching catalog…");

Expand Down

0 comments on commit bd72529

Please sign in to comment.