Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Apr 30, 2024
1 parent dfec628 commit cb69597
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 128 deletions.
15 changes: 8 additions & 7 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::time::Duration;
use anyhow::Context;
use gcp_bigquery_client::{
model::{query_request::QueryRequest, query_response::ResultSet},
yup_oauth2,
Client,
yup_oauth2, Client,
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorManager, CursorModification, QueryExecutor, QueryOutput, Schema};
Expand Down Expand Up @@ -159,11 +158,13 @@ impl QueryExecutor for BigQueryQueryExecutor {
} => n
.parse::<usize>()
.map_err(|err| PgWireError::ApiError(err.into()))?,
_ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
)))),
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
))))
}
};

tracing::info!("fetching {} rows", count);
Expand Down
19 changes: 13 additions & 6 deletions nexus/peer-mysql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ impl QueryExecutor for MySqlQueryExecutor {
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
// only support SELECT statements
match stmt {
Statement::Explain { analyze, format, statement, .. } => {
Statement::Explain {
analyze,
format,
statement,
..
} => {
if let Statement::Query(ref query) = **statement {
let mut query = query.clone();
ast::rewrite_query(&self.peer_name, &mut query);
Expand Down Expand Up @@ -145,11 +150,13 @@ impl QueryExecutor for MySqlQueryExecutor {
} => n
.parse::<usize>()
.map_err(|err| PgWireError::ApiError(err.into()))?,
_ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
)))),
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
))))
}
};

tracing::info!("fetching {} rows", count);
Expand Down
83 changes: 42 additions & 41 deletions nexus/peer-mysql/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,51 @@ pub fn mysql_row_to_values(row: Row) -> Vec<Value> {
row.unwrap()
.into_iter()
.zip(columns.iter())
.map(|(val, col)|
.map(|(val, col)| {
if val == mysql_async::Value::NULL {
Value::Null
} else {
match col.column_type() {
ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Value::Null,
ColumnType::MYSQL_TYPE_TINY => Value::TinyInt(from_value(val)),
ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => {
Value::SmallInt(from_value(val))
}
ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_INT24 => {
Value::Integer(from_value(val))
}
ColumnType::MYSQL_TYPE_LONGLONG => Value::BigInt(from_value(val)),
ColumnType::MYSQL_TYPE_FLOAT => Value::Float(from_value(val)),
ColumnType::MYSQL_TYPE_DOUBLE => Value::Double(from_value(val)),
ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
Value::Numeric(from_value(val))
}
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_VAR_STRING
| ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_ENUM
| ColumnType::MYSQL_TYPE_SET => Value::Text(from_value(val)),
ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_BIT
| ColumnType::MYSQL_TYPE_GEOMETRY => {
Value::Binary(from_value::<Vec<u8>>(val).into())
}
ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => {
Value::Date(from_value(val))
}
ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => {
Value::Time(from_value(val))
}
ColumnType::MYSQL_TYPE_TIMESTAMP
| ColumnType::MYSQL_TYPE_TIMESTAMP2
| ColumnType::MYSQL_TYPE_DATETIME
| ColumnType::MYSQL_TYPE_DATETIME2 => Value::PostgresTimestamp(from_value(val)),
ColumnType::MYSQL_TYPE_JSON => Value::JsonB(from_value(val)),
ColumnType::MYSQL_TYPE_TYPED_ARRAY => Value::Null,
match col.column_type() {
ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Value::Null,
ColumnType::MYSQL_TYPE_TINY => Value::TinyInt(from_value(val)),
ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => {
Value::SmallInt(from_value(val))
}
ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_INT24 => {
Value::Integer(from_value(val))
}
ColumnType::MYSQL_TYPE_LONGLONG => Value::BigInt(from_value(val)),
ColumnType::MYSQL_TYPE_FLOAT => Value::Float(from_value(val)),
ColumnType::MYSQL_TYPE_DOUBLE => Value::Double(from_value(val)),
ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
Value::Numeric(from_value(val))
}
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_VAR_STRING
| ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_ENUM
| ColumnType::MYSQL_TYPE_SET => Value::Text(from_value(val)),
ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_BIT
| ColumnType::MYSQL_TYPE_GEOMETRY => {
Value::Binary(from_value::<Vec<u8>>(val).into())
}
ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => {
Value::Date(from_value(val))
}
ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => {
Value::Time(from_value(val))
}
ColumnType::MYSQL_TYPE_TIMESTAMP
| ColumnType::MYSQL_TYPE_TIMESTAMP2
| ColumnType::MYSQL_TYPE_DATETIME
| ColumnType::MYSQL_TYPE_DATETIME2 => Value::PostgresTimestamp(from_value(val)),
ColumnType::MYSQL_TYPE_JSON => Value::JsonB(from_value(val)),
ColumnType::MYSQL_TYPE_TYPED_ARRAY => Value::Null,
}
}
})
.collect()
Expand Down
12 changes: 7 additions & 5 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,13 @@ impl QueryExecutor for SnowflakeQueryExecutor {
} => n
.parse::<usize>()
.map_err(|err| PgWireError::ApiError(err.into()))?,
_ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
)))),
_ => {
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"fdw_error".to_owned(),
"only FORWARD count and COUNT count are supported in FETCH".to_owned(),
))))
}
};

