Basic optimizations
Prefer `as_ref` over `clone` when possible
Prefer `eq_ignore_ascii_case` over calling `to_lowercase` before comparison
Executor: convert `Arc<Box<T>>` to `Arc<T>`
Prefer passing `&[u8]` instead of `Vec<u8>` to get_config. The postgres library is able to lend out slices from rows, avoiding a copy. Illustrative sketches of these patterns follow below.
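A minimal sketch of the first two patterns, using made-up values rather than the repository's own types: `Option::as_ref` lets callers map the contents of an `Option` without cloning the whole value first, and `str::eq_ignore_ascii_case` compares strings in place instead of allocating lowercase copies.

```rust
// Minimal sketch of the first two patterns; the values are made up for illustration.
fn main() {
    // `Option::as_ref` turns &Option<String> into Option<&String>, so mapping to a new
    // String allocates once instead of cloning the original and then allocating again.
    let partition_key: Option<String> = Some("id".to_string());
    let as_string: Option<String> = partition_key.as_ref().map(|s| s.to_string());
    assert_eq!(as_string.as_deref(), Some("id"));

    // `eq_ignore_ascii_case` compares in place; no lowercase copies are allocated
    // the way `a.to_lowercase() == b.to_lowercase()` would.
    let peername = "MyPeer";
    let table_part = "mypeer";
    assert!(peername.eq_ignore_ascii_case(table_part));
}
```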
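For the executor change, `Arc<dyn Trait>` already provides shared ownership of a trait object, so the extra `Box` only adds a second heap allocation and pointer indirection. A rough sketch with a stand-in trait (not the real `QueryExecutor`, which has async methods):

```rust
use std::sync::Arc;

// Stand-in trait for illustration only; the real QueryExecutor lives in the nexus crates.
trait QueryExecutor {
    fn name(&self) -> &str;
}

struct PostgresExec;
impl QueryExecutor for PostgresExec {
    fn name(&self) -> &str {
        "postgres"
    }
}

fn main() {
    // Before: Arc<Box<dyn QueryExecutor>> — two heap indirections to reach the value.
    let double: Arc<Box<dyn QueryExecutor>> = Arc::new(Box::new(PostgresExec));
    // After: Arc<dyn QueryExecutor> — the Arc itself holds the trait object.
    let single: Arc<dyn QueryExecutor> = Arc::new(PostgresExec);
    assert_eq!(double.name(), single.name());
}
```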
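For the last point, `tokio-postgres` implements `FromSql` for `&str` and `&[u8]`, so a `bytea` column can be borrowed straight from the row buffer rather than copied into a `Vec<u8>`, and prost's `Message::decode` accepts a `&[u8]` directly. A hedged sketch of that shape — `PeerOptions` and `decode_options` are hypothetical stand-ins rather than types from this diff, and the example assumes the `prost`, `tokio-postgres`, and `anyhow` crates:

```rust
use prost::Message;
use tokio_postgres::Row;

// Hypothetical stand-in for the prost-generated peer config messages
// (SnowflakeConfig, PostgresConfig, ...) decoded in Catalog::get_config.
#[derive(Clone, PartialEq, Message)]
struct PeerOptions {
    #[prost(string, tag = "1")]
    host: String,
}

// Borrow the options column as &[u8]; no intermediate Vec<u8> is allocated.
fn decode_options(row: &Row) -> anyhow::Result<PeerOptions> {
    let options: &[u8] = row.get(3);
    // prost::Message::decode takes any `impl Buf`, which `&[u8]` satisfies.
    Ok(PeerOptions::decode(options)?)
}
```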
serprex committed Nov 2, 2023
1 parent 2920b47 commit 3fce83f
Showing 6 changed files with 57 additions and 54 deletions.
6 changes: 3 additions & 3 deletions nexus/analyzer/src/lib.rs
@@ -162,7 +162,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
destination_table_identifier: table_mapping.destination.to_string(),
partition_key: table_mapping
.partition_key
.clone()
.as_ref()
.map(|s| s.to_string()),
});
}
@@ -717,9 +717,9 @@ fn parse_db_options(
// check if peers contains key and if it does
// then add it to the eventhubs hashmap, if not error
if let Some(peer) = peers.get(&key) {
let eventhub_config = peer.config.clone().unwrap();
let eventhub_config = peer.config.as_ref().unwrap();
if let Config::EventhubConfig(eventhub_config) = eventhub_config {
eventhubs.insert(key.to_string(), eventhub_config);
eventhubs.insert(key.to_string(), eventhub_config.clone());
} else {
anyhow::bail!("Peer '{}' is not an eventhub", key);
}
50 changes: 24 additions & 26 deletions nexus/catalog/src/lib.rs
@@ -19,7 +19,7 @@ mod embedded {

pub struct Catalog {
pg: Box<Client>,
executor: Arc<Box<dyn QueryExecutor>>,
executor: Arc<dyn QueryExecutor>,
}

async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
@@ -86,19 +86,18 @@ impl Catalog {
let pt_config = catalog_config.to_postgres_config();
let client = connect_postgres(&pt_config).await?;
let executor = PostgresQueryExecutor::new(None, &pt_config).await?;
let boxed_trait = Box::new(executor) as Box<dyn QueryExecutor>;

Ok(Self {
pg: Box::new(client),
executor: Arc::new(boxed_trait),
executor: Arc::new(executor),
})
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
run_migrations(&mut self.pg).await
}

pub fn get_executor(&self) -> Arc<Box<dyn QueryExecutor>> {
pub fn get_executor(&self) -> Arc<dyn QueryExecutor> {
self.executor.clone()
}

@@ -213,18 +212,18 @@ impl Catalog {
let mut peers = HashMap::new();

for row in rows {
let name: String = row.get(1);
let name: &str = row.get(1);
let peer_type: i32 = row.get(2);
let options: Vec<u8> = row.get(3);
let options: &[u8] = row.get(3);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
peers.insert(name, peer);
peers.insert(name.to_string(), peer);
}

Ok(peers)
@@ -242,14 +241,14 @@ impl Catalog {
let rows = self.pg.query(&stmt, &[&peer_name]).await?;

if let Some(row) = rows.first() {
let name: String = row.get(1);
let name: &str = row.get(1);
let peer_type: i32 = row.get(2);
let options: Vec<u8> = row.get(3);
let options: &[u8] = row.get(3);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
@@ -269,14 +268,14 @@ impl Catalog {
let rows = self.pg.query(&stmt, &[&peer_id]).await?;

if let Some(row) = rows.first() {
let name: String = row.get(0);
let name: &str = row.get(0);
let peer_type: i32 = row.get(1);
let options: Vec<u8> = row.get(2);
let options: &[u8] = row.get(2);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
@@ -291,49 +290,49 @@ impl Catalog {
&self,
db_type: Option<DbType>,
name: &str,
options: Vec<u8>,
options: &[u8],
) -> anyhow::Result<Option<Config>> {
match db_type {
Some(DbType::Snowflake) => {
let err = format!("unable to decode {} options for peer {}", "snowflake", name);
let snowflake_config =
pt::peerdb_peers::SnowflakeConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?;
Ok(Some(Config::SnowflakeConfig(snowflake_config)))
}
Some(DbType::Bigquery) => {
let err = format!("unable to decode {} options for peer {}", "bigquery", name);
let bigquery_config =
pt::peerdb_peers::BigqueryConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?;
Ok(Some(Config::BigqueryConfig(bigquery_config)))
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
let err = format!("unable to decode {} options for peer {}", "eventhub", name);
let eventhub_config =
pt::peerdb_peers::EventHubConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::EventHubConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubConfig(eventhub_config)))
}
Some(DbType::Postgres) => {
let err = format!("unable to decode {} options for peer {}", "postgres", name);
let postgres_config =
pt::peerdb_peers::PostgresConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::PostgresConfig::decode(options).context(err)?;
Ok(Some(Config::PostgresConfig(postgres_config)))
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config =
pt::peerdb_peers::S3Config::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
let err = format!("unable to decode {} options for peer {}", "sqlserver", name);
let sqlserver_config =
pt::peerdb_peers::SqlServerConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?;
Ok(Some(Config::SqlserverConfig(sqlserver_config)))
}
Some(DbType::EventhubGroup) => {
Expand All @@ -342,7 +341,7 @@ impl Catalog {
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options.as_slice())
pt::peerdb_peers::EventHubGroupConfig::decode(options)
.context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
@@ -528,13 +527,12 @@ impl Catalog {

let first_row = rows.get(0).unwrap();
let workflow_id: Option<String> = first_row.get(0);
if workflow_id.is_none() {
let Some(workflow_id) = workflow_id else {
return Err(anyhow!(
"workflow id not found for existing flow job {}",
flow_job_name
));
}
let workflow_id = workflow_id.unwrap();
};
let source_peer_id: i32 = first_row.get(1);
let destination_peer_id: i32 = first_row.get(2);

6 changes: 3 additions & 3 deletions nexus/peer-bigquery/src/stream.rs
@@ -60,9 +60,9 @@ impl BqSchema {
let bq_schema = result_set
.query_response()
.schema
.clone()
.as_ref()
.expect("Schema is not present");
let fields = bq_schema.fields.expect("Schema fields are not present");
let fields = bq_schema.fields.as_ref().expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
@@ -74,7 +74,7 @@ impl BqSchema {
.collect(),
});

Self { schema, fields }
Self { schema, fields: fields.clone() }
}

pub fn schema(&self) -> SchemaRef {
27 changes: 16 additions & 11 deletions nexus/peer-postgres/src/ast.rs
@@ -10,10 +10,11 @@ pub struct PostgresAst {
impl PostgresAst {
pub fn rewrite_query(&self, query: &mut Query) {
visit_relations_mut(query, |table| {
// if the peer name is the first part of the table name,
// remove it.
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
// if peer name is first part of table name, remove first part
if let Some(ref peername) = self.peername {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
ControlFlow::<()>::Continue(())
});
@@ -29,20 +30,24 @@ impl PostgresAst {
} = stmnt
{
if object_type == &ObjectType::Table {
let table = names.get_mut(0).unwrap();
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
if let Some(ref peername) = self.peername {
if let Some(table) = names.first_mut() {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
}
}
}
ControlFlow::<()>::Continue(())
});

visit_relations_mut(stmt, |table| {
// if the peer name is the first part of the table name,
// remove it.
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
// if peer name is first part of table name, remove first part
if let Some(ref peername) = self.peername {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
ControlFlow::<()>::Continue(())
});
8 changes: 4 additions & 4 deletions nexus/peer-snowflake/src/lib.rs
@@ -131,10 +131,10 @@ impl SnowflakeQueryExecutor {
SNOWFLAKE_URL_PREFIX, config.account_id, SNOWFLAKE_URL_SUFFIX
),
auth: SnowflakeAuth::new(
config.clone().account_id,
config.clone().username,
config.clone().private_key,
config.clone().password,
config.account_id.clone(),
config.username.clone(),
config.private_key.clone(),
config.password.clone(),
DEFAULT_REFRESH_THRESHOLD,
DEFAULT_EXPIRY_THRESHOLD,
)?,
14 changes: 7 additions & 7 deletions nexus/server/src/main.rs
@@ -76,7 +76,7 @@ pub struct NexusBackend {
portal_store: Arc<MemPortalStore<NexusParsedStatement>>,
query_parser: Arc<NexusQueryParser>,
peer_cursors: Arc<Mutex<PeerCursors>>,
executors: Arc<DashMap<String, Arc<Box<dyn QueryExecutor>>>>,
executors: Arc<DashMap<String, Arc<dyn QueryExecutor>>>,
flow_handler: Option<Arc<Mutex<FlowGrpcClient>>>,
peerdb_fdw_mode: bool,
}
@@ -104,7 +104,7 @@ impl NexusBackend {
// execute a statement on a peer
async fn execute_statement<'a>(
&self,
executor: Arc<Box<dyn QueryExecutor>>,
executor: Arc<dyn QueryExecutor>,
stmt: &sqlparser::ast::Statement,
peer_holder: Option<Box<Peer>>,
) -> PgWireResult<Vec<Response<'a>>> {
@@ -641,26 +641,26 @@ impl NexusBackend {
Ok(workflow_id)
}

async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result<Arc<Box<dyn QueryExecutor>>> {
async fn get_peer_executor(&self, peer: &Peer) -> anyhow::Result<Arc<dyn QueryExecutor>> {
if let Some(executor) = self.executors.get(&peer.name) {
return Ok(Arc::clone(executor.value()));
}

let executor = match &peer.config {
let executor: Arc<dyn QueryExecutor> = match &peer.config {
Some(Config::BigqueryConfig(ref c)) => {
let peer_name = peer.name.clone();
let executor =
BigQueryQueryExecutor::new(peer_name, c, self.peer_connections.clone()).await?;
Arc::new(Box::new(executor) as Box<dyn QueryExecutor>)
Arc::new(executor)
}
Some(Config::PostgresConfig(ref c)) => {
let peername = Some(peer.name.clone());
let executor = peer_postgres::PostgresQueryExecutor::new(peername, c).await?;
Arc::new(Box::new(executor) as Box<dyn QueryExecutor>)
Arc::new(executor)
}
Some(Config::SnowflakeConfig(ref c)) => {
let executor = peer_snowflake::SnowflakeQueryExecutor::new(c).await?;
Arc::new(Box::new(executor) as Box<dyn QueryExecutor>)
Arc::new(executor)
}
_ => {
panic!("peer type not supported: {:?}", peer)
