Skip to content

Commit

Permalink
Implement a separate code path for serverless connectors at the sql-q…
Browse files Browse the repository at this point in the history
…uery-connector-level (#4076)
  • Loading branch information
Miguel Fernández authored Jul 25, 2023
1 parent 1548dc1 commit 512cf35
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 228 deletions.
1 change: 1 addition & 0 deletions psl/builtin-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use mongodb::MongoDbType;
pub use mssql_datamodel_connector::{MsSqlType, MsSqlTypeParameter};
pub use mysql_datamodel_connector::MySqlType;
pub use postgres_datamodel_connector::{PostgresDatasourceProperties, PostgresType};
pub use psl_core::js_connector::JsConnector;

mod mongodb;
mod mssql_datamodel_connector;
Expand Down
9 changes: 9 additions & 0 deletions psl/psl-core/src/js_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ pub struct JsConnector {
pub allowed_protocols: Option<&'static [&'static str]>,
}

impl JsConnector {
/// Returns true if the given name is a valid provider name for a JsConnector.
/// We use the convention that if a provider starts with ´@prisma/´ (ex. ´@prisma/planetscale´)
/// then its a provider for a JS connector.
pub fn is_provider(name: &str) -> bool {
name.starts_with("@prisma/")
}
}

#[derive(Copy, Clone)]
pub enum Flavor {
MySQL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) struct SqlConnection<C> {

impl<C> SqlConnection<C>
where
C: Queryable + TransactionCapable + Send + Sync + 'static,
C: TransactionCapable + Send + Sync + 'static,
{
pub fn new(inner: C, connection_info: &ConnectionInfo, features: psl::PreviewFeatures) -> Self {
let connection_info = connection_info.clone();
Expand Down
161 changes: 161 additions & 0 deletions query-engine/connectors/sql-query-connector/src/database/js.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use super::connection::SqlConnection;
use crate::FromSource;
use async_trait::async_trait;
use connector_interface::{
self as connector,
error::{ConnectorError, ErrorKind},
Connection, Connector,
};
use quaint::{
connector::IsolationLevel,
prelude::{Queryable as QuaintQueryable, *},
};
use std::sync::Arc;

// TODO: https://github.com/prisma/team-orm/issues/245
// implement registry for client drivers, rather than a global variable,
// this would require the register_driver and registered_js_driver functions to
// receive an identifier for the specific driver
static QUERYABLE: once_cell::sync::OnceCell<Arc<dyn Queryable>> = once_cell::sync::OnceCell::new();

pub fn registered_js_connector() -> Option<&'static Arc<dyn Queryable>> {
QUERYABLE.get()
}

pub fn register_js_connector(driver: Arc<dyn Queryable>) {
if QUERYABLE.set(driver).is_err() {
panic!("Cannot register driver twice");
}
}

pub struct Js {
connector: JsConnector,
connection_info: ConnectionInfo,
features: psl::PreviewFeatures,
psl_connector: psl::builtin_connectors::JsConnector,
}

fn get_connection_info(url: &str) -> connector::Result<ConnectionInfo> {
ConnectionInfo::from_url(url).map_err(|err| {
ConnectorError::from_kind(ErrorKind::InvalidDatabaseUrl {
details: err.to_string(),
url: url.to_string(),
})
})
}

#[async_trait]
impl FromSource for Js {
async fn from_source(
source: &psl::Datasource,
url: &str,
features: psl::PreviewFeatures,
) -> connector_interface::Result<Js> {
let psl_connector = source.active_connector.as_js_connector().unwrap_or_else(|| {
panic!(
"Connector for {} is not a JsConnector",
source.active_connector.provider_name()
)
});

let connector = registered_js_connector().unwrap().clone();
let connection_info = get_connection_info(url)?;

return Ok(Js {
connector: JsConnector { queryable: connector },
connection_info,
features: features.to_owned(),
psl_connector,
});
}
}

#[async_trait]
impl Connector for Js {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
let sql_conn = SqlConnection::new(self.connector.clone(), &self.connection_info, self.features);
Ok(Box::new(sql_conn) as Box<dyn Connection + Send + Sync + 'static>)
})
.await
}

fn name(&self) -> &'static str {
self.psl_connector.name
}

fn should_retry_on_transient_error(&self) -> bool {
false
}
}

