From 490e14f5fde0b04306aa070eeb88e0eacdfc5939 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 9 Jan 2024 16:02:53 +0000 Subject: [PATCH] Replace Schema/SchemaRef with type alias to Arc<Vec<FieldInfo>> SchemaRef was used everywhere, pgwire doesn't name the type, so when the api wanted an `Arc<Vec<FieldInfo>>` while we had `Arc<Schema { fields: Vec<FieldInfo> }>` it required constructing `Arc::new(schema.fields.clone())`, an unnecessary clone --- nexus/analyzer/src/lib.rs | 8 ++++---- nexus/catalog/src/lib.rs | 9 ++++++--- nexus/peer-bigquery/src/cursor.rs | 4 ++-- nexus/peer-bigquery/src/lib.rs | 4 ++-- nexus/peer-bigquery/src/stream.rs | 15 ++++++++------- nexus/peer-cursor/src/lib.rs | 15 +++++---------- nexus/peer-cursor/src/util.rs | 21 +++++++-------------- nexus/peer-postgres/src/lib.rs | 8 ++++---- nexus/peer-postgres/src/stream.rs | 8 ++++---- nexus/peer-snowflake/src/cursor.rs | 4 ++-- nexus/peer-snowflake/src/lib.rs | 4 ++-- nexus/peer-snowflake/src/stream.rs | 16 ++++++++-------- nexus/server/src/main.rs | 6 +++--- 13 files changed, 57 insertions(+), 65 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 6e469eddcf..bbbbf531d8 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -10,8 +10,8 @@ use anyhow::Context; use pt::{ flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob}, peerdb_peers::{ - peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig, - S3Config, SnowflakeConfig, SqlServerConfig,ClickhouseConfig + peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, MongoConfig, Peer, + PostgresConfig, S3Config, SnowflakeConfig, SqlServerConfig, }, }; use qrep::process_options; @@ -798,7 +798,7 @@ fn parse_db_options( .get("s3_integration") .map(|s| s.to_string()) .unwrap_or_default(); - + let clickhouse_config = ClickhouseConfig { host: opts.get("host").context("no host specified")?.to_string(), port: opts @@ -822,7 +822,7 @@ fn parse_db_options( }; let config =
Config::ClickhouseConfig(clickhouse_config); Some(config) - } + } }; Ok(config) diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index bb36277043..9dedbb7a33 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -143,7 +143,7 @@ impl Catalog { let config_len = clickhouse_config.encoded_len(); buf.reserve(config_len); clickhouse_config.encode(&mut buf)?; - } + } }; buf @@ -340,11 +340,14 @@ impl Catalog { Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) } Some(DbType::Clickhouse) => { - let err = format!("unable to decode {} options for peer {}", "clickhouse", name); + let err = format!( + "unable to decode {} options for peer {}", + "clickhouse", name + ); let clickhouse_config = pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?; Ok(Some(Config::ClickhouseConfig(clickhouse_config))) - } + } None => Ok(None), } } diff --git a/nexus/peer-bigquery/src/cursor.rs b/nexus/peer-bigquery/src/cursor.rs index 23812a382a..6d1459e83d 100644 --- a/nexus/peer-bigquery/src/cursor.rs +++ b/nexus/peer-bigquery/src/cursor.rs @@ -2,7 +2,7 @@ use dashmap::DashMap; use tokio::sync::Mutex; use futures::StreamExt; -use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream}; +use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::ast::Statement; @@ -11,7 +11,7 @@ use crate::BigQueryQueryExecutor; pub struct BigQueryCursor { position: usize, stream: Mutex, - schema: SchemaRef, + schema: Schema, } pub struct BigQueryCursorManager { diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index 29d58fb24f..e0f9fa99f3 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -7,7 +7,7 @@ use gcp_bigquery_client::{ Client, }; use peer_connections::PeerConnectionTracker; -use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef}; +use 
peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use pt::peerdb_peers::BigqueryConfig; use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value}; @@ -200,7 +200,7 @@ impl QueryExecutor for BigQueryQueryExecutor { } // describe the output of the query - async fn describe(&self, stmt: &Statement) -> PgWireResult> { + async fn describe(&self, stmt: &Statement) -> PgWireResult> { // print the statement tracing::info!("[bigquery] describe: {}", stmt); // only support SELECT statements diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index a831f6818f..b5738d914f 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -1,6 +1,7 @@ use std::{ pin::Pin, str::FromStr, + sync::Arc, task::{Context, Poll}, }; @@ -9,7 +10,7 @@ use futures::Stream; use gcp_bigquery_client::model::{ field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema, }; -use peer_cursor::{Record, RecordStream, Schema, SchemaRef}; +use peer_cursor::{Record, RecordStream, Schema}; use pgwire::{ api::{ results::{FieldFormat, FieldInfo}, @@ -22,7 +23,7 @@ use value::Value; #[derive(Debug)] pub struct BqSchema { - schema: SchemaRef, + schema: Schema, fields: Vec, } @@ -68,15 +69,15 @@ impl BqSchema { .as_ref() .expect("Schema fields are not present"); - let schema = SchemaRef::new(Schema { - fields: fields + let schema = Arc::new( + fields .iter() .map(|field| { let datatype = convert_field_type(&field.r#type); FieldInfo::new(field.name.clone(), None, None, datatype, FieldFormat::Text) }) .collect(), - }); + ); Self { schema, @@ -84,7 +85,7 @@ impl BqSchema { } } - pub fn schema(&self) -> SchemaRef { + pub fn schema(&self) -> Schema { self.schema.clone() } } @@ -192,7 +193,7 @@ impl Stream for BqRecordStream { } impl RecordStream for BqRecordStream { - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Schema { 
self.schema.schema() } } diff --git a/nexus/peer-cursor/src/lib.rs b/nexus/peer-cursor/src/lib.rs index 7d2525a7df..7df8f971f1 100644 --- a/nexus/peer-cursor/src/lib.rs +++ b/nexus/peer-cursor/src/lib.rs @@ -7,27 +7,22 @@ use value::Value; pub mod util; -#[derive(Debug, Clone)] -pub struct Schema { - pub fields: Vec, -} - -pub type SchemaRef = Arc; +pub type Schema = Arc>; pub struct Record { pub values: Vec, - pub schema: SchemaRef, + pub schema: Schema, } pub trait RecordStream: Stream> { - fn schema(&self) -> SchemaRef; + fn schema(&self) -> Schema; } pub type SendableStream = Pin>; pub struct Records { pub records: Vec, - pub schema: SchemaRef, + pub schema: Schema, } #[derive(Debug, Clone)] @@ -50,7 +45,7 @@ pub enum QueryOutput { pub trait QueryExecutor: Send + Sync { async fn execute(&self, stmt: &Statement) -> PgWireResult; - async fn describe(&self, stmt: &Statement) -> PgWireResult>; + async fn describe(&self, stmt: &Statement) -> PgWireResult>; async fn is_connection_valid(&self) -> anyhow::Result; } diff --git a/nexus/peer-cursor/src/util.rs b/nexus/peer-cursor/src/util.rs index e9b9d55b00..ce47639c26 100644 --- a/nexus/peer-cursor/src/util.rs +++ b/nexus/peer-cursor/src/util.rs @@ -1,13 +1,11 @@ -use std::sync::Arc; - use futures::{stream, StreamExt}; use pgwire::{ - api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response}, + api::results::{DataRowEncoder, QueryResponse, Response}, error::{PgWireError, PgWireResult}, }; use value::Value; -use crate::{Records, SchemaRef, SendableStream}; +use crate::{Records, Schema, SendableStream}; fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> { match value { @@ -58,11 +56,10 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> } pub fn sendable_stream_to_query_response<'a>( - schema: SchemaRef, + schema: Schema, record_stream: SendableStream, ) -> PgWireResult> { - let pg_schema: Arc> = Arc::new(schema.fields.clone()); - let schema_copy = 
pg_schema.clone(); + let schema_copy = schema.clone(); let data_row_stream = record_stream .map(move |record_result| { @@ -76,15 +73,11 @@ pub fn sendable_stream_to_query_response<'a>( }) .boxed(); - Ok(Response::Query(QueryResponse::new( - pg_schema, - data_row_stream, - ))) + Ok(Response::Query(QueryResponse::new(schema, data_row_stream))) } pub fn records_to_query_response<'a>(records: Records) -> PgWireResult> { - let pg_schema: Arc> = Arc::new(records.schema.fields.clone()); - let schema_copy = pg_schema.clone(); + let schema_copy = records.schema.clone(); let data_row_stream = stream::iter(records.records) .map(move |record| { @@ -97,7 +90,7 @@ pub fn records_to_query_response<'a>(records: Records) -> PgWireResult anyhow::Result { + pub async fn schema_from_query(&self, query: &str) -> anyhow::Result { let prepared = self.client.prepare_typed(query, &[]).await?; let fields: Vec = prepared @@ -42,7 +42,7 @@ impl PostgresQueryExecutor { }) .collect(); - Ok(Arc::new(Schema { fields })) + Ok(Arc::new(fields)) } } @@ -113,7 +113,7 @@ impl QueryExecutor for PostgresQueryExecutor { } } - async fn describe(&self, stmt: &Statement) -> PgWireResult> { + async fn describe(&self, stmt: &Statement) -> PgWireResult> { match stmt { Statement::Query(_query) => { let schema = self diff --git a/nexus/peer-postgres/src/stream.rs b/nexus/peer-postgres/src/stream.rs index 21905c1cc6..230d2dca7d 100644 --- a/nexus/peer-postgres/src/stream.rs +++ b/nexus/peer-postgres/src/stream.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use futures::Stream; -use peer_cursor::{Record, RecordStream, SchemaRef}; +use peer_cursor::{Record, RecordStream, Schema}; use pgwire::error::{PgWireError, PgWireResult}; use postgres_inet::MaskedIpAddr; use rust_decimal::Decimal; @@ -14,11 +14,11 @@ use uuid::Uuid; use value::{array::ArrayValue, Value}; pub struct PgRecordStream { row_stream: Pin>, - schema: SchemaRef, + schema: Schema, } impl 
PgRecordStream { - pub fn new(row_stream: RowStream, schema: SchemaRef) -> Self { + pub fn new(row_stream: RowStream, schema: Schema) -> Self { Self { row_stream: Box::pin(row_stream), schema, @@ -277,7 +277,7 @@ impl Stream for PgRecordStream { } impl RecordStream for PgRecordStream { - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Schema { self.schema.clone() } } diff --git a/nexus/peer-snowflake/src/cursor.rs b/nexus/peer-snowflake/src/cursor.rs index 475a2d7f35..f4401c3d72 100644 --- a/nexus/peer-snowflake/src/cursor.rs +++ b/nexus/peer-snowflake/src/cursor.rs @@ -1,7 +1,7 @@ use crate::SnowflakeQueryExecutor; use dashmap::DashMap; use futures::StreamExt; -use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream}; +use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::ast::Statement; use tokio::sync::Mutex; @@ -9,7 +9,7 @@ use tokio::sync::Mutex; pub struct SnowflakeCursor { position: usize, stream: Mutex, - schema: SchemaRef, + schema: Schema, } pub struct SnowflakeCursorManager { diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index ac4d0154d9..c58ad68902 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -1,7 +1,7 @@ use anyhow::Context; use async_recursion::async_recursion; use cursor::SnowflakeCursorManager; -use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef}; +use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::dialect::GenericDialect; use sqlparser::parser; @@ -395,7 +395,7 @@ impl QueryExecutor for SnowflakeQueryExecutor { } } - async fn describe(&self, stmt: &Statement) -> PgWireResult> { + async fn describe(&self, stmt: &Statement) -> PgWireResult> { match stmt { Statement::Query(query) => { let mut new_query = query.clone(); diff --git 
a/nexus/peer-snowflake/src/stream.rs b/nexus/peer-snowflake/src/stream.rs index 4740270d12..efac7b7e1f 100644 --- a/nexus/peer-snowflake/src/stream.rs +++ b/nexus/peer-snowflake/src/stream.rs @@ -1,8 +1,7 @@ use crate::{auth::SnowflakeAuth, PartitionResult, ResultSet}; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; use futures::Stream; -use peer_cursor::Schema; -use peer_cursor::{Record, RecordStream, SchemaRef}; +use peer_cursor::{Record, RecordStream, Schema}; use pgwire::{ api::{ results::{FieldFormat, FieldInfo}, @@ -14,6 +13,7 @@ use secrecy::ExposeSecret; use serde::Deserialize; use std::{ pin::Pin, + sync::Arc, task::{Context, Poll}, }; use value::Value::{ @@ -40,7 +40,7 @@ pub(crate) enum SnowflakeDataType { } pub struct SnowflakeSchema { - schema: SchemaRef, + schema: Schema, } fn convert_field_type(field_type: &SnowflakeDataType) -> Type { @@ -63,20 +63,20 @@ impl SnowflakeSchema { pub fn from_result_set(result_set: &ResultSet) -> Self { let fields = result_set.resultSetMetaData.rowType.clone(); - let schema = SchemaRef::new(Schema { - fields: fields + let schema = Arc::new( + fields .iter() .map(|field| { let datatype = convert_field_type(&field.r#type); FieldInfo::new(field.name.clone(), None, None, datatype, FieldFormat::Text) }) .collect(), - }); + ); Self { schema } } - pub fn schema(&self) -> SchemaRef { + pub fn schema(&self) -> Schema { self.schema.clone() } } @@ -249,7 +249,7 @@ impl Stream for SnowflakeRecordStream { } impl RecordStream for SnowflakeRecordStream { - fn schema(&self) -> SchemaRef { + fn schema(&self) -> Schema { self.schema.schema() } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index bb2219512e..9cb5c888a3 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -17,7 +17,7 @@ use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ util::{records_to_query_response, 
sendable_stream_to_query_response}, - QueryExecutor, QueryOutput, SchemaRef, + QueryExecutor, QueryOutput, Schema, }; use peerdb_parser::{NexusParsedStatement, NexusQueryParser, NexusStatement}; use pgwire::{ @@ -1045,7 +1045,7 @@ impl ExtendedQueryHandler for NexusBackend { NexusStatement::PeerCursor { .. } => Ok(DescribeResponse::no_data()), NexusStatement::Empty => Ok(DescribeResponse::no_data()), NexusStatement::PeerQuery { stmt, assoc } => { - let schema: Option = match assoc { + let schema: Option = match assoc { QueryAssociation::Peer(peer) => { // if the peer is of type bigquery, let us route the query to bq. match &peer.config { @@ -1099,7 +1099,7 @@ impl ExtendedQueryHandler for NexusBackend { } else { Ok(DescribeResponse::new( param_types, - described_schema.fields.clone(), + (*described_schema).clone(), )) } } else {