Skip to content

Commit

Permalink
TOKIO_RUNTIME
Browse files Browse the repository at this point in the history
  • Loading branch information
danking committed Oct 18, 2024
1 parent 1be39a7 commit f9f6662
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions bench-vortex/benches/compress_noci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use vortex_dtype::field::Field;
use vortex_error::VortexResult;
use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex_sampling_compressor::{SamplingCompressor, ALL_COMPRESSORS_CONTEXT};
use vortex_serde::io::TOKIO_RUNTIME;
use vortex_serde::layouts::{LayoutContext, LayoutDeserializer, LayoutReaderBuilder, LayoutWriter};

#[derive(serde::Serialize)]
Expand Down Expand Up @@ -160,10 +161,7 @@ fn benchmark_compress<F, U>(
U: AsRef<Array>,
{
ensure_dir_exists("benchmarked-files").unwrap();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let runtime = &TOKIO_RUNTIME;
let uncompressed = make_uncompressed();
let uncompressed_size = uncompressed.as_ref().nbytes();
let mut compressed_size = 0;
Expand All @@ -176,7 +174,7 @@ fn benchmark_compress<F, U>(
group.bench_function(bench_name, |b| {
b.iter_with_large_drop(|| {
compressed_size = black_box(
vortex_compressed_written_size(&runtime, compressor, uncompressed.as_ref())
vortex_compressed_written_size(runtime, compressor, uncompressed.as_ref())
.unwrap(),
);
});
Expand Down Expand Up @@ -212,10 +210,10 @@ fn benchmark_compress<F, U>(
measurement_time.map(|t| group.measurement_time(t));
group.bench_function(bench_name, |b| {
let mut buf = Vec::new();
vortex_compress_write(&runtime, compressor, uncompressed.as_ref(), &mut buf).unwrap();
vortex_compress_write(runtime, compressor, uncompressed.as_ref(), &mut buf).unwrap();
let arc = Arc::new(buf);
b.iter_with_large_drop(|| {
black_box(vortex_decompress_read(&runtime, arc.clone()).unwrap());
black_box(vortex_decompress_read(runtime, arc.clone()).unwrap());
});
});
group.finish();
Expand Down Expand Up @@ -250,7 +248,7 @@ fn benchmark_compress<F, U>(
.unwrap_or(false)
{
let vortex_nbytes =
vortex_compressed_written_size(&runtime, compressor, uncompressed.as_ref()).unwrap();
vortex_compressed_written_size(runtime, compressor, uncompressed.as_ref()).unwrap();

let parquet_zstd_nbytes = parquet_compressed_written_size(
uncompressed.as_ref(),
Expand Down Expand Up @@ -334,10 +332,7 @@ fn public_bi_benchmark(c: &mut Criterion) {

fn tpc_h_l_comment(c: &mut Criterion) {
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let rt = &TOKIO_RUNTIME;
let lineitem_vortex = rt.block_on(tpch::load_table(
data_dir,
"lineitem",
Expand Down
17 changes: 6 additions & 11 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vortex::arrow::infer_schema;
use vortex_dtype::field::Field;
use vortex_error::VortexResult;
use vortex_sampling_compressor::ALL_COMPRESSORS_CONTEXT;
use vortex_serde::io::TOKIO_RUNTIME;
use vortex_serde::layouts::{
LayoutContext, LayoutDeserializer, LayoutReaderBuilder, Projection, RowFilter,
VortexRecordBatchReader,
Expand Down Expand Up @@ -75,9 +76,7 @@ impl PyDataset {

let row_filter = row_filter.map(|x| RowFilter::new(x.borrow().unwrap().clone()));

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
TOKIO_RUNTIME
.block_on(io::async_read(
self.fname(),
projection,
Expand All @@ -104,9 +103,7 @@ impl PyDataset {

let row_filter = row_filter.map(|x| RowFilter::new(x.borrow().unwrap().clone()));

let layout_reader = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
let layout_reader = TOKIO_RUNTIME
.block_on(io::layout_reader(
self_.fname(),
projection,
Expand All @@ -115,18 +112,16 @@ impl PyDataset {
))
.map_err(PyVortexError::map_err)?;

let record_batch_rader: Box<dyn RecordBatchReader + Send> =
let record_batch_reader: Box<dyn RecordBatchReader + Send> =
Box::new(VortexRecordBatchReader::new(layout_reader).map_err(PyVortexError::map_err)?);

record_batch_rader.into_pyarrow(self_.py())
record_batch_reader.into_pyarrow(self_.py())
}
}

#[pyfunction]
pub fn dataset(fname: &Bound<'_, PyString>) -> PyResult<PyDataset> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
TOKIO_RUNTIME
.block_on(PyDataset::new(fname.to_str()?))
.map_err(PyVortexError::map_err)
}
9 changes: 3 additions & 6 deletions pyvortex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use vortex::Array;
use vortex_dtype::field::Field;
use vortex_error::VortexResult;
use vortex_sampling_compressor::ALL_COMPRESSORS_CONTEXT;
use vortex_serde::io::TOKIO_RUNTIME;
use vortex_serde::layouts::{
LayoutBatchStream, LayoutContext, LayoutDeserializer, LayoutReaderBuilder, LayoutWriter,
Projection, RowFilter,
Expand Down Expand Up @@ -162,9 +163,7 @@ pub fn read(

let row_filter = row_filter.map(|x| RowFilter::new(x.borrow().unwrap().clone()));

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
TOKIO_RUNTIME
.block_on(async_read(fname, projection, None, row_filter))
.map_err(PyVortexError::map_err)
.map(PyArray::new)
Expand Down Expand Up @@ -248,9 +247,7 @@ pub fn write(array: &Bound<'_, PyArray>, f: &Bound<'_, PyString>) -> PyResult<()
let fname = f.to_str()?; // TODO(dk): support file objects
let array = array.borrow().unwrap().clone();

tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
TOKIO_RUNTIME
.block_on(run(&array, fname))
.map_err(PyVortexError::map_err)
}
3 changes: 2 additions & 1 deletion vortex-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ futures = { workspace = true }
futures-executor = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
monoio = { workspace = true, optional = true, features = ["bytes"] }
object_store = { workspace = true, optional = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
tokio = { workspace = true, features = ["io-util", "fs"], optional = true }
tokio = { workspace = true, features = ["io-util", "fs", "rt-multi-thread"], optional = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true, features = ["flatbuffers"] }
Expand Down
2 changes: 2 additions & 0 deletions vortex-serde/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ pub mod offset;
mod read;
mod tokio;
mod write;
#[cfg(feature = "tokio")]
pub use tokio::TOKIO_RUNTIME;
10 changes: 9 additions & 1 deletion vortex-serde/src/io/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ use std::io;
use std::os::unix::prelude::FileExt;

use bytes::BytesMut;
use lazy_static::lazy_static;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::{VortexError, VortexUnwrap as _};
use vortex_error::{VortexError, VortexExpect, VortexUnwrap as _};

use crate::io::{VortexRead, VortexReadAt, VortexWrite};

pub struct TokioAdapter<IO>(pub IO);

lazy_static! {
pub static ref TOKIO_RUNTIME: Runtime = Runtime::new()
.map_err(VortexError::IOError)
.vortex_expect("tokio runtime must not fail to start");
}

impl<R: AsyncRead + Unpin> VortexRead for TokioAdapter<R> {
async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
self.0.read_exact(buffer.as_mut()).await?;
Expand Down
2 changes: 2 additions & 0 deletions vortex-serde/src/layouts/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ mod context;
mod filtering;
mod footer;
mod layouts;
#[cfg(feature = "tokio")]
mod recordbatchreader;
mod stream;

pub use builder::LayoutReaderBuilder;
pub use cache::LayoutMessageCache;
pub use context::*;
pub use filtering::RowFilter;
#[cfg(feature = "tokio")]
pub use recordbatchreader::VortexRecordBatchReader;
pub use stream::LayoutBatchStream;
pub use vortex_schema::projection::Projection;
Expand Down
9 changes: 2 additions & 7 deletions vortex-serde/src/layouts/read/recordbatchreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vortex::Array;
use vortex_error::{VortexError, VortexResult};

use super::LayoutBatchStream;
use crate::io::VortexReadAt;
use crate::io::{VortexReadAt, TOKIO_RUNTIME};

fn vortex_to_arrow_error(error: VortexError) -> ArrowError {
ArrowError::ExternalError(Box::new(error))
Expand All @@ -23,19 +23,14 @@ fn vortex_to_arrow(result: VortexResult<Array>) -> Result<RecordBatch, ArrowErro
pub struct VortexRecordBatchReader<R: VortexReadAt + Unpin + Send + 'static> {
stream: LayoutBatchStream<R>,
arrow_schema: SchemaRef,
runtime: tokio::runtime::Runtime,
}

impl<R: VortexReadAt + Unpin + Send + 'static> VortexRecordBatchReader<R> {
pub fn new(stream: LayoutBatchStream<R>) -> VortexResult<VortexRecordBatchReader<R>> {
let arrow_schema = Arc::new(infer_schema(stream.schema().dtype())?);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Ok(VortexRecordBatchReader {
stream,
arrow_schema,
runtime,
})
}
}
Expand All @@ -44,7 +39,7 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Iterator for VortexRecordBatchRea
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_result = self.runtime.block_on(self.stream.next());
let maybe_result = TOKIO_RUNTIME.block_on(self.stream.next());
maybe_result.map(vortex_to_arrow)
}
}
Expand Down

0 comments on commit f9f6662

Please sign in to comment.