Skip to content

Commit

Permalink
rpc benchmark: init commit of direct reading DB based on schema
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Nov 21, 2024
1 parent 4cf47ad commit 9c98761
Show file tree
Hide file tree
Showing 9 changed files with 641 additions and 0 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ members = [
"crates/sui-replay",
"crates/sui-rest-api",
"crates/sui-rosetta",
"crates/sui-rpc-benchmark",
"crates/sui-rpc-loadgen",
"crates/sui-sdk",
"crates/sui-security-watchdog",
Expand Down
23 changes: 23 additions & 0 deletions crates/sui-rpc-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "sui-rpc-benchmark"
version.workspace = true
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
clap = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
futures.workspace = true
sqlparser = "0.52.0"
tokio-postgres = "0.7.12"

sui-pg-temp-db = { workspace = true }

[[bin]]
name = "sui-rpc-benchmark"
path = "src/main.rs"
22 changes: 22 additions & 0 deletions crates/sui-rpc-benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# sui-rpc-benchmark: Benchmarking Tool for Sui RPC Performance

`sui-rpc-benchmark` is a benchmarking utility designed to measure performance across different RPC access methods in Sui:
- Direct database reads
- JSON RPC endpoints
- GraphQL queries

## Usage Examples
Run benchmarks with:
```
# Direct database queries:
cargo run --bin sui-rpc-benchmark direct --db-url postgres://postgres:postgres@localhost:5432/sui
# JSON RPC endpoints:
cargo run --bin sui-rpc-benchmark jsonrpc --endpoint http://127.0.0.1:9000
# GraphQL queries:
cargo run --bin sui-rpc-benchmark graphql --endpoint http://127.0.0.1:9000/graphql
```



5 changes: 5 additions & 0 deletions crates/sui-rpc-benchmark/src/direct/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// Executor that runs benchmark queries directly against a Postgres database
/// through `tokio_postgres`.
pub mod query_executor;
/// Home of `BenchmarkQuery`, the query description consumed by the executor.
// NOTE(review): presumably this module also generates the queries from the DB
// schema (per the commit title) — confirm against the module itself.
pub mod query_generator;
221 changes: 221 additions & 0 deletions crates/sui-rpc-benchmark/src/direct/query_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use tokio_postgres::{types::Type, types::ToSql, Client, Row};

use crate::direct::query_generator::BenchmarkQuery;

/// A [`BenchmarkQuery`] paired with sample rows read from its table.
///
/// Produced by `QueryExecutor::enrich_query`; `QueryExecutor::execute_query`
/// runs the query template once per stored row, binding that row's column
/// values as the positional parameters.
#[derive(Debug)]
pub struct EnrichedBenchmarkQuery {
/// The benchmark query this enrichment belongs to.
pub query: BenchmarkQuery,
/// Sample rows selected from `query.table_name`, restricted to
/// `query.needed_columns`.
pub rows: Vec<Row>,
}

/// Runs benchmark queries against a database over a single `tokio_postgres`
/// client.
pub struct QueryExecutor {
// Client used for both the sampling queries and the benchmark queries.
db_client: Client,
// Benchmark queries together with their sampled rows; filled in by `new`.
enriched_benchmark_queries: Vec<EnrichedBenchmarkQuery>,
}

impl QueryExecutor {
pub async fn new(db_url: &str, benchmark_queries: Vec<BenchmarkQuery>) -> Result<Self, anyhow::Error> {
let (client, connection) = tokio_postgres::connect(db_url, tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});

let mut executor = Self {
db_client: client,
enriched_benchmark_queries: Vec::new(),
};
let mut enriched_queries = Vec::new();
for query in benchmark_queries {
let enriched = executor.enrich_query(&query).await?;
enriched_queries.push(enriched);
}
executor.enriched_benchmark_queries = enriched_queries;

Ok(executor)
}

pub async fn run(&self) -> Result<(), anyhow::Error> {
println!("Starting parallel execution of {} queries", self.enriched_benchmark_queries.len());
let futures: Vec<_> = self.enriched_benchmark_queries.iter()
.map(|enriched| {
println!("Executing query: {}", enriched.query.query_template);
self.execute_query(enriched)
})
.collect();
let results = futures::future::join_all(futures).await;

for (i, result) in results.into_iter().enumerate() {
match result {
Ok(rows) => println!("Query \n'{}'\n completed successfully with {} rows", self.enriched_benchmark_queries[i].query.query_template, rows.len()),
Err(e) => println!("Query \n'{}'\n failed with error: {}", self.enriched_benchmark_queries[i].query.query_template, e.to_string()),
}
}
println!("All benchmark queries completed");
Ok(())
}

pub async fn enrich_query(&self, bq: &BenchmarkQuery) -> Result<EnrichedBenchmarkQuery, anyhow::Error> {
// TODO(gegaowp): only fetch one row for quick execution, will configure and fetch more.
let query = format!("SELECT {} FROM {} LIMIT 1", bq.needed_columns.join(","), bq.table_name);
println!("Enriched query: {}", query);
let rows = self.db_client.query(&query, &[]).await?;

Ok(EnrichedBenchmarkQuery { query: bq.clone(), rows })
}

pub async fn execute_query(&self, query: &EnrichedBenchmarkQuery) -> Result<Vec<tokio_postgres::Row>, tokio_postgres::Error> {
let mut all_results = Vec::new();
for row in &query.rows {
let params_vec = row_to_params(row).await;
let value_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params_vec.iter()
.map(|v| v.as_ref())
.collect();

let results = self.db_client.query(&query.query.query_template, &value_refs).await?;
all_results.extend(results);
}
Ok(all_results)
}
}

