fix: fix EntityTooSmall issue (#4100)
* fix: fix EntityTooSmall issue

* chore(ci): add minio to coverage

* tests: add test for parquet writer

* chore: move tests to `common-datasource` crate
WenyXu authored Jun 4, 2024
1 parent a80059b commit a626939
Showing 8 changed files with 116 additions and 0 deletions.
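For context before the per-file diffs: S3-compatible stores reject any multipart-upload part smaller than 5 MiB (other than the final part) with an EntityTooSmall error. The commit avoids this by giving the OpenDAL writers an explicit 8 MiB chunk size, so no matter how small the encoder's individual flushes are, every uploaded part clears the minimum. Below is a minimal sketch of the idea, assuming `object_store::ObjectStore` is the OpenDAL operator re-exported by the repo's `object-store` crate; the `write_all` helper is illustrative and not part of the diff.

```rust
use common_base::readable_size::ReadableSize;
use object_store::ObjectStore;

/// Mirrors the constant added in `src/common/datasource/src/lib.rs`:
/// parts must exceed S3's 5 MiB multipart minimum, so 8 MiB is used.
const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Illustrative helper: write `data` to `path` while forcing multipart
/// parts of at least 8 MiB, so small flushes never become undersized parts.
async fn write_all(
    store: &ObjectStore,
    path: &str,
    data: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut writer = store
        .writer_with(path)
        // Without `.chunk(...)`, a small buffered flush can be sent as its
        // own upload part and rejected with EntityTooSmall by S3/MinIO.
        .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
        .await?;
    writer.write(data).await?;
    writer.close().await?;
    Ok(())
}
```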
8 changes: 8 additions & 0 deletions .github/workflows/develop.yml
@@ -518,6 +518,9 @@ jobs:
- name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup minio
working-directory: tests-integration/fixtures/minio
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:
@@ -528,6 +531,11 @@ jobs:
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
GT_MINIO_BUCKET: greptime
GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
UNITTEST_LOG_DIR: "__unittest_logs"
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions src/common/datasource/Cargo.toml
@@ -20,6 +20,7 @@ async-compression = { version = "0.3", features = [
] }
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
@@ -33,6 +34,7 @@ object-store.workspace = true
orc-rust = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "502217315726314c4008808fe169764529640599" }
parquet.workspace = true
paste = "1.0"
rand.workspace = true
regex = "1.7"
serde.workspace = true
snafu.workspace = true
@@ -42,4 +44,7 @@ tokio-util.workspace = true
url = "2.3"

[dev-dependencies]
common-telemetry.workspace = true
common-test-util.workspace = true
dotenv.workspace = true
uuid.workspace = true
2 changes: 2 additions & 0 deletions src/common/datasource/src/file_format.rs
@@ -46,6 +46,7 @@ use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
@@ -204,6 +205,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
store
.writer_with(&path)
.concurrent(concurrency)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
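With the `chunk` option set, OpenDAL buffers the encoder's flushes into parts of that size before handing them to the multipart upload, so every part stays above the 5 MiB minimum regardless of how often `stream_to_file` flushes. The same one-line change is applied to the Parquet `BufferedWriter` in the next file.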
72 changes: 72 additions & 0 deletions src/common/datasource/src/file_format/parquet.rs
@@ -39,6 +39,7 @@ use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBuffer
use crate::error::{self, Result};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}
@@ -197,6 +198,7 @@ impl BufferedWriter {
store
.writer_with(&path)
.concurrent(concurrency)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
@@ -276,9 +278,19 @@ pub async fn stream_to_parquet(

#[cfg(test)]
mod tests {
use std::env;
use std::sync::Arc;

use common_telemetry::warn;
use common_test_util::find_workspace_path;
use datatypes::arrow::array::{ArrayRef, Int64Array, RecordBatch};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use object_store::services::S3;
use object_store::ObjectStore;
use rand::{thread_rng, Rng};

use super::*;
use crate::file_format::parquet::BufferedWriter;
use crate::test_util::{format_schema, test_store};

fn test_data_root() -> String {
@@ -296,4 +308,64 @@ mod tests {

assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
}

#[tokio::test]
async fn test_parquet_writer() {
common_telemetry::init_default_ut_logging();
let _ = dotenv::dotenv();
let Ok(bucket) = env::var("GT_MINIO_BUCKET") else {
warn!("ignoring test parquet writer");
return;
};

let mut builder = S3::default();
let _ = builder
.root(&uuid::Uuid::new_v4().to_string())
.access_key_id(&env::var("GT_MINIO_ACCESS_KEY_ID").unwrap())
.secret_access_key(&env::var("GT_MINIO_ACCESS_KEY").unwrap())
.bucket(&bucket)
.region(&env::var("GT_MINIO_REGION").unwrap())
.endpoint(&env::var("GT_MINIO_ENDPOINT_URL").unwrap());

let object_store = ObjectStore::new(builder).unwrap().finish();
let file_path = uuid::Uuid::new_v4().to_string();
let fields = vec![
Field::new("field1", DataType::Int64, true),
Field::new("field0", DataType::Int64, true),
];
let arrow_schema = Arc::new(Schema::new(fields));
let mut buffered_writer = BufferedWriter::try_new(
file_path.clone(),
object_store.clone(),
arrow_schema.clone(),
None,
// Sets a small value.
128,
8,
)
.await
.unwrap();
let rows = 200000;
let generator = || {
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(
(0..rows)
.map(|_| thread_rng().gen::<i64>())
.collect::<Vec<_>>(),
)),
Arc::new(Int64Array::from(
(0..rows)
.map(|_| thread_rng().gen::<i64>())
.collect::<Vec<_>>(),
)),
];
RecordBatch::try_new(arrow_schema.clone(), columns).unwrap()
};
let batch = generator();
// Writes about ~30Mi
for _ in 0..10 {
buffered_writer.write(&batch).await.unwrap();
}
buffered_writer.close().await.unwrap();
}
}
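A note on the test's design: it skips itself with a warning unless `GT_MINIO_BUCKET` is set, which the updated CI workflow now provides via the MinIO fixture. The deliberately tiny 128-byte threshold makes the writer flush on essentially every batch, and the roughly 30 MiB of random data guarantees the upload spans several parts, which is exactly the scenario that previously surfaced EntityTooSmall.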
5 changes: 5 additions & 0 deletions src/common/datasource/src/lib.rs
@@ -27,3 +27,8 @@ pub mod test_util;
#[cfg(test)]
pub mod tests;
pub mod util;

use common_base::readable_size::ReadableSize;

/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
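For reference, `ReadableSize::mb(8)` from `common-base` is 8 MiB in binary units, and call sites convert it with `as_bytes()` as in the writer changes above. A tiny illustrative check, not part of the commit:

```rust
use common_base::readable_size::ReadableSize;

fn main() {
    // 8 MiB in bytes, comfortably above the 5 MiB minimum S3 part size.
    assert_eq!(ReadableSize::mb(8).as_bytes(), 8 * 1024 * 1024);
}
```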
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
@@ -75,6 +75,7 @@ uuid.workspace = true
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
dotenv.workspace = true
log-store.workspace = true
object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
18 changes: 18 additions & 0 deletions tests-integration/fixtures/minio/docker-compose-standalone.yml
@@ -0,0 +1,18 @@
version: '3.8'
services:
minio:
image: bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=superpower_ci_user
- MINIO_ROOT_PASSWORD=superpower_password
- MINIO_DEFAULT_BUCKETS=greptime
- BITNAMI_DEBUG=true
volumes:
- 'minio_data:/bitnami/minio/data'

volumes:
minio_data:
driver: local