// TODO: miguelff: I haven´t found a better way to do this, yet... please continue reading.
//
// There is a bug in NAPI-rs by wich compiling a binary crate that links code using napi-rs
// bindings breaks. We could have used a JsQueryable from the `js-connectors` crate directly, as the
// `connection` field of a `Js` connector, but that will imply using napi-rs transitively, and break
// the tests (which are compiled as binary creates)
//
// To avoid the problem above I separated interface from implementation, making JsConnector
// independent on napi-rs. Initially, I tried having a field Arc<&dyn TransactionCabable> to hold
// JsQueryable at runtime. I did this, because TransactionCapable is the trait bounds required to
// create a value of `SqlConnection` (see [SqlConnection::new])) to actually performt the queries.
// using JSQueryable. However, this didn't work because TransactionCapable is not object safe.
// (has Sized as a supertrait)
//
// The thing is that TransactionCapable is not object safe and cannot be used in a dynamic type
// declaration, so finally I couldn't come up with anything better then wrapping a QuaintQueryable
// in this object, and implementing TransactionCapable (and quaint::Queryable) explicitly for it.
#[derive(Clone)]
struct JsConnector {
queryable: Arc<dyn QuaintQueryable>,
}

#[async_trait]
impl QuaintQueryable for JsConnector {
async fn query(&self, q: Query<'_>) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query(q).await
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query_raw(sql, params).await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query_raw_typed(sql, params).await
}

async fn execute(&self, q: Query<'_>) -> quaint::Result<u64> {
self.queryable.execute(q).await
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.queryable.execute_raw(sql, params).await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.queryable.execute_raw_typed(sql, params).await
}

async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> {
self.queryable.raw_cmd(cmd).await
}

async fn version(&self) -> quaint::Result<Option<String>> {
self.queryable.version().await
}

fn is_healthy(&self) -> bool {
self.queryable.is_healthy()
}

async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> {
self.queryable.set_tx_isolation_level(isolation_level).await
}

fn requires_isolation_first(&self) -> bool {
self.queryable.requires_isolation_first()
}
}

impl TransactionCapable for JsConnector {}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#[cfg(feature = "js-connectors")]
pub mod js;
mod runtime;

mod connection;
#[cfg(feature = "js-connectors")]
mod js;
mod mssql;
mod mysql;
mod postgresql;
Expand All @@ -14,6 +12,8 @@ pub(crate) mod operations;
use async_trait::async_trait;
use connector_interface::{error::ConnectorError, Connector};

#[cfg(feature = "js-connectors")]
pub use js::*;
pub use mssql::*;
pub use mysql::*;
pub use postgresql::*;
Expand Down
34 changes: 4 additions & 30 deletions query-engine/connectors/sql-query-connector/src/database/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::connection::SqlConnection;
use super::runtime::RuntimePool;
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand All @@ -11,7 +10,7 @@ use quaint::{pooled::Quaint, prelude::ConnectionInfo};
use std::time::Duration;

pub struct Mysql {
pool: RuntimePool,
pool: Quaint,
connection_info: ConnectionInfo,
features: psl::PreviewFeatures,
}
Expand Down Expand Up @@ -39,31 +38,10 @@ fn get_connection_info(url: &str) -> connector::Result<ConnectionInfo> {
#[async_trait]
impl FromSource for Mysql {
async fn from_source(
source: &psl::Datasource,
_: &psl::Datasource,
url: &str,
features: psl::PreviewFeatures,
) -> connector_interface::Result<Mysql> {
if source.provider == "@prisma/mysql" {
#[cfg(feature = "js-connectors")]
{
let driver = super::js::registered_driver();
let connection_info = get_connection_info(url)?;

return Ok(Mysql {
pool: RuntimePool::Js(driver.unwrap().clone()),
connection_info,
features: features.to_owned(),
});
}

#[cfg(not(feature = "js-connectors"))]
{
return Err(ConnectorError::from_kind(ErrorKind::UnsupportedConnector(
"The @prisma/mysql connector requires the `jsConnectors` preview feature to be enabled.".into(),
)));
}
}

let connection_info = get_connection_info(url)?;

let mut builder = Quaint::builder(url)
Expand All @@ -77,7 +55,7 @@ impl FromSource for Mysql {
let connection_info = pool.connection_info().to_owned();

Ok(Mysql {
pool: RuntimePool::Rust(pool),
pool,
connection_info,
features: features.to_owned(),
})
Expand All @@ -99,11 +77,7 @@ impl Connector for Mysql {
}

fn name(&self) -> &'static str {
if self.pool.is_nodejs() {
"@prisma/mysql"
} else {
"mysql"
}
"mysql"
}

fn should_retry_on_transient_error(&self) -> bool {
Expand Down
Loading

0 comments on commit 512cf35

Please sign in to comment.