diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index dbcad94551..a7a8fb8f2f 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -3,8 +3,7 @@ use std::time::Duration; use anyhow::Context; use gcp_bigquery_client::{ model::{query_request::QueryRequest, query_response::ResultSet}, - yup_oauth2, - Client, + yup_oauth2, Client, }; use peer_connections::PeerConnectionTracker; use peer_cursor::{CursorManager, CursorModification, QueryExecutor, QueryOutput, Schema}; @@ -159,11 +158,13 @@ impl QueryExecutor for BigQueryQueryExecutor { } => n .parse::() .map_err(|err| PgWireError::ApiError(err.into()))?, - _ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "fdw_error".to_owned(), - "only FORWARD count and COUNT count are supported in FETCH".to_owned(), - )))), + _ => { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "fdw_error".to_owned(), + "only FORWARD count and COUNT count are supported in FETCH".to_owned(), + )))) + } }; tracing::info!("fetching {} rows", count); diff --git a/nexus/peer-mysql/src/lib.rs b/nexus/peer-mysql/src/lib.rs index bfa7c81c50..d57e83b932 100644 --- a/nexus/peer-mysql/src/lib.rs +++ b/nexus/peer-mysql/src/lib.rs @@ -63,7 +63,12 @@ impl QueryExecutor for MySqlQueryExecutor { async fn execute(&self, stmt: &Statement) -> PgWireResult { // only support SELECT statements match stmt { - Statement::Explain { analyze, format, statement, .. } => { + Statement::Explain { + analyze, + format, + statement, + .. + } => { if let Statement::Query(ref query) = **statement { let mut query = query.clone(); ast::rewrite_query(&self.peer_name, &mut query); @@ -145,11 +150,13 @@ impl QueryExecutor for MySqlQueryExecutor { } => n .parse::() .map_err(|err| PgWireError::ApiError(err.into()))?, - _ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "fdw_error".to_owned(), - "only FORWARD count and COUNT count are supported in FETCH".to_owned(), - )))), + _ => { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "fdw_error".to_owned(), + "only FORWARD count and COUNT count are supported in FETCH".to_owned(), + )))) + } }; tracing::info!("fetching {} rows", count); diff --git a/nexus/peer-mysql/src/stream.rs b/nexus/peer-mysql/src/stream.rs index 1cb791fe63..13f3c03cf4 100644 --- a/nexus/peer-mysql/src/stream.rs +++ b/nexus/peer-mysql/src/stream.rs @@ -110,50 +110,51 @@ pub fn mysql_row_to_values(row: Row) -> Vec { row.unwrap() .into_iter() .zip(columns.iter()) - .map(|(val, col)| + .map(|(val, col)| { if val == mysql_async::Value::NULL { Value::Null } else { - match col.column_type() { - ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Value::Null, - ColumnType::MYSQL_TYPE_TINY => Value::TinyInt(from_value(val)), - ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => { - Value::SmallInt(from_value(val)) - } - ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_INT24 => { - Value::Integer(from_value(val)) - } - ColumnType::MYSQL_TYPE_LONGLONG => Value::BigInt(from_value(val)), - ColumnType::MYSQL_TYPE_FLOAT => Value::Float(from_value(val)), - ColumnType::MYSQL_TYPE_DOUBLE => Value::Double(from_value(val)), - ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => { - Value::Numeric(from_value(val)) - } - ColumnType::MYSQL_TYPE_VARCHAR - | ColumnType::MYSQL_TYPE_VAR_STRING - | ColumnType::MYSQL_TYPE_STRING - | ColumnType::MYSQL_TYPE_ENUM - | ColumnType::MYSQL_TYPE_SET => Value::Text(from_value(val)), - ColumnType::MYSQL_TYPE_TINY_BLOB - | ColumnType::MYSQL_TYPE_MEDIUM_BLOB - | ColumnType::MYSQL_TYPE_LONG_BLOB - | ColumnType::MYSQL_TYPE_BLOB - | ColumnType::MYSQL_TYPE_BIT - | ColumnType::MYSQL_TYPE_GEOMETRY => { - Value::Binary(from_value::>(val).into()) - } - ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => { - Value::Date(from_value(val)) - } - ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => { - Value::Time(from_value(val)) - } - ColumnType::MYSQL_TYPE_TIMESTAMP - | ColumnType::MYSQL_TYPE_TIMESTAMP2 - | ColumnType::MYSQL_TYPE_DATETIME - | ColumnType::MYSQL_TYPE_DATETIME2 => Value::PostgresTimestamp(from_value(val)), - ColumnType::MYSQL_TYPE_JSON => Value::JsonB(from_value(val)), - ColumnType::MYSQL_TYPE_TYPED_ARRAY => Value::Null, + match col.column_type() { + ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Value::Null, + ColumnType::MYSQL_TYPE_TINY => Value::TinyInt(from_value(val)), + ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => { + Value::SmallInt(from_value(val)) + } + ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_INT24 => { + Value::Integer(from_value(val)) + } + ColumnType::MYSQL_TYPE_LONGLONG => Value::BigInt(from_value(val)), + ColumnType::MYSQL_TYPE_FLOAT => Value::Float(from_value(val)), + ColumnType::MYSQL_TYPE_DOUBLE => Value::Double(from_value(val)), + ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => { + Value::Numeric(from_value(val)) + } + ColumnType::MYSQL_TYPE_VARCHAR + | ColumnType::MYSQL_TYPE_VAR_STRING + | ColumnType::MYSQL_TYPE_STRING + | ColumnType::MYSQL_TYPE_ENUM + | ColumnType::MYSQL_TYPE_SET => Value::Text(from_value(val)), + ColumnType::MYSQL_TYPE_TINY_BLOB + | ColumnType::MYSQL_TYPE_MEDIUM_BLOB + | ColumnType::MYSQL_TYPE_LONG_BLOB + | ColumnType::MYSQL_TYPE_BLOB + | ColumnType::MYSQL_TYPE_BIT + | ColumnType::MYSQL_TYPE_GEOMETRY => { + Value::Binary(from_value::>(val).into()) + } + ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => { + Value::Date(from_value(val)) + } + ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => { + Value::Time(from_value(val)) + } + ColumnType::MYSQL_TYPE_TIMESTAMP + | ColumnType::MYSQL_TYPE_TIMESTAMP2 + | ColumnType::MYSQL_TYPE_DATETIME + | ColumnType::MYSQL_TYPE_DATETIME2 => Value::PostgresTimestamp(from_value(val)), + ColumnType::MYSQL_TYPE_JSON => Value::JsonB(from_value(val)), + ColumnType::MYSQL_TYPE_TYPED_ARRAY => Value::Null, + } } }) .collect() diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 1e145a7beb..d04a369d32 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -357,11 +357,13 @@ impl QueryExecutor for SnowflakeQueryExecutor { } => n .parse::() .map_err(|err| PgWireError::ApiError(err.into()))?, - _ => return Err(PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "fdw_error".to_owned(), - "only FORWARD count and COUNT count are supported in FETCH".to_owned(), - )))), + _ => { + return Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "fdw_error".to_owned(), + "only FORWARD count and COUNT count are supported in FETCH".to_owned(), + )))) + } }; tracing::info!("fetching {} rows", count); diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index c4a0440b8d..51d2999890 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -26,11 +26,13 @@ use pgwire::{ md5pass::{hash_md5_password, MakeMd5PasswordAuthStartupHandler}, AuthSource, LoginInfo, Password, ServerParameterProvider, }, - portal::{Portal}, + portal::Portal, query::{ExtendedQueryHandler, SimpleQueryHandler}, - results::{DescribeResponse, DescribePortalResponse, DescribeStatementResponse, Response, Tag}, + results::{ + DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag, + }, stmt::StoredStatement, - ClientInfo, MakeHandler, Type + ClientInfo, MakeHandler, Type, }, error::{ErrorInfo, PgWireError, PgWireResult}, tokio::process_socket, @@ -861,11 +863,7 @@ impl NexusBackend { }) } - async fn do_describe( - &self, - stmt: &NexusParsedStatement, - ) -> PgWireResult> - { + async fn do_describe(&self, stmt: &NexusParsedStatement) -> PgWireResult> { tracing::info!("[eqp] do_describe: {}", stmt.query); let stmt = &stmt.statement; match stmt { @@ -875,61 +873,47 @@ impl NexusBackend { NexusStatement::Rollback { .. } => Ok(None), NexusStatement::PeerQuery { stmt, assoc } => { let schema: Option = match assoc { - QueryAssociation::Peer(peer) => { - match &peer.config { - Some(Config::BigqueryConfig(_)) => { - let executor = - self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to get peer executor: {:?}", err) - .into(), - ) - })?; - executor.describe(stmt).await? - } - Some(Config::MysqlConfig(_)) => { - let executor = - self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to get peer executor: {:?}", err) - .into(), - ) - })?; - executor.describe(stmt).await? - } - Some(Config::PostgresConfig(_)) => { - let executor = - self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to get peer executor: {:?}", err) - .into(), - ) - })?; - executor.describe(stmt).await? - } - Some(Config::SnowflakeConfig(_)) => { - let executor = - self.get_peer_executor(peer).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to get peer executor: {:?}", err) - .into(), - ) - })?; - executor.describe(stmt).await? - } - _ => { - panic!("peer type not supported: {:?}", peer) - } + QueryAssociation::Peer(peer) => match &peer.config { + Some(Config::BigqueryConfig(_)) => { + let executor = self.get_peer_executor(peer).await.map_err(|err| { + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) + })?; + executor.describe(stmt).await? } - } + Some(Config::MysqlConfig(_)) => { + let executor = self.get_peer_executor(peer).await.map_err(|err| { + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) + })?; + executor.describe(stmt).await? + } + Some(Config::PostgresConfig(_)) => { + let executor = self.get_peer_executor(peer).await.map_err(|err| { + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) + })?; + executor.describe(stmt).await? + } + Some(Config::SnowflakeConfig(_)) => { + let executor = self.get_peer_executor(peer).await.map_err(|err| { + PgWireError::ApiError( + format!("unable to get peer executor: {:?}", err).into(), + ) + })?; + executor.describe(stmt).await? + } + _ => { + panic!("peer type not supported: {:?}", peer) + } + }, QueryAssociation::Catalog => self.catalog.describe(stmt).await?, }; - Ok(if self.peerdb_fdw_mode { - None - } else { - schema - }) + Ok(if self.peerdb_fdw_mode { None } else { schema }) } } } @@ -1037,11 +1021,13 @@ impl ExtendedQueryHandler for NexusBackend { where C: ClientInfo + Unpin + Send + Sync, { - Ok(if let Some(schema) = self.do_describe(&target.statement.statement).await? { - DescribePortalResponse::new((*schema).clone()) - } else { - DescribePortalResponse::no_data() - }) + Ok( + if let Some(schema) = self.do_describe(&target.statement.statement).await? { + DescribePortalResponse::new((*schema).clone()) + } else { + DescribePortalResponse::no_data() + }, + ) } async fn do_describe_statement( @@ -1052,11 +1038,13 @@ impl ExtendedQueryHandler for NexusBackend { where C: ClientInfo + Unpin + Send + Sync, { - Ok(if let Some(schema) = self.do_describe(&target.statement).await? { - DescribeStatementResponse::new(target.parameter_types.clone(), (*schema).clone()) - } else { - DescribeStatementResponse::no_data() - }) + Ok( + if let Some(schema) = self.do_describe(&target.statement).await? { + DescribeStatementResponse::new(target.parameter_types.clone(), (*schema).clone()) + } else { + DescribeStatementResponse::no_data() + }, + ) } }