Skip to content

Commit

Permalink
Merge branch 'main' into pause-mirror-initial
Browse the repository files at this point in the history
  • Loading branch information
heavycrystal authored Nov 2, 2023
2 parents d231b9c + 0aec792 commit 4bbc07c
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 77 deletions.
6 changes: 3 additions & 3 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
destination_table_identifier: table_mapping.destination.to_string(),
partition_key: table_mapping
.partition_key
.clone()
.as_ref()
.map(|s| s.to_string()),
});
}
Expand Down Expand Up @@ -728,9 +728,9 @@ fn parse_db_options(
// check if peers contains key and if it does
// then add it to the eventhubs hashmap, if not error
if let Some(peer) = peers.get(&key) {
let eventhub_config = peer.config.clone().unwrap();
let eventhub_config = peer.config.as_ref().unwrap();
if let Config::EventhubConfig(eventhub_config) = eventhub_config {
eventhubs.insert(key.to_string(), eventhub_config);
eventhubs.insert(key.to_string(), eventhub_config.clone());
} else {
anyhow::bail!("Peer '{}' is not an eventhub", key);
}
Expand Down
56 changes: 27 additions & 29 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod embedded {

pub struct Catalog {
pg: Box<Client>,
executor: Arc<Box<dyn QueryExecutor>>,
executor: Arc<dyn QueryExecutor>,
}

async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
Expand Down Expand Up @@ -86,19 +86,18 @@ impl Catalog {
let pt_config = catalog_config.to_postgres_config();
let client = connect_postgres(&pt_config).await?;
let executor = PostgresQueryExecutor::new(None, &pt_config).await?;
let boxed_trait = Box::new(executor) as Box<dyn QueryExecutor>;

Ok(Self {
pg: Box::new(client),
executor: Arc::new(boxed_trait),
executor: Arc::new(executor),
})
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
run_migrations(&mut self.pg).await
}

pub fn get_executor(&self) -> Arc<Box<dyn QueryExecutor>> {
pub fn get_executor(&self) -> Arc<dyn QueryExecutor> {
self.executor.clone()
}

Expand Down Expand Up @@ -213,18 +212,18 @@ impl Catalog {
let mut peers = HashMap::new();

for row in rows {
let name: String = row.get(1);
let name: &str = row.get(1);
let peer_type: i32 = row.get(2);
let options: Vec<u8> = row.get(3);
let options: &[u8] = row.get(3);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;
let config = self.get_config(db_type, name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
peers.insert(name, peer);
peers.insert(name.to_string(), peer);
}

Ok(peers)
Expand All @@ -242,14 +241,14 @@ impl Catalog {
let rows = self.pg.query(&stmt, &[&peer_name]).await?;

if let Some(row) = rows.first() {
let name: String = row.get(1);
let name: &str = row.get(1);
let peer_type: i32 = row.get(2);
let options: Vec<u8> = row.get(3);
let options: &[u8] = row.get(3);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;
let config = self.get_config(db_type, name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
Expand All @@ -269,14 +268,14 @@ impl Catalog {
let rows = self.pg.query(&stmt, &[&peer_id]).await?;

if let Some(row) = rows.first() {
let name: String = row.get(0);
let name: &str = row.get(0);
let peer_type: i32 = row.get(1);
let options: Vec<u8> = row.get(2);
let options: &[u8] = row.get(2);
let db_type = DbType::from_i32(peer_type);
let config = self.get_config(db_type, &name, options).await?;
let config = self.get_config(db_type, name, options).await?;

let peer = Peer {
name: name.clone().to_lowercase(),
name: name.to_lowercase(),
r#type: peer_type,
config,
};
Expand All @@ -291,49 +290,49 @@ impl Catalog {
&self,
db_type: Option<DbType>,
name: &str,
options: Vec<u8>,
options: &[u8],
) -> anyhow::Result<Option<Config>> {
match db_type {
Some(DbType::Snowflake) => {
let err = format!("unable to decode {} options for peer {}", "snowflake", name);
let snowflake_config =
pt::peerdb_peers::SnowflakeConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::SnowflakeConfig::decode(options).context(err)?;
Ok(Some(Config::SnowflakeConfig(snowflake_config)))
}
Some(DbType::Bigquery) => {
let err = format!("unable to decode {} options for peer {}", "bigquery", name);
let bigquery_config =
pt::peerdb_peers::BigqueryConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::BigqueryConfig::decode(options).context(err)?;
Ok(Some(Config::BigqueryConfig(bigquery_config)))
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
let err = format!("unable to decode {} options for peer {}", "eventhub", name);
let eventhub_config =
pt::peerdb_peers::EventHubConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::EventHubConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubConfig(eventhub_config)))
}
Some(DbType::Postgres) => {
let err = format!("unable to decode {} options for peer {}", "postgres", name);
let postgres_config =
pt::peerdb_peers::PostgresConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::PostgresConfig::decode(options).context(err)?;
Ok(Some(Config::PostgresConfig(postgres_config)))
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config =
pt::peerdb_peers::S3Config::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
let err = format!("unable to decode {} options for peer {}", "sqlserver", name);
let sqlserver_config =
pt::peerdb_peers::SqlServerConfig::decode(options.as_slice()).context(err)?;
pt::peerdb_peers::SqlServerConfig::decode(options).context(err)?;
Ok(Some(Config::SqlserverConfig(sqlserver_config)))
}
Some(DbType::EventhubGroup) => {
Expand All @@ -342,7 +341,7 @@ impl Catalog {
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options.as_slice())
pt::peerdb_peers::EventHubGroupConfig::decode(options)
.context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
Expand Down Expand Up @@ -528,13 +527,12 @@ impl Catalog {

let first_row = rows.get(0).unwrap();
let workflow_id: Option<String> = first_row.get(0);
if workflow_id.is_none() {
let Some(workflow_id) = workflow_id else {
return Err(anyhow!(
"workflow id not found for existing flow job {}",
flow_job_name
));
}
let workflow_id = workflow_id.unwrap();
};
let source_peer_id: i32 = first_row.get(1);
let destination_peer_id: i32 = first_row.get(2);

Expand Down
36 changes: 17 additions & 19 deletions nexus/peer-bigquery/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use sqlparser::ast::{
pub struct BigqueryAst {}

impl BigqueryAst {
pub fn is_timestamp_returning_function(&self, name: String) -> bool {
if name == "now"
|| name == "date_trunc"
|| name == "make_timestamp"
|| name == "current_timestamp"
pub fn is_timestamp_returning_function(&self, name: &str) -> bool {
if name.eq_ignore_ascii_case("now")
|| name.eq_ignore_ascii_case("date_trunc")
|| name.eq_ignore_ascii_case("make_timestamp")
|| name.eq_ignore_ascii_case("current_timestamp")
{
return true;
}
Expand All @@ -37,7 +37,7 @@ impl BigqueryAst {
..
}) = e
{
if self.is_timestamp_returning_function(v[0].to_string().to_lowercase()) {
if self.is_timestamp_returning_function(&v[0].value) {
return true;
}
}
Expand All @@ -49,21 +49,20 @@ impl BigqueryAst {
false
}

pub fn convert_to_datetimefield(&self, t: String) -> Option<DateTimeField> {
let t_lower = t.to_lowercase();
if t_lower == "day" || t_lower == "days" {
pub fn convert_to_datetimefield(&self, t: &str) -> Option<DateTimeField> {
if t.eq_ignore_ascii_case("day") || t.eq_ignore_ascii_case("days") {
return Some(DateTimeField::Day);
}
if t_lower == "hour" || t_lower == "hours" {
if t.eq_ignore_ascii_case("hour") || t.eq_ignore_ascii_case("hours") {
return Some(DateTimeField::Hour);
}
if t_lower == "minute" || t_lower == "minutes" {
if t.eq_ignore_ascii_case("minute") || t.eq_ignore_ascii_case("minutes") {
return Some(DateTimeField::Minute);
}
if t_lower == "second" || t_lower == "Seconds" {
if t.eq_ignore_ascii_case("second") || t.eq_ignore_ascii_case("seconds") {
return Some(DateTimeField::Second);
}
if t_lower == "millisecond" || t_lower == "milliseconds" {
if t.eq_ignore_ascii_case("millisecond") || t.eq_ignore_ascii_case("milliseconds") {
return Some(DateTimeField::Milliseconds);
}
None
Expand Down Expand Up @@ -119,7 +118,7 @@ impl BigqueryAst {
}) = node
{
// now() to CURRENT_TIMESTAMP
if v[0].to_string().to_lowercase() == "now" {
if v[0].value.eq_ignore_ascii_case("now") {
v[0].value = "CURRENT_TIMESTAMP".into();
}
}
Expand All @@ -131,17 +130,16 @@ impl BigqueryAst {
..
}) = node
{
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = value.as_mut() {
if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = value.as_ref() {
/*
postgres will have interval '1 Day'
rewriting that to interval 1 Day in BQ
*/
let split = s.split(' ');
let vec = split.collect::<Vec<&str>>();
let val_string: String = vec[0].into();
let date_time_field_string: String = vec[1].into();
let date_time_field = self.convert_to_datetimefield(vec[1]);
*(value.as_mut()) = Expr::Value(Number(val_string, false));
let date_time_field = self.convert_to_datetimefield(date_time_field_string);
if date_time_field.is_none() {
// Error handling - Nexus for BQ only supports Day, Hour, Minute, Second, Millisecond
}
Expand All @@ -164,7 +162,7 @@ impl BigqueryAst {
change - to DATE_SUB
*/
if let Expr::BinaryOp { left, op, right } = node {
if self.is_timestamp_expr(left.as_mut()) || self.is_timestamp_expr(right.as_mut()) {
if self.is_timestamp_expr(left.as_ref()) || self.is_timestamp_expr(right.as_ref()) {
if let BinaryOperator::Minus = op {
*node = Expr::Function(Function {
name: ObjectName(vec![Ident::new("DATE_SUB".to_string())]),
Expand Down Expand Up @@ -198,7 +196,7 @@ impl BigqueryAst {
..
}) = node
{
if v[0].to_string().to_lowercase() == "date_trunc" {
if v[0].value.eq_ignore_ascii_case("date_trunc") {
let mut date_part = a[0].to_string();
let date_expression = &a[1];
a[0] = date_expression.clone();
Expand Down
6 changes: 3 additions & 3 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ impl BqSchema {
let bq_schema = result_set
.query_response()
.schema
.clone()
.as_ref()
.expect("Schema is not present");
let fields = bq_schema.fields.expect("Schema fields are not present");
let fields = bq_schema.fields.as_ref().expect("Schema fields are not present");

let schema = SchemaRef::new(Schema {
fields: fields
Expand All @@ -74,7 +74,7 @@ impl BqSchema {
.collect(),
});

Self { schema, fields }
Self { schema, fields: fields.clone() }
}

pub fn schema(&self) -> SchemaRef {
Expand Down
27 changes: 16 additions & 11 deletions nexus/peer-postgres/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ pub struct PostgresAst {
impl PostgresAst {
pub fn rewrite_query(&self, query: &mut Query) {
visit_relations_mut(query, |table| {
// if the peer name is the first part of the table name,
// remove it.
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
// if peer name is first part of table name, remove first part
if let Some(ref peername) = self.peername {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
ControlFlow::<()>::Continue(())
});
Expand All @@ -29,20 +30,24 @@ impl PostgresAst {
} = stmnt
{
if object_type == &ObjectType::Table {
let table = names.get_mut(0).unwrap();
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
if let Some(ref peername) = self.peername {
if let Some(table) = names.first_mut() {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
}
}
}
ControlFlow::<()>::Continue(())
});

visit_relations_mut(stmt, |table| {
// if the peer name is the first part of the table name,
// remove it.
if Some(table.0[0].value.clone().to_lowercase()) == self.peername {
table.0.remove(0);
// if peer name is first part of table name, remove first part
if let Some(ref peername) = self.peername {
if peername.eq_ignore_ascii_case(&table.0[0].value) {
table.0.remove(0);
}
}
ControlFlow::<()>::Continue(())
});
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ impl SnowflakeAuth {
.get_or_insert(raw_account.chars().count())
};
raw_account
.to_uppercase()
.chars()
.flat_map(char::to_uppercase)
.take(split_index)
.collect()
}
Expand Down
8 changes: 4 additions & 4 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ impl SnowflakeQueryExecutor {
SNOWFLAKE_URL_PREFIX, config.account_id, SNOWFLAKE_URL_SUFFIX
),
auth: SnowflakeAuth::new(
config.clone().account_id,
config.clone().username,
config.clone().private_key,
config.clone().password,
config.account_id.clone(),
config.username.clone(),
config.private_key.clone(),
config.password.clone(),
DEFAULT_REFRESH_THRESHOLD,
DEFAULT_EXPIRY_THRESHOLD,
)?,
Expand Down
Loading

0 comments on commit 4bbc07c

Please sign in to comment.