/// Converts every column of `row`, in column order, into a boxed value that
/// can be bound as a query parameter.
///
/// Each value is read as `Option<T>`, so a SQL NULL is carried through as
/// `None`. The function is `async` although it never awaits; callers
/// `.await` it.
///
/// # Panics
///
/// Panics if a column's type is not one of TEXT, VARCHAR, INT2, INT4, INT8,
/// FLOAT8, BOOL or BYTEA.
async fn row_to_params(row: &Row) -> Vec<Box<dyn ToSql + Sync>> {
    row.columns()
        .iter()
        .enumerate()
        .map(|(idx, column)| {
            let boxed: Box<dyn ToSql + Sync> = match *column.type_() {
                Type::TEXT | Type::VARCHAR => Box::new(row.get::<_, Option<String>>(idx)),
                Type::INT4 => Box::new(row.get::<_, Option<i32>>(idx)),
                Type::INT8 => Box::new(row.get::<_, Option<i64>>(idx)),
                Type::FLOAT8 => Box::new(row.get::<_, Option<f64>>(idx)),
                Type::BOOL => Box::new(row.get::<_, Option<bool>>(idx)),
                Type::INT2 => Box::new(row.get::<_, Option<i16>>(idx)),
                Type::BYTEA => Box::new(row.get::<_, Option<Vec<u8>>>(idx)),
                ref unsupported => panic!("Unsupported type: {:?}", unsupported),
            };
            boxed
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use sui_pg_temp_db::TempDb;
    use tokio_postgres::NoTls;

    /// End-to-end check: a query enriched from a one-row table must, when
    /// executed with that row's values as parameters, return the same row.
    #[tokio::test]
    async fn test_execute_enriched_query() -> Result<(), Box<dyn std::error::Error>> {
        let db = TempDb::new().unwrap();
        let url = db.database().url();
        let (client, connection) = tokio_postgres::connect(url.as_str(), NoTls).await?;
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });

        // Create test table and insert test data
        client
            .execute(
                "CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, name TEXT)",
                &[],
            )
            .await?;
        client
            .execute(
                "INSERT INTO test_table (id, name) VALUES ($1, $2)",
                &[&1i32, &"test"],
            )
            .await?;

        // Create benchmark query
        let benchmark_query = BenchmarkQuery {
            query_template: "SELECT * FROM test_table WHERE id = $1 AND name = $2".to_string(),
            table_name: "test_table".to_string(),
            needed_columns: vec!["id".to_string(), "name".to_string()],
        };

        // Create executor and enrich query
        let executor = QueryExecutor::new(url.as_str(), vec![benchmark_query]).await?;
        let enriched_query = &executor.enriched_benchmark_queries[0];

        // Assert enriched query details match what we expect
        assert_eq!(
            enriched_query.query.query_template,
            "SELECT * FROM test_table WHERE id = $1 AND name = $2"
        );
        assert_eq!(enriched_query.query.table_name, "test_table");
        assert_eq!(
            enriched_query.query.needed_columns,
            vec!["id".to_string(), "name".to_string()]
        );
        assert_eq!(enriched_query.rows.len(), 1);

        // Execute enriched query (`enriched_query` is already a reference)
        let result = executor.execute_query(enriched_query).await?;

        // Verify result matches expected values
        assert_eq!(result.len(), 1);
        assert!(check_rows_consistency(&result[0], &enriched_query.rows[0])?);

        Ok(())
    }

    /// Returns `Ok(true)` when every column name present in both rows has the
    /// same value in both, and `Ok(false)` at the first mismatch.
    ///
    /// Asserts that columns sharing a name also share a type. Errors from
    /// decoding a value, or from an unsupported column type, are returned as
    /// `Err`.
    fn check_rows_consistency(row1: &Row, row2: &Row) -> Result<bool, Box<dyn std::error::Error>> {
        for col1 in row1.columns() {
            let name = col1.name();

            // Only columns that exist in both rows are compared.
            let col2 = match row2.columns().iter().find(|c| c.name() == name) {
                Some(col2) => col2,
                None => continue,
            };

            assert_eq!(
                col1.type_(),
                col2.type_(),
                "Column types should match for column {}",
                name
            );

            if !compare_row_values(row1, row2, name, col1.type_())? {
                println!("Column '{}' has inconsistent values between rows", name);
                return Ok(false);
            }
        }

        Ok(true)
    }

    /// Compares the values of column `col` in both rows, decoding them
    /// according to `col_type`.
    ///
    /// Values are decoded as `Option<T>` (mirroring `row_to_params`), so SQL
    /// NULLs compare equal to each other instead of panicking during decode.
    /// Decode failures and unsupported types are reported through `Err`.
    fn compare_row_values(
        row1: &Row,
        row2: &Row,
        col: &str,
        col_type: &Type,
    ) -> Result<bool, Box<dyn std::error::Error>> {
        Ok(match *col_type {
            Type::TEXT | Type::VARCHAR => {
                row1.try_get::<_, Option<String>>(col)? == row2.try_get::<_, Option<String>>(col)?
            }
            Type::INT4 => {
                row1.try_get::<_, Option<i32>>(col)? == row2.try_get::<_, Option<i32>>(col)?
            }
            Type::INT8 => {
                row1.try_get::<_, Option<i64>>(col)? == row2.try_get::<_, Option<i64>>(col)?
            }
            Type::FLOAT8 => {
                row1.try_get::<_, Option<f64>>(col)? == row2.try_get::<_, Option<f64>>(col)?
            }
            Type::BOOL => {
                row1.try_get::<_, Option<bool>>(col)? == row2.try_get::<_, Option<bool>>(col)?
            }
            Type::INT2 => {
                row1.try_get::<_, Option<i16>>(col)? == row2.try_get::<_, Option<i16>>(col)?
            }
            Type::BYTEA => {
                row1.try_get::<_, Option<Vec<u8>>>(col)?
                    == row2.try_get::<_, Option<Vec<u8>>>(col)?
            }
            ref unsupported => {
                return Err(format!("Unsupported type: {:?}", unsupported).into());
            }
        })
    }
}

Loading

0 comments on commit 9c98761

Please sign in to comment.