Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(adapter-d1) Use max_bind_value = 100 on Cloudflare D1 #4878

Merged
merged 14 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ tracing = { version = "0.1" }
tsify = { version = "0.4.5" }
wasm-bindgen = { version = "0.2.92" }
wasm-bindgen-futures = { version = "0.4" }
wasm-rs-dbg = { version = "0.1.2" }
wasm-rs-dbg = { version = "0.1.2", default-features = false, features = ["console-error"] }
wasm-bindgen-test = { version = "0.3.0" }
url = { version = "2.5.0" }

Expand Down
15 changes: 14 additions & 1 deletion quaint/src/connector/connection_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,19 @@ impl ConnectionInfo {
}
}

pub fn max_insert_rows(&self) -> Option<usize> {
self.sql_family().max_insert_rows()
}

pub fn max_bind_values(&self) -> usize {
match self {
#[cfg(not(target_arch = "wasm32"))]
ConnectionInfo::Native(_) => self.sql_family().max_bind_values(),
// Wasm connectors can override the default max bind values.
ConnectionInfo::External(info) => info.max_bind_values.unwrap_or(self.sql_family().max_bind_values()),
}
}

/// The family of databases connected.
pub fn sql_family(&self) -> SqlFamily {
match self {
Expand Down Expand Up @@ -316,7 +329,7 @@ impl SqlFamily {
}

/// Get the default max rows for a batch insert.
pub fn max_insert_rows(&self) -> Option<usize> {
pub(crate) fn max_insert_rows(&self) -> Option<usize> {
match self {
#[cfg(feature = "postgresql")]
SqlFamily::Postgres => None,
Expand Down
4 changes: 3 additions & 1 deletion quaint/src/connector/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use super::{SqlFamily, TransactionCapable};
pub struct ExternalConnectionInfo {
pub sql_family: SqlFamily,
pub schema_name: String,
pub max_bind_values: Option<usize>,
}

impl ExternalConnectionInfo {
pub fn new(sql_family: SqlFamily, schema_name: String) -> Self {
pub fn new(sql_family: SqlFamily, schema_name: String, max_bind_values: Option<usize>) -> Self {
ExternalConnectionInfo {
sql_family,
schema_name,
max_bind_values,
}
}
}
Expand Down
63 changes: 36 additions & 27 deletions query-engine/connector-test-kit-rs/qe-setup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ use psl::{builtin_connectors::*, Datasource};
use schema_core::schema_connector::{ConnectorResult, DiffTarget, SchemaConnector};
use std::env;

#[derive(Debug, serde::Deserialize, PartialEq)]
pub struct InitResult {
pub max_bind_values: Option<usize>,
}

pub trait ExternalInitializer<'a>
where
Self: Sized,
{
#[allow(async_fn_in_trait)]
async fn init_with_migration(&self, script: String) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
async fn init_with_migration(&self, script: String)
-> Result<InitResult, Box<dyn std::error::Error + Send + Sync>>;

#[allow(async_fn_in_trait)]
async fn init(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
async fn init(&self) -> Result<InitResult, Box<dyn std::error::Error + Send + Sync>>;

fn url(&self) -> &'a str;
fn datamodel(&self) -> &'a str;
Expand Down Expand Up @@ -63,38 +69,41 @@ pub async fn setup_external<'a, EI>(
driver_adapter: DriverAdapter,
initializer: EI,
db_schemas: &[&str],
) -> ConnectorResult<()>
) -> ConnectorResult<InitResult>
where
EI: ExternalInitializer<'a> + ?Sized,
{
let prisma_schema = initializer.datamodel();
let (source, url, _preview_features) = parse_configuration(prisma_schema)?;

if driver_adapter == DriverAdapter::D1 {
// 1. Compute the diff migration script.
std::fs::remove_file(source.url.as_literal().unwrap().trim_start_matches("file:")).ok();
let mut connector = sql_schema_connector::SqlSchemaConnector::new_sqlite();
let migration_script = crate::diff(prisma_schema, url, &mut connector).await?;

// 2. Tell JavaScript to take care of the schema migration.
// This results in a JSON-RPC call to the JS runtime.
// The JSON-RPC machinery is defined in the `[query-tests-setup]` crate, and it
// implements the `ExternalInitializer<'a>` trait.
initializer
.init_with_migration(migration_script)
.await
.map_err(|err| ConnectorError::from_msg(format!("Error migrating with D1 adapter: {}", err)))?;
} else {
setup(prisma_schema, db_schemas).await?;

// 3. Tell JavaScript to initialize the external test session.
// The schema migration is taken care of by the Schema Engine.
initializer.init().await.map_err(|err| {
ConnectorError::from_msg(format!("Error initializing {} adapter: {}", driver_adapter, err))
})?;
}
let init_result = match driver_adapter {
DriverAdapter::D1 => {
// 1. Compute the diff migration script.
std::fs::remove_file(source.url.as_literal().unwrap().trim_start_matches("file:")).ok();
let mut connector = sql_schema_connector::SqlSchemaConnector::new_sqlite();
let migration_script = crate::diff(prisma_schema, url, &mut connector).await?;

// 2. Tell JavaScript to take care of the schema migration.
// This results in a JSON-RPC call to the JS runtime.
// The JSON-RPC machinery is defined in the `[query-tests-setup]` crate, and it
// implements the `ExternalInitializer<'a>` trait.
initializer
.init_with_migration(migration_script)
.await
.map_err(|err| ConnectorError::from_msg(format!("Error migrating with D1 adapter: {}", err)))
}
_ => {
setup(prisma_schema, db_schemas).await?;

// 3. Tell JavaScript to initialize the external test session.
// The schema migration is taken care of by the Schema Engine.
initializer.init().await.map_err(|err| {
ConnectorError::from_msg(format!("Error initializing {} adapter: {}", driver_adapter, err))
})
}
}?;

Ok(())
Ok(init_result)
}

/// Database setup for connector-test-kit-rs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Actor {
Some("READ COMMITTED"),
);

let mut runner = Runner::load(datamodel, &[], version, tag, setup_metrics(), log_capture).await?;
let mut runner = Runner::load(datamodel, &[], version, tag, None, setup_metrics(), log_capture).await?;

tokio::spawn(async move {
while let Some(message) = query_receiver.recv().await {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use query_engine_tests::*;

#[test_suite(schema(autoinc_id), capabilities(CreateMany, AutoIncrement), exclude(CockroachDb))]
#[test_suite(schema(autoinc_id), capabilities(AutoIncrement), exclude(CockroachDb))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random find, a couple of line below D1 is excluded, just wanted to let you know, would it be interesting to change that?

#[connector_test(exclude(CockroachDb, Sqlite("cfd1")))]

mod not_in_chunking {
use query_engine_tests::Runner;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub struct TestConfigFromSerde {
/// This is the URL to the mobile emulator which will execute the queries against
/// the instances of the engine running on the device.
pub(crate) mobile_emulator_url: Option<String>,

/// The maximum number of bind values to use in a query for a driver adapter test runner.
pub(crate) driver_adapter_max_bind_values: Option<usize>,
}

impl TestConfigFromSerde {
Expand Down Expand Up @@ -156,6 +159,9 @@ pub(crate) struct WithDriverAdapter {
/// The driver adapter configuration to forward as a stringified JSON object to the external
/// test executor by setting the `DRIVER_ADAPTER_CONFIG` env var when spawning the executor.
pub(crate) config: Option<DriverAdapterConfig>,

/// The maximum number of bind values to use in a query for a driver adapter test runner.
pub(crate) max_bind_values: Option<usize>,
}

impl WithDriverAdapter {
Expand All @@ -181,6 +187,7 @@ impl From<TestConfigFromSerde> for TestConfig {
adapter,
test_executor: config.external_test_executor.unwrap(),
config: config.driver_adapter_config,
max_bind_values: config.driver_adapter_max_bind_values,
}),
None => None,
};
Expand Down Expand Up @@ -295,6 +302,9 @@ impl TestConfig {
let driver_adapter_config = std::env::var("DRIVER_ADAPTER_CONFIG")
.map(|config| serde_json::from_str::<DriverAdapterConfig>(config.as_str()).ok())
.unwrap_or_default();
let driver_adapter_max_bind_values = std::env::var("DRIVER_ADAPTER_MAX_BIND_VALUES")
.ok()
.map(|v| v.parse::<usize>().unwrap());

let mobile_emulator_url = std::env::var("MOBILE_EMULATOR_URL").ok();

Expand All @@ -310,6 +320,7 @@ impl TestConfig {
driver_adapter,
driver_adapter_config,
mobile_emulator_url,
driver_adapter_max_bind_values,
})
.map(Self::from)
}
Expand Down Expand Up @@ -387,7 +398,7 @@ impl TestConfig {
}

pub fn test_connector(&self) -> TestResult<(ConnectorTag, ConnectorVersion)> {
let version = ConnectorVersion::try_from((self.connector(), self.connector_version()))?;
let version = self.parse_connector_version()?;
let tag = match version {
ConnectorVersion::SqlServer(_) => &SqlServerConnectorTag as ConnectorTag,
ConnectorVersion::Postgres(_) => &PostgresConnectorTag,
Expand All @@ -401,6 +412,17 @@ impl TestConfig {
Ok((tag, version))
}

pub fn max_bind_values(&self) -> Option<usize> {
let version = self.parse_connector_version().unwrap();
let local_mbv = self.with_driver_adapter().and_then(|config| config.max_bind_values);

local_mbv.or_else(|| version.max_bind_values())
}

fn parse_connector_version(&self) -> TestResult<ConnectorVersion> {
ConnectorVersion::try_from((self.connector(), self.connector_version()))
}

#[rustfmt::skip]
pub fn for_external_executor(&self) -> Vec<(String, String)> {
let with_driver_adapter = self.with_driver_adapter().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ impl ConnectorVersion {
/// From the PoV of the test binary, the target architecture is that of where the test runs,
/// generally x86_64, or aarch64, etc.
///
/// As a consequence there is an mismatch between the the max_bind_values as seen by the test
/// As a consequence there is a mismatch between the max_bind_values as seen by the test
/// binary (overriden by the QUERY_BATCH_SIZE env var) and the max_bind_values as seen by the
/// WASM engine being exercised in those tests, through the RunnerExecutor::External test runner.
///
/// What we do in here, is returning the number of max_bind_values hat the connector under test
/// What we do in here, is returning the number of max_bind_values that the connector under test
/// will use. i.e. if it's a WASM connector, the default, not overridable one. Otherwise the one
/// as seen by the test binary (which will be the same as the engine exercised)
pub fn max_bind_values(&self) -> Option<usize> {
Expand All @@ -318,14 +318,17 @@ impl ConnectorVersion {
}
}

/// Determines if the connector uses a driver adapter implemented in Wasm
/// Determines if the connector uses a driver adapter implemented in Wasm.
/// Do not delete! This is used because the `#[cfg(target_arch = "wasm32")]` conditional compilation
/// directive doesn't work in the test runner.
fn is_wasm(&self) -> bool {
matches!(
self,
Self::Postgres(Some(PostgresVersion::PgJsWasm))
| Self::Postgres(Some(PostgresVersion::NeonJsWasm))
| Self::Vitess(Some(VitessVersion::PlanetscaleJsWasm))
| Self::Sqlite(Some(SqliteVersion::LibsqlJsWasm))
| Self::Sqlite(Some(SqliteVersion::CloudflareD1))
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ fn run_relation_link_test_impl(
run_with_tokio(
async move {
println!("Used datamodel:\n {}", datamodel.yellow());
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, metrics, log_capture)
let override_local_max_bind_values = None;
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture)
.await
.unwrap();

Expand Down Expand Up @@ -281,11 +282,13 @@ fn run_connector_test_impl(
crate::run_with_tokio(
async {
println!("Used datamodel:\n {}", datamodel.yellow());
let override_local_max_bind_values = None;
let runner = Runner::load(
datamodel.clone(),
db_schemas,
version,
connector_tag,
override_local_max_bind_values,
metrics,
log_capture,
)
Expand Down
Loading
Loading