Merging record_batch results in incorrect Dictionary Data #674

Closed
gsilvestrin opened this issue Mar 10, 2023 · 0 comments · Fixed by #681
Labels: bug, rust

Merging multiple `RecordBatch`es results in incorrect or corrupted dictionary data being written to disk. The root cause is the way arrow-rs handles merges; there is an open issue for it: apache/arrow-rs#506.

We need to change how our encoders handle merging multiple `RecordBatch`es.
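
To make the general hazard concrete, here is a minimal, std-only sketch of how merging dictionary-encoded columns can go wrong. It is a toy model, not the arrow-rs or Lance implementation: `DictColumn`, `merge_naive`, and `merge_unified` are hypothetical names introduced only for illustration, and the exact failure mode in our encoders is the one described in the linked arrow-rs issue. The assumption is simply that each batch carries its own dictionary, so keys from different batches are only meaningful relative to their own values.

```rust
use std::collections::HashMap;

/// Toy stand-in for a dictionary-encoded string column (hypothetical type).
/// Each key indexes into `values`; `None` is a null slot.
#[derive(Debug, Clone, PartialEq)]
struct DictColumn {
    keys: Vec<Option<i32>>,
    values: Vec<String>,
}

impl DictColumn {
    fn new(keys: Vec<Option<i32>>, values: &[&str]) -> Self {
        Self {
            keys,
            values: values.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// Decode the column back to plain optional strings.
    fn decode(&self) -> Vec<Option<String>> {
        self.keys
            .iter()
            .map(|k| k.map(|i| self.values[i as usize].clone()))
            .collect()
    }
}

/// Naive merge: append all keys unchanged and keep only the first dictionary.
/// Keys from later columns then point at the wrong values.
fn merge_naive(cols: &[DictColumn]) -> DictColumn {
    let values = cols.first().map(|c| c.values.clone()).unwrap_or_default();
    let keys = cols.iter().flat_map(|c| c.keys.iter().copied()).collect();
    DictColumn { keys, values }
}

/// Unifying merge: build one deduplicated dictionary and remap every key
/// from its column-local index to the index in the unified dictionary.
fn merge_unified(cols: &[DictColumn]) -> DictColumn {
    let mut index: HashMap<String, i32> = HashMap::new();
    let mut values: Vec<String> = Vec::new();
    let mut keys: Vec<Option<i32>> = Vec::new();

    for col in cols {
        // remap[old_key] == new_key for this column's dictionary.
        let remap: Vec<i32> = col
            .values
            .iter()
            .map(|v| {
                *index.entry(v.clone()).or_insert_with(|| {
                    values.push(v.clone());
                    (values.len() - 1) as i32
                })
            })
            .collect();
        keys.extend(col.keys.iter().map(|k| k.map(|i| remap[i as usize])));
    }

    DictColumn { keys, values }
}
```

In this model, merging a column with dictionary `["a", "b"]` and one with dictionary `["b", "c"]` through `merge_naive` decodes the second column's rows as `a` and `b` instead of `b` and `c`, while `merge_unified` preserves the logical values. Whatever approach the encoders end up taking, the round-trip property to check is the same: decoding the merged column must equal the concatenation of the decoded inputs.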

The following test case reproduces the issue:

    #[tokio::test]
    async fn test_read_struct_of_dictionary_arrays() {
        let test_dir = tempdir().unwrap();

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Struct(vec![ArrowField::new(
                "d",
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
            )]),
            true,
        )]));

        let mut batches: Vec<RecordBatch> = Vec::new();
        for i in 1..100 {
            let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
            dict_builder.append_null();
            dict_builder.append("a").unwrap();
            dict_builder.append("b").unwrap();
            dict_builder.append("c").unwrap();

            let struct_array = Arc::new(StructArray::from(vec![(
                ArrowField::new(
                    "d",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                ),
                Arc::new(dict_builder.finish()) as ArrayRef,
            )]));

            let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
            batches.push(batch);
        }

        let test_uri = test_dir.path().to_str().unwrap();

        let batches = crate::arrow::RecordBatchBuffer::new(batches);
        let mut batches: Box<dyn RecordBatchReader> = Box::new(batches);
        Dataset::write(&mut batches, test_uri, Some(WriteParams::default()))
            .await
            .unwrap();

        let result = scan_dataset(test_uri).await.unwrap();
        // assert_eq!(batch, result.as_slice()[0]);
    }

    async fn scan_dataset(uri: &str) -> Result<Vec<RecordBatch>> {
        let results = Dataset::open(uri)
            .await?
            .scan()
            .try_into_stream()
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        Ok(results)
    }

gsilvestrin added the bug and rust labels on Mar 10, 2023
gsilvestrin self-assigned this on Mar 10, 2023
gsilvestrin linked a pull request on Mar 18, 2023 that will close this issue