Skip to content

Commit

Permalink
Replace Schema/SchemaRef with type alias to Arc<Vec<FieldInfo>>
Browse files Browse the repository at this point in the history
SchemaRef was used everywhere, pgwire doesn't name the type,
so when the api wanted an `Arc<Vec<FieldInfo>>` while we had
`Arc<Schema{ fields: Vec<FieldInfo> }>` it required constructing
`Arc::new(schema.fields.clone())`, an unnecessary clone
  • Loading branch information
serprex committed Jan 9, 2024
1 parent 8e613b5 commit 490e14f
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 65 deletions.
8 changes: 4 additions & 4 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use anyhow::Context;
use pt::{
flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob},
peerdb_peers::{
peer::Config, BigqueryConfig, DbType, EventHubConfig, MongoConfig, Peer, PostgresConfig,
S3Config, SnowflakeConfig, SqlServerConfig,ClickhouseConfig
peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, MongoConfig, Peer,
PostgresConfig, S3Config, SnowflakeConfig, SqlServerConfig,
},
};
use qrep::process_options;
Expand Down Expand Up @@ -798,7 +798,7 @@ fn parse_db_options(
.get("s3_integration")
.map(|s| s.to_string())
.unwrap_or_default();

let clickhouse_config = ClickhouseConfig {
host: opts.get("host").context("no host specified")?.to_string(),
port: opts
Expand All @@ -822,7 +822,7 @@ fn parse_db_options(
};
let config = Config::ClickhouseConfig(clickhouse_config);
Some(config)
}
}
};

Ok(config)
Expand Down
9 changes: 6 additions & 3 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Catalog {
let config_len = clickhouse_config.encoded_len();
buf.reserve(config_len);
clickhouse_config.encode(&mut buf)?;
}
}
};

buf
Expand Down Expand Up @@ -340,11 +340,14 @@ impl Catalog {
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
Some(DbType::Clickhouse) => {
let err = format!("unable to decode {} options for peer {}", "clickhouse", name);
let err = format!(
"unable to decode {} options for peer {}",
"clickhouse", name
);
let clickhouse_config =
pt::peerdb_peers::ClickhouseConfig::decode(options).context(err)?;
Ok(Some(Config::ClickhouseConfig(clickhouse_config)))
}
}
None => Ok(None),
}
}
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-bigquery/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use dashmap::DashMap;
use tokio::sync::Mutex;

use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::ast::Statement;

Expand All @@ -11,7 +11,7 @@ use crate::BigQueryQueryExecutor;
pub struct BigQueryCursor {
position: usize,
stream: Mutex<SendableStream>,
schema: SchemaRef,
schema: Schema,
}

pub struct BigQueryCursorManager {
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use gcp_bigquery_client::{
Client,
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pt::peerdb_peers::BigqueryConfig;
use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
Expand Down Expand Up @@ -200,7 +200,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
}

// describe the output of the query
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
// print the statement
tracing::info!("[bigquery] describe: {}", stmt);
// only support SELECT statements
Expand Down
15 changes: 8 additions & 7 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};

