Skip to content

Commit

Permalink
Add JOB benchmark dataset [1/N] (imdb dataset) (#12497)
Browse files Browse the repository at this point in the history
* imdb dataset

* cargo fmt

* we should also extrac the tar after download

* we should not skip last col
  • Loading branch information
doupache authored Sep 24, 2024
1 parent ba8a759 commit 6546479
Show file tree
Hide file tree
Showing 5 changed files with 450 additions and 0 deletions.
83 changes: 83 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ main() {
data_tpch "10"
data_clickbench_1
data_clickbench_partitioned
data_imdb
;;
tpch)
data_tpch "1"
Expand All @@ -166,6 +167,9 @@ main() {
clickbench_extended)
data_clickbench_1
;;
imdb)
data_imdb
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
Expand Down Expand Up @@ -430,6 +434,85 @@ run_clickbench_extended() {
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o "${RESULTS_FILE}"
}

# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors)
# http://homepages.cwi.nl/~boncz/job/imdb.tgz
data_imdb() {
local imdb_dir="${DATA_DIR}/imdb"
local imdb_temp_gz="${imdb_dir}/imdb.tgz"
local imdb_url="https://homepages.cwi.nl/~boncz/job/imdb.tgz"

# imdb has 21 files, we just separate them into 3 groups for better readability
local first_required_files=(
"aka_name.parquet"
"aka_title.parquet"
"cast_info.parquet"
"char_name.parquet"
"comp_cast_type.parquet"
"company_name.parquet"
"company_type.parquet"
)

local second_required_files=(
"complete_cast.parquet"
"info_type.parquet"
"keyword.parquet"
"kind_type.parquet"
"link_type.parquet"
"movie_companies.parquet"
"movie_info.parquet"
)

local third_required_files=(
"movie_info_idx.parquet"
"movie_keyword.parquet"
"movie_link.parquet"
"name.parquet"
"person_info.parquet"
"role_type.parquet"
"title.parquet"
)

# Combine the three arrays into one
local required_files=("${first_required_files[@]}" "${second_required_files[@]}" "${third_required_files[@]}")
local convert_needed=false

# Create directory if it doesn't exist
mkdir -p "${imdb_dir}"

# Check if required files exist
for file in "${required_files[@]}"; do
if [ ! -f "${imdb_dir}/${file}" ]; then
convert_needed=true
break
fi
done

if [ "$convert_needed" = true ]; then
if [ ! -f "${imdb_dir}/imdb.tgz" ]; then
echo "Downloading IMDB dataset..."

# Download the dataset
curl -o "${imdb_temp_gz}" "${imdb_url}"

# Extract the dataset
tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}"
$CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet
else
echo "IMDB.tgz already exists."

# Extract the dataset
tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}"
$CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet
fi
echo "IMDB dataset downloaded and extracted."
else
echo "IMDB dataset already exists and contains required parquet files."
fi
}




compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
BRANCH1="$1"
Expand Down
49 changes: 49 additions & 0 deletions benchmarks/src/bin/imdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! IMDB binary entrypoint
use datafusion::error::Result;
use datafusion_benchmarks::imdb;
use structopt::StructOpt;

#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
compile_error!(
"feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time"
);

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[cfg(feature = "mimalloc")]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[derive(Debug, StructOpt)]
#[structopt(name = "IMDB", about = "IMDB Dataset Processing.")]
enum ImdbOpt {
Convert(imdb::ConvertOpt),
}

#[tokio::main]
pub async fn main() -> Result<()> {
env_logger::init();
match ImdbOpt::from_args() {
ImdbOpt::Convert(opt) => opt.run().await,
}
}
112 changes: 112 additions & 0 deletions benchmarks/src/imdb/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion_common::instant::Instant;
use std::path::PathBuf;

use datafusion::error::Result;
use datafusion::prelude::*;
use structopt::StructOpt;

use datafusion::common::not_impl_err;

use super::get_imdb_table_schema;
use super::IMDB_TABLES;

#[derive(Debug, StructOpt)]
pub struct ConvertOpt {
/// Path to csv files
#[structopt(parse(from_os_str), required = true, short = "i", long = "input")]
input_path: PathBuf,

/// Output path
#[structopt(parse(from_os_str), required = true, short = "o", long = "output")]
output_path: PathBuf,

/// Output file format: `csv` or `parquet`
#[structopt(short = "f", long = "format")]
file_format: String,

/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
batch_size: usize,
}

impl ConvertOpt {
pub async fn run(self) -> Result<()> {
let input_path = self.input_path.to_str().unwrap();
let output_path = self.output_path.to_str().unwrap();

for table in IMDB_TABLES {
let start = Instant::now();
let schema = get_imdb_table_schema(table);

let input_path = format!("{input_path}/{table}.csv");
let output_path = format!("{output_path}/{table}.parquet");
let options = CsvReadOptions::new()
.schema(&schema)
.has_header(false)
.delimiter(b',')
.escape(b'\\')
.file_extension(".csv");

let config = SessionConfig::new().with_batch_size(self.batch_size);
let ctx = SessionContext::new_with_config(config);

let mut csv = ctx.read_csv(&input_path, options).await?;

// Select all apart from the padding column
let selection = csv
.schema()
.iter()
.take(schema.fields.len())
.map(Expr::from)
.collect();

csv = csv.select(selection)?;

println!(
"Converting '{}' to {} files in directory '{}'",
&input_path, self.file_format, &output_path
);
match self.file_format.as_str() {
"csv" => {
csv.write_csv(
output_path.as_str(),
DataFrameWriteOptions::new(),
None,
)
.await?;
}
"parquet" => {
csv.write_parquet(
output_path.as_str(),
DataFrameWriteOptions::new(),
None,
)
.await?;
}
other => {
return not_impl_err!("Invalid output format: {other}");
}
}
println!("Conversion completed in {} ms", start.elapsed().as_millis());
}
Ok(())
}
}
Loading

0 comments on commit 6546479

Please sign in to comment.