Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove nexus/pgerror #898

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 0 additions & 41 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ members = [
"peer-cursor",
"peer-postgres",
"peer-snowflake",
"pgerror",
"postgres-connection",
"pt",
"server",
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ dashmap = "5.0"
futures = { version = "0.3.28", features = ["executor"] }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgerror = { path = "../pgerror" }
pgwire = "0.18"
pt = { path = "../pt" }
rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] }
Expand Down
31 changes: 6 additions & 25 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use gcp_bigquery_client::{
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use pgerror::PgError;
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pt::peerdb_peers::BigqueryConfig;
use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
Expand Down Expand Up @@ -76,9 +75,7 @@ impl BigQueryQueryExecutor {
.await
.map_err(|err| {
tracing::error!("error tracking query: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

let result_set = self
Expand All @@ -88,16 +85,12 @@ impl BigQueryQueryExecutor {
.await
.map_err(|err| {
tracing::error!("error running query: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

token.end().await.map_err(|err| {
tracing::error!("error closing tracking token: {}", err);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
PgWireError::ApiError(err.into())
})?;

Ok(result_set)
Expand All @@ -116,11 +109,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
bq_ast
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
})?;
.map_err(|err| PgWireError::ApiError(err.into()))?;

let query = query.to_string();
tracing::info!("bq rewritten query: {}", query);
Expand Down Expand Up @@ -170,11 +159,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
// If parsing the count resulted in an error, return an internal error
let count = match count {
Ok(c) => c,
Err(err) => {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
})))
}
Err(err) => return Err(PgWireError::ApiError(err.into())),
};

tracing::info!("fetching {} rows", count);
Expand Down Expand Up @@ -226,11 +211,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
bq_ast
.rewrite(&self.dataset_id, &mut query)
.context("unable to rewrite query")
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: err.to_string(),
}))
})?;
.map_err(|err| PgWireError::ApiError(err.into()))?;

// add LIMIT 0 to the root level query.
// this is a workaround for the bigquery API not supporting DESCRIBE
Expand Down
21 changes: 7 additions & 14 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use gcp_bigquery_client::model::{
field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema,
};
use peer_cursor::{Record, RecordStream, Schema, SchemaRef};
use pgerror::PgError;
use pgwire::{
api::{
results::{FieldFormat, FieldInfo},
Expand Down Expand Up @@ -181,19 +180,13 @@ impl BqRecordStream {
impl Stream for BqRecordStream {
type Item = PgWireResult<Record>;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.result_set.next_row() {
true => {
let record = this.convert_result_set_item(&this.result_set);
let result = record.map_err(|e| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting curent row: {}", e),
}))
});
Poll::Ready(Some(result))
}
false => Poll::Ready(None),
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.result_set.next_row() {
let record = self.convert_result_set_item(&self.result_set);
let result = record.map_err(|e| PgWireError::ApiError(e.into()));
Poll::Ready(Some(result))
} else {
Poll::Ready(None)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ edition = "2021"
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
pgerror = { path = "../pgerror" }
pgwire = "0.18"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
tokio = { version = "1.0", features = ["full"] }
Expand Down
16 changes: 7 additions & 9 deletions nexus/peer-cursor/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use futures::{stream, StreamExt};
use pgerror::PgError;
use pgwire::{
api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response},
error::{PgWireError, PgWireResult},
Expand Down Expand Up @@ -48,14 +47,13 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()>
let s = u.to_string();
builder.encode_field(&s)
}
Value::Enum(_) | Value::Hstore(_) => {
Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"cannot write value {:?} in postgres protocol: unimplemented",
&value
),
})))
}
Value::Enum(_) | Value::Hstore(_) => Err(PgWireError::ApiError(
format!(
"cannot write value {:?} in postgres protocol: unimplemented",
&value
)
.into(),
)),
}
}

Expand Down
1 change: 0 additions & 1 deletion nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgerror = { path = "../pgerror" }
pgwire = "0.18"
postgres-connection = { path = "../postgres-connection" }
pt = { path = "../pt" }
Expand Down
21 changes: 5 additions & 16 deletions nexus/peer-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use peer_cursor::{QueryExecutor, QueryOutput, Schema, SchemaRef};
use pgerror::PgError;
use pgwire::{
api::results::{FieldFormat, FieldInfo},
error::{PgWireError, PgWireResult},
Expand Down Expand Up @@ -71,9 +70,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting schema: {}", e),
}))
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;

tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query);
Expand All @@ -86,9 +83,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error executing query: {}", e),
}))
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;

// log that raw query execution has completed
Expand All @@ -101,9 +96,7 @@ impl QueryExecutor for PostgresQueryExecutor {
let mut rewritten_stmt = stmt.clone();
ast.rewrite_statement(&mut rewritten_stmt).map_err(|e| {
tracing::error!("error rewriting statement: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error rewriting statement: {}", e),
}))
PgWireError::ApiError(format!("error rewriting statement: {}", e).into())
})?;
let rewritten_query = rewritten_stmt.to_string();
tracing::info!("[peer-postgres] rewritten statement: {}", rewritten_query);
Expand All @@ -113,9 +106,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error executing query: {}", e),
}))
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;
Ok(QueryOutput::AffectedRows(rows_affected as usize))
}
Expand All @@ -130,9 +121,7 @@ impl QueryExecutor for PostgresQueryExecutor {
.await
.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error getting schema: {}", e),
}))
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;
Ok(Some(schema))
}
Expand Down
6 changes: 1 addition & 5 deletions nexus/peer-postgres/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use bytes::Bytes;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use futures::Stream;
use peer_cursor::{Record, RecordStream, SchemaRef};
use pgerror::PgError;
use pgwire::error::{PgWireError, PgWireResult};
use postgres_inet::MaskedIpAddr;
use rust_decimal::Decimal;
Expand Down Expand Up @@ -268,10 +267,7 @@ impl Stream for PgRecordStream {
Poll::Ready(Some(Ok(record)))
}
Poll::Ready(Some(Err(e))) => {
let err = Box::new(PgError::Internal {
err_msg: e.to_string(),
});
let err = PgWireError::ApiError(err);
let err = PgWireError::ApiError(Box::new(e));
Poll::Ready(Some(Err(err)))
}
Poll::Ready(None) => Poll::Ready(None),
Expand Down
1 change: 0 additions & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ peer-cursor = { path = "../peer-cursor" }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git" }
value = { path = "../value" }
tracing = "0.1"
pgerror = { path = "../pgerror" }
secrecy = { version = "0.8.0" }
async-trait = "0.1.57"
jsonwebtoken = { version = "9.0", features = ["use_pem"] }
Expand Down
7 changes: 3 additions & 4 deletions nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
use anyhow::Context;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use jsonwebtoken::{encode as jwt_encode, Algorithm, EncodingKey, Header};
use rsa::RsaPrivateKey;
use rsa::pkcs1::EncodeRsaPrivateKey;
use rsa::pkcs8::{DecodePrivateKey, EncodePublicKey};
use rsa::RsaPrivateKey;
use secrecy::{Secret, SecretString};
use serde::Serialize;
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -101,9 +101,8 @@ impl SnowflakeAuth {

#[tracing::instrument(name = "peer_sflake::auth_refresh_jwt", skip_all)]
fn refresh_jwt(&mut self) -> anyhow::Result<()> {
let private_key_jwt: EncodingKey = EncodingKey::from_rsa_der(
self.private_key.to_pkcs1_der()?.as_bytes(),
);
let private_key_jwt: EncodingKey =
EncodingKey::from_rsa_der(self.private_key.to_pkcs1_der()?.as_bytes());
self.last_refreshed = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
info!(
"Refreshing SnowFlake JWT for account: {} and user: {} at time {}",
Expand Down
Loading
Loading