Expand All @@ -9,7 +10,7 @@ use futures::Stream;
use gcp_bigquery_client::model::{
field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema,
};
use peer_cursor::{Record, RecordStream, Schema, SchemaRef};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::{
api::{
results::{FieldFormat, FieldInfo},
Expand All @@ -22,7 +23,7 @@ use value::Value;

#[derive(Debug)]
pub struct BqSchema {
schema: SchemaRef,
schema: Schema,
fields: Vec<TableFieldSchema>,
}

Expand Down Expand Up @@ -68,23 +69,23 @@ impl BqSchema {
.as_ref()
.expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
let schema = Arc::new(
fields
.iter()
.map(|field| {
let datatype = convert_field_type(&field.r#type);
FieldInfo::new(field.name.clone(), None, None, datatype, FieldFormat::Text)
})
.collect(),
});
);

Self {
schema,
fields: fields.clone(),
}
}

pub fn schema(&self) -> SchemaRef {
pub fn schema(&self) -> Schema {
self.schema.clone()
}
}
Expand Down Expand Up @@ -192,7 +193,7 @@ impl Stream for BqRecordStream {
}

impl RecordStream for BqRecordStream {
fn schema(&self) -> SchemaRef {
fn schema(&self) -> Schema {
self.schema.schema()
}
}
15 changes: 5 additions & 10 deletions nexus/peer-cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,22 @@ use value::Value;

pub mod util;

#[derive(Debug, Clone)]
pub struct Schema {
pub fields: Vec<FieldInfo>,
}

pub type SchemaRef = Arc<Schema>;
pub type Schema = Arc<Vec<FieldInfo>>;

pub struct Record {
pub values: Vec<Value>,
pub schema: SchemaRef,
pub schema: Schema,
}

pub trait RecordStream: Stream<Item = PgWireResult<Record>> {
fn schema(&self) -> SchemaRef;
fn schema(&self) -> Schema;
}

pub type SendableStream = Pin<Box<dyn RecordStream + Send>>;

pub struct Records {
pub records: Vec<Record>,
pub schema: SchemaRef,
pub schema: Schema,
}

#[derive(Debug, Clone)]
Expand All @@ -50,7 +45,7 @@ pub enum QueryOutput {
pub trait QueryExecutor: Send + Sync {
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>>;
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;

async fn is_connection_valid(&self) -> anyhow::Result<bool>;
}
21 changes: 7 additions & 14 deletions nexus/peer-cursor/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::sync::Arc;

use futures::{stream, StreamExt};
use pgwire::{
api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response},
api::results::{DataRowEncoder, QueryResponse, Response},
error::{PgWireError, PgWireResult},
};
use value::Value;

use crate::{Records, SchemaRef, SendableStream};
use crate::{Records, Schema, SendableStream};

fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> {
match value {
Expand Down Expand Up @@ -58,11 +56,10 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()>
}

pub fn sendable_stream_to_query_response<'a>(
schema: SchemaRef,
schema: Schema,
record_stream: SendableStream,
) -> PgWireResult<Response<'a>> {
let pg_schema: Arc<Vec<FieldInfo>> = Arc::new(schema.fields.clone());
let schema_copy = pg_schema.clone();
let schema_copy = schema.clone();

let data_row_stream = record_stream
.map(move |record_result| {
Expand All @@ -76,15 +73,11 @@ pub fn sendable_stream_to_query_response<'a>(
})
.boxed();

Ok(Response::Query(QueryResponse::new(
pg_schema,
data_row_stream,
)))
Ok(Response::Query(QueryResponse::new(schema, data_row_stream)))
}

pub fn records_to_query_response<'a>(records: Records) -> PgWireResult<Response<'a>> {
let pg_schema: Arc<Vec<FieldInfo>> = Arc::new(records.schema.fields.clone());
let schema_copy = pg_schema.clone();
let schema_copy = records.schema.clone();

let data_row_stream = stream::iter(records.records)
.map(move |record| {
Expand All @@ -97,7 +90,7 @@ pub fn records_to_query_response<'a>(records: Records) -> PgWireResult<Response<
.boxed();

Ok(Response::Query(QueryResponse::new(
pg_schema,
records.schema.clone(),
data_row_stream,
)))
}
8 changes: 4 additions & 4 deletions nexus/peer-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use peer_cursor::{QueryExecutor, QueryOutput, Schema, SchemaRef};
use peer_cursor::{QueryExecutor, QueryOutput, Schema};
use pgwire::{
api::results::{FieldFormat, FieldInfo},
error::{PgWireError, PgWireResult},
Expand Down Expand Up @@ -30,7 +30,7 @@ impl PostgresQueryExecutor {
})
}

pub async fn schema_from_query(&self, query: &str) -> anyhow::Result<SchemaRef> {
pub async fn schema_from_query(&self, query: &str) -> anyhow::Result<Schema> {
let prepared = self.client.prepare_typed(query, &[]).await?;

let fields: Vec<FieldInfo> = prepared
Expand All @@ -42,7 +42,7 @@ impl PostgresQueryExecutor {
})
.collect();

Ok(Arc::new(Schema { fields }))
Ok(Arc::new(fields))
}
}

Expand Down Expand Up @@ -113,7 +113,7 @@ impl QueryExecutor for PostgresQueryExecutor {
}
}

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
match stmt {
Statement::Query(_query) => {
let schema = self
Expand Down
8 changes: 4 additions & 4 deletions nexus/peer-postgres/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use futures::Stream;
use peer_cursor::{Record, RecordStream, SchemaRef};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::error::{PgWireError, PgWireResult};
use postgres_inet::MaskedIpAddr;
use rust_decimal::Decimal;
Expand All @@ -14,11 +14,11 @@ use uuid::Uuid;
use value::{array::ArrayValue, Value};
pub struct PgRecordStream {
row_stream: Pin<Box<RowStream>>,
schema: SchemaRef,
schema: Schema,
}

impl PgRecordStream {
pub fn new(row_stream: RowStream, schema: SchemaRef) -> Self {
pub fn new(row_stream: RowStream, schema: Schema) -> Self {
Self {
row_stream: Box::pin(row_stream),
schema,
Expand Down Expand Up @@ -277,7 +277,7 @@ impl Stream for PgRecordStream {
}

impl RecordStream for PgRecordStream {
fn schema(&self) -> SchemaRef {
fn schema(&self) -> Schema {
self.schema.clone()
}
}
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::SnowflakeQueryExecutor;
use dashmap::DashMap;
use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::ast::Statement;
use tokio::sync::Mutex;

pub struct SnowflakeCursor {
position: usize,
stream: Mutex<SendableStream>,
schema: SchemaRef,
schema: Schema,
}

pub struct SnowflakeCursorManager {
Expand Down
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Context;
use async_recursion::async_recursion;
use cursor::SnowflakeCursorManager;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser;
Expand Down Expand Up @@ -395,7 +395,7 @@ impl QueryExecutor for SnowflakeQueryExecutor {
}
}

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
match stmt {
Statement::Query(query) => {
let mut new_query = query.clone();
Expand Down
Loading

0 comments on commit 490e14f

Please sign in to comment.