Skip to content

Commit

Permalink
Remove nexus/pgerror (#898)
Browse files Browse the repository at this point in the history
There's no need to implement a string error since `Box<dyn Error>` implements `From<String>`/`From<&'static str>`

In a few places I opted to box original error rather than box original error's error string
  • Loading branch information
serprex authored Dec 26, 2023
1 parent 7ffce7c commit a8e07d9
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 338 deletions.
41 changes: 0 additions & 41 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ members = [
"peer-cursor",
"peer-postgres",
"peer-snowflake",
"pgerror",
"postgres-connection",
"pt",
"server",
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ dashmap = "5.0"
futures = { version = "0.3.28", features = ["executor"] }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgerror = { path = "../pgerror" }
pgwire = "0.18"
pt = { path = "../pt" }
rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] }
Expand Down
31 changes: 6 additions & 25 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use gcp_bigquery_client::{
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use pgerror::PgError;
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pt::peerdb_peers::BigqueryConfig;
use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
Expand Down Expand Up @@ -76,9 +75,7 @@ impl BigQueryQueryExecutor {
.await
.map_err(|err| {
tracing::error!("error tracking query: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

let result_set = self
Expand All @@ -88,16 +85,12 @@ impl BigQueryQueryExecutor {
.await
.map_err(|err| {
tracing::error!("error running query: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

token.end().await.map_err(|err| {
tracing::error!("error closing tracking token: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

Ok(result_set)
Expand All @@ -116,11 +109,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
bq_ast
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
})?;
.map_err(|err| PgWireError::ApiError(err.into()))?;

let query = query.to_string();
tracing::info!("bq rewritten query: {}", query);
Expand Down Expand Up @@ -170,11 +159,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
// If parsing the count resulted in an error, return an internal error
let count = match count {
Ok(c) => c,
Err(err) => {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
})))
}
Err(err) => return Err(PgWireError::ApiError(err.into())),
};

tracing::info!("fetching {} rows", count);
Expand Down Expand Up @@ -226,11 +211,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
bq_ast
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
})?;
.map_err(|err| PgWireError::ApiError(err.into()))?;

// add LIMIT 0 to the root level query.
// this is a workaround for the bigquery API not supporting DESCRIBE
Expand Down
21 changes: 7 additions & 14 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use gcp_bigquery_client::model::{
field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema,
};
use peer_cursor::{Record, RecordStream, Schema, SchemaRef};
use pgerror::PgError;
use pgwire::{
api::{
results::{FieldFormat, FieldInfo},
Expand Down Expand Up @@ -181,19 +180,13 @@ impl BqRecordStream {
impl Stream for BqRecordStream {
type Item = PgWireResult<Record>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.result_set.next_row() {
true => {
let record = this.convert_result_set_item(&this.result_set);
let result = record.map_err(|e| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting curent row: {}", e),
}))
});
Poll::Ready(Some(result))
}
false => Poll::Ready(None),
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.result_set.next_row() {
let record = self.convert_result_set_item(&self.result_set);
let result = record.map_err(|e| PgWireError::ApiError(e.into()));
Poll::Ready(Some(result))
} else {
Poll::Ready(None)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ edition = "2021"
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
pgerror = { path = "../pgerror" }
pgwire = "0.18"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
tokio = { version = "1.0", features = ["full"] }
Expand Down
16 changes: 7 additions & 9 deletions nexus/peer-cursor/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use futures::{stream, StreamExt};
use pgerror::PgError;
use pgwire::{
api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response},
error::{PgWireError, PgWireResult},
Expand Down Expand Up @@ -48,14 +47,13 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()>
let s = u.to_string();
builder.encode_field(&s)
}
Value::Enum(_) | Value::Hstore(_) => {
Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"cannot write value {:?} in postgres protocol: unimplemented",
&value
),
})))
}
Value::Enum(_) | Value::Hstore(_) => Err(PgWireError::ApiError(
format!(
"cannot write value {:?} in postgres protocol: unimplemented",
&value
)
.into(),
)),
}
}

Expand Down
1 change: 0 additions & 1 deletion nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgerror = { path = "../pgerror" }
pgwire = "0.18"
postgres-connection = { path = "../postgres-connection" }
pt = { path = "../pt" }
Expand Down
21 changes: 5 additions & 16 deletions nexus/peer-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use peer_cursor::{QueryExecutor, QueryOutput, Schema, SchemaRef};
use pgerror::PgError;
use pgwire::{
api::results::{FieldFormat, FieldInfo},
error::{PgWireError, PgWireResult},
Expand Down Expand Up @@ -71,9 +70,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting schema: {}", e),
}))
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;

tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query);
Expand All @@ -86,9 +83,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error executing query: {}", e),
}))
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;

// log that raw query execution has completed
Expand All @@ -101,9 +96,7 @@ impl QueryExecutor for PostgresQueryExecutor {
let mut rewritten_stmt = stmt.clone();
ast.rewrite_statement(&mut rewritten_stmt).map_err(|e| {
tracing::error!("error rewriting statement: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error rewriting statement: {}", e),
}))
PgWireError::ApiError(format!("error rewriting statement: {}", e).into())
})?;
let rewritten_query = rewritten_stmt.to_string();
tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query);
Expand All @@ -113,9 +106,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error executing query: {}", e),
}))
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;
Ok(QueryOutput::AffectedRows(rows_affected as usize))
}
Expand All @@ -130,9 +121,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting schema: {}", e),
}))
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;
Ok(Some(schema))
}
Expand Down
6 changes: 1 addition & 5 deletions nexus/peer-postgres/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use bytes::Bytes;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use futures::Stream;
use peer_cursor::{Record, RecordStream, SchemaRef};
use pgerror::PgError;
use pgwire::error::{PgWireError, PgWireResult};
use postgres_inet::MaskedIpAddr;
use rust_decimal::Decimal;
Expand Down Expand Up @@ -268,10 +267,7 @@ impl Stream for PgRecordStream {
Poll::Ready(Some(Ok(record)))
}
Poll::Ready(Some(Err(e))) => {
let err = Box::new(PgError::Internal {
err_msg: e.to_string(),
});
let err = PgWireError::ApiError(err);
let err = PgWireError::ApiError(Box::new(e));
Poll::Ready(Some(Err(err)))
}
Poll::Ready(None) => Poll::Ready(None),
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ peer-cursor = { path = "../peer-cursor" }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
value = { path = "../value" }
tracing = "0.1"
pgerror = { path = "../pgerror" }
secrecy = { version = "0.8.0" }
async-trait = "0.1.57"
jsonwebtoken = { version = "9.0", features = ["use_pem"] }
Expand Down
7 changes: 3 additions & 4 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
use anyhow::Context;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use jsonwebtoken::{encode as jwt_encode, Algorithm, EncodingKey, Header};
use rsa::RsaPrivateKey;
use rsa::pkcs1::EncodeRsaPrivateKey;
use rsa::pkcs8::{DecodePrivateKey, EncodePublicKey};
use rsa::RsaPrivateKey;
use secrecy::{Secret, SecretString};
use serde::Serialize;
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -101,9 +101,8 @@ impl SnowflakeAuth {

#[tracing::instrument(name = "peer_sflake::auth_refresh_jwt", skip_all)]
fn refresh_jwt(&mut self) -> anyhow::Result<()> {
let private_key_jwt: EncodingKey = EncodingKey::from_rsa_der(
self.private_key.to_pkcs1_der()?.as_bytes(),
);
let private_key_jwt: EncodingKey =
EncodingKey::from_rsa_der(self.private_key.to_pkcs1_der()?.as_bytes());
self.last_refreshed = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
info!(
"Refreshing SnowFlake JWT for account: {} and user: {} at time {}",
Expand Down
Loading

0 comments on commit a8e07d9

Please sign in to comment.