Replace Schema/SchemaRef with type alias to Arc<Vec<FieldInfo>>
SchemaRef was used everywhere, but pgwire doesn't name the type,
so whenever the API wanted an `Arc<Vec<FieldInfo>>` while we had
`Arc<Schema { fields: Vec<FieldInfo> }>`, we had to construct
`Arc::new(schema.fields.clone())`, an unnecessary clone.
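
A rough before/after sketch of the type change (a minimal illustration based on the commit message and the peer-cursor diff below, not the full crate):

```rust
use std::sync::Arc;
use pgwire::api::results::FieldInfo;

// Before: peer_cursor wrapped the fields in its own struct.
//   pub struct Schema { pub fields: Vec<FieldInfo> }
//   pub type SchemaRef = Arc<Schema>;
// pgwire's response builders want an Arc<Vec<FieldInfo>>, so every call site
// had to allocate a fresh Arc::new(schema.fields.clone()).

// After: the alias is exactly the type pgwire consumes, so the same Arc is
// cloned (a reference-count bump) and handed over directly.
pub type Schema = Arc<Vec<FieldInfo>>;
```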
serprex committed Jan 9, 2024
1 parent 8e613b5 commit aa66464
Showing 11 changed files with 47 additions and 55 deletions.
4 changes: 2 additions & 2 deletions nexus/peer-bigquery/src/cursor.rs
@@ -2,7 +2,7 @@ use dashmap::DashMap;
use tokio::sync::Mutex;

use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::ast::Statement;

@@ -11,7 +11,7 @@ use crate::BigQueryQueryExecutor;
pub struct BigQueryCursor {
position: usize,
stream: Mutex<SendableStream>,
schema: SchemaRef,
schema: Schema,
}

pub struct BigQueryCursorManager {
4 changes: 2 additions & 2 deletions nexus/peer-bigquery/src/lib.rs
@@ -7,7 +7,7 @@ use gcp_bigquery_client::{
Client,
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pt::peerdb_peers::BigqueryConfig;
use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
@@ -200,7 +200,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
}

// describe the output of the query
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
// print the statement
tracing::info!("[bigquery] describe: {}", stmt);
// only support SELECT statements
15 changes: 8 additions & 7 deletions nexus/peer-bigquery/src/stream.rs
@@ -1,6 +1,7 @@
use std::{
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};

@@ -9,7 +10,7 @@ use futures::Stream;
use gcp_bigquery_client::model::{
field_type::FieldType, query_response::ResultSet, table_field_schema::TableFieldSchema,
};
use peer_cursor::{Record, RecordStream, Schema, SchemaRef};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::{
api::{
results::{FieldFormat, FieldInfo},
@@ -22,7 +23,7 @@ use value::Value;

#[derive(Debug)]
pub struct BqSchema {
schema: SchemaRef,
schema: Schema,
fields: Vec<TableFieldSchema>,
}

@@ -68,23 +69,23 @@ impl BqSchema {
.as_ref()
.expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
let schema = Arc::new(
fields
.iter()
.map(|field| {
let datatype = convert_field_type(&field.r#type);
FieldInfo::new(field.name.clone(), None, None, datatype, FieldFormat::Text)
})
.collect(),
});
);

Self {
schema,
fields: fields.clone(),
}
}

pub fn schema(&self) -> SchemaRef {
pub fn schema(&self) -> Schema {
self.schema.clone()
}
}
@@ -192,7 +193,7 @@ impl Stream for BqRecordStream {
}

impl RecordStream for BqRecordStream {
fn schema(&self) -> SchemaRef {
fn schema(&self) -> Schema {
self.schema.schema()
}
}
15 changes: 5 additions & 10 deletions nexus/peer-cursor/src/lib.rs
@@ -7,27 +7,22 @@ use value::Value;

pub mod util;

#[derive(Debug, Clone)]
pub struct Schema {
pub fields: Vec<FieldInfo>,
}

pub type SchemaRef = Arc<Schema>;
pub type Schema = Arc<Vec<FieldInfo>>;

pub struct Record {
pub values: Vec<Value>,
pub schema: SchemaRef,
pub schema: Schema,
}

pub trait RecordStream: Stream<Item = PgWireResult<Record>> {
fn schema(&self) -> SchemaRef;
fn schema(&self) -> Schema;
}

pub type SendableStream = Pin<Box<dyn RecordStream + Send>>;

pub struct Records {
pub records: Vec<Record>,
pub schema: SchemaRef,
pub schema: Schema,
}

#[derive(Debug, Clone)]
@@ -50,7 +45,7 @@ pub enum QueryOutput {
pub trait QueryExecutor: Send + Sync {
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>>;
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;

async fn is_connection_valid(&self) -> anyhow::Result<bool>;
}
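
With the alias in place, producing a schema is just building the `Vec<FieldInfo>` and wrapping it in an `Arc` once, as the per-peer crates below do. A hypothetical minimal example (field names and types invented for illustration; the `FieldInfo::new` arguments follow the pattern used in this diff):

```rust
use std::sync::Arc;
use pgwire::api::results::{FieldFormat, FieldInfo};
use pgwire::api::Type;

pub type Schema = Arc<Vec<FieldInfo>>;

fn example_schema() -> Schema {
    // Build the field list once; later consumers clone the Arc, not the Vec.
    Arc::new(vec![
        FieldInfo::new("id".to_string(), None, None, Type::INT8, FieldFormat::Text),
        FieldInfo::new("name".to_string(), None, None, Type::TEXT, FieldFormat::Text),
    ])
}
```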
18 changes: 7 additions & 11 deletions nexus/peer-cursor/src/util.rs
@@ -1,13 +1,11 @@
use std::sync::Arc;

use futures::{stream, StreamExt};
use pgwire::{
api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response},
api::results::{DataRowEncoder, QueryResponse, Response},
error::{PgWireError, PgWireResult},
};
use value::Value;

use crate::{Records, SchemaRef, SendableStream};
use crate::{Records, Schema, SendableStream};

fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()> {
match value {
@@ -58,11 +56,10 @@ fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResult<()>
}

pub fn sendable_stream_to_query_response<'a>(
schema: SchemaRef,
schema: Schema,
record_stream: SendableStream,
) -> PgWireResult<Response<'a>> {
let pg_schema: Arc<Vec<FieldInfo>> = Arc::new(schema.fields.clone());
let schema_copy = pg_schema.clone();
let schema_copy = schema.clone();

let data_row_stream = record_stream
.map(move |record_result| {
@@ -77,14 +74,13 @@ pub fn sendable_stream_to_query_response<'a>(
.boxed();

Ok(Response::Query(QueryResponse::new(
pg_schema,
schema,
data_row_stream,
)))
}

pub fn records_to_query_response<'a>(records: Records) -> PgWireResult<Response<'a>> {
let pg_schema: Arc<Vec<FieldInfo>> = Arc::new(records.schema.fields.clone());
let schema_copy = pg_schema.clone();
let schema_copy = records.schema.clone();

let data_row_stream = stream::iter(records.records)
.map(move |record| {
@@ -97,7 +93,7 @@ pub fn records_to_query_response<'a>(records: Records) -> PgWireResult<Response<
.boxed();

Ok(Response::Query(QueryResponse::new(
pg_schema,
records.schema.clone(),
data_row_stream,
)))
}
8 changes: 4 additions & 4 deletions nexus/peer-postgres/src/lib.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

use peer_cursor::{QueryExecutor, QueryOutput, Schema, SchemaRef};
use peer_cursor::{QueryExecutor, QueryOutput, Schema};
use pgwire::{
api::results::{FieldFormat, FieldInfo},
error::{PgWireError, PgWireResult},
@@ -30,7 +30,7 @@ impl PostgresQueryExecutor {
})
}

pub async fn schema_from_query(&self, query: &str) -> anyhow::Result<SchemaRef> {
pub async fn schema_from_query(&self, query: &str) -> anyhow::Result<Schema> {
let prepared = self.client.prepare_typed(query, &[]).await?;

let fields: Vec<FieldInfo> = prepared
@@ -42,7 +42,7 @@
})
.collect();

Ok(Arc::new(Schema { fields }))
Ok(Arc::new(fields))
}
}

@@ -113,7 +113,7 @@ impl QueryExecutor for PostgresQueryExecutor {
}
}

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
match stmt {
Statement::Query(_query) => {
let schema = self
8 changes: 4 additions & 4 deletions nexus/peer-postgres/src/stream.rs
@@ -1,7 +1,7 @@
use bytes::Bytes;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use futures::Stream;
use peer_cursor::{Record, RecordStream, SchemaRef};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::error::{PgWireError, PgWireResult};
use postgres_inet::MaskedIpAddr;
use rust_decimal::Decimal;
@@ -14,11 +14,11 @@ use uuid::Uuid;
use value::{array::ArrayValue, Value};
pub struct PgRecordStream {
row_stream: Pin<Box<RowStream>>,
schema: SchemaRef,
schema: Schema,
}

impl PgRecordStream {
pub fn new(row_stream: RowStream, schema: SchemaRef) -> Self {
pub fn new(row_stream: RowStream, schema: Schema) -> Self {
Self {
row_stream: Box::pin(row_stream),
schema,
@@ -277,7 +277,7 @@ impl Stream for PgRecordStream {
}

impl RecordStream for PgRecordStream {
fn schema(&self) -> SchemaRef {
fn schema(&self) -> Schema {
self.schema.clone()
}
}
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/cursor.rs
@@ -1,15 +1,15 @@
use crate::SnowflakeQueryExecutor;
use dashmap::DashMap;
use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::ast::Statement;
use tokio::sync::Mutex;

pub struct SnowflakeCursor {
position: usize,
stream: Mutex<SendableStream>,
schema: SchemaRef,
schema: Schema,
}

pub struct SnowflakeCursorManager {
4 changes: 2 additions & 2 deletions nexus/peer-snowflake/src/lib.rs
@@ -1,7 +1,7 @@
use anyhow::Context;
use async_recursion::async_recursion;
use cursor::SnowflakeCursorManager;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, SchemaRef};
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser;
@@ -395,7 +395,7 @@ impl QueryExecutor for SnowflakeQueryExecutor {
}
}

async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<SchemaRef>> {
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
match stmt {
Statement::Query(query) => {
let mut new_query = query.clone();
16 changes: 8 additions & 8 deletions nexus/peer-snowflake/src/stream.rs
@@ -1,8 +1,7 @@
use crate::{auth::SnowflakeAuth, PartitionResult, ResultSet};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use futures::Stream;
use peer_cursor::Schema;
use peer_cursor::{Record, RecordStream, SchemaRef};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::{
api::{
results::{FieldFormat, FieldInfo},
@@ -15,6 +14,7 @@ use serde::Deserialize;
use std::{
pin::Pin,
task::{Context, Poll},
sync::Arc,
};
use value::Value::{
self, BigInt, Binary, Bool, Date, Float, PostgresTimestamp, Text, Time, TimestampWithTimeZone,
@@ -40,7 +40,7 @@ pub(crate) enum SnowflakeDataType {
}

pub struct SnowflakeSchema {
schema: SchemaRef,
schema: Schema,
}

fn convert_field_type(field_type: &SnowflakeDataType) -> Type {
@@ -63,20 +63,20 @@ impl SnowflakeSchema {
pub fn from_result_set(result_set: &ResultSet) -> Self {
let fields = result_set.resultSetMetaData.rowType.clone();

let schema = SchemaRef::new(Schema {
fields: fields
let schema = Arc::new(
fields
.iter()
.map(|field| {
let datatype = convert_field_type(&field.r#type);
FieldInfo::new(field.name.clone(), None, None, datatype, FieldFormat::Text)
})
.collect(),
});
);

Self { schema }
}

pub fn schema(&self) -> SchemaRef {
pub fn schema(&self) -> Schema {
self.schema.clone()
}
}
@@ -249,7 +249,7 @@ impl Stream for SnowflakeRecordStream {
}

impl RecordStream for SnowflakeRecordStream {
fn schema(&self) -> SchemaRef {
fn schema(&self) -> Schema {
self.schema.schema()
}
}
6 changes: 3 additions & 3 deletions nexus/server/src/main.rs
@@ -17,7 +17,7 @@ use peer_bigquery::BigQueryQueryExecutor;
use peer_connections::{PeerConnectionTracker, PeerConnections};
use peer_cursor::{
util::{records_to_query_response, sendable_stream_to_query_response},
QueryExecutor, QueryOutput, SchemaRef,
QueryExecutor, QueryOutput, Schema,
};
use peerdb_parser::{NexusParsedStatement, NexusQueryParser, NexusStatement};
use pgwire::{
@@ -1045,7 +1045,7 @@ impl ExtendedQueryHandler for NexusBackend {
NexusStatement::PeerCursor { .. } => Ok(DescribeResponse::no_data()),
NexusStatement::Empty => Ok(DescribeResponse::no_data()),
NexusStatement::PeerQuery { stmt, assoc } => {
let schema: Option<SchemaRef> = match assoc {
let schema: Option<Schema> = match assoc {
QueryAssociation::Peer(peer) => {
// if the peer is of type bigquery, let us route the query to bq.
match &peer.config {
@@ -1099,7 +1099,7 @@ impl ExtendedQueryHandler for NexusBackend {
} else {
Ok(DescribeResponse::new(
param_types,
described_schema.fields.clone(),
(*described_schema).clone(),
))
}
} else {
Expand Down

0 comments on commit aa66464

Please sign in to comment.