tracing::info!("fetching {} rows", count);
Expand Down
126 changes: 57 additions & 69 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use pgwire::{
md5pass::{hash_md5_password, MakeMd5PasswordAuthStartupHandler},
AuthSource, LoginInfo, Password, ServerParameterProvider,
},
portal::{Portal},
portal::Portal,
query::{ExtendedQueryHandler, SimpleQueryHandler},
results::{DescribeResponse, DescribePortalResponse, DescribeStatementResponse, Response, Tag},
results::{
DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag,
},
stmt::StoredStatement,
ClientInfo, MakeHandler, Type
ClientInfo, MakeHandler, Type,
},
error::{ErrorInfo, PgWireError, PgWireResult},
tokio::process_socket,
Expand Down Expand Up @@ -861,11 +863,7 @@ impl NexusBackend {
})
}

async fn do_describe(
&self,
stmt: &NexusParsedStatement,
) -> PgWireResult<Option<Schema>>
{
async fn do_describe(&self, stmt: &NexusParsedStatement) -> PgWireResult<Option<Schema>> {
tracing::info!("[eqp] do_describe: {}", stmt.query);
let stmt = &stmt.statement;
match stmt {
Expand All @@ -875,61 +873,47 @@ impl NexusBackend {
NexusStatement::Rollback { .. } => Ok(None),
NexusStatement::PeerQuery { stmt, assoc } => {
let schema: Option<Schema> = match assoc {
QueryAssociation::Peer(peer) => {
match &peer.config {
Some(Config::BigqueryConfig(_)) => {
let executor =
self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err)
.into(),
)
})?;
executor.describe(stmt).await?
}
Some(Config::MysqlConfig(_)) => {
let executor =
self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err)
.into(),
)
})?;
executor.describe(stmt).await?
}
Some(Config::PostgresConfig(_)) => {
let executor =
self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err)
.into(),
)
})?;
executor.describe(stmt).await?
}
Some(Config::SnowflakeConfig(_)) => {
let executor =
self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err)
.into(),
)
})?;
executor.describe(stmt).await?
}
_ => {
panic!("peer type not supported: {:?}", peer)
}
QueryAssociation::Peer(peer) => match &peer.config {
Some(Config::BigqueryConfig(_)) => {
let executor = self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
)
})?;
executor.describe(stmt).await?
}
}
Some(Config::MysqlConfig(_)) => {
let executor = self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
)
})?;
executor.describe(stmt).await?
}
Some(Config::PostgresConfig(_)) => {
let executor = self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
)
})?;
executor.describe(stmt).await?
}
Some(Config::SnowflakeConfig(_)) => {
let executor = self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
)
})?;
executor.describe(stmt).await?
}
_ => {
panic!("peer type not supported: {:?}", peer)
}
},
QueryAssociation::Catalog => self.catalog.describe(stmt).await?,
};

Ok(if self.peerdb_fdw_mode {
None
} else {
schema
})
Ok(if self.peerdb_fdw_mode { None } else { schema })
}
}
}
Expand Down Expand Up @@ -1037,11 +1021,13 @@ impl ExtendedQueryHandler for NexusBackend {
where
C: ClientInfo + Unpin + Send + Sync,
{
Ok(if let Some(schema) = self.do_describe(&target.statement.statement).await? {
DescribePortalResponse::new((*schema).clone())
} else {
DescribePortalResponse::no_data()
})
Ok(
if let Some(schema) = self.do_describe(&target.statement.statement).await? {
DescribePortalResponse::new((*schema).clone())
} else {
DescribePortalResponse::no_data()
},
)
}

async fn do_describe_statement<C>(
Expand All @@ -1052,11 +1038,13 @@ impl ExtendedQueryHandler for NexusBackend {
where
C: ClientInfo + Unpin + Send + Sync,
{
Ok(if let Some(schema) = self.do_describe(&target.statement).await? {
DescribeStatementResponse::new(target.parameter_types.clone(), (*schema).clone())
} else {
DescribeStatementResponse::no_data()
})
Ok(
if let Some(schema) = self.do_describe(&target.statement).await? {
DescribeStatementResponse::new(target.parameter_types.clone(), (*schema).clone())
} else {
DescribeStatementResponse::no_data()
},
)
}
}

Expand Down

0 comments on commit cb69597

Please sign in to comment.