Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jan 24, 2025
1 parent e0c849e commit 9d9ec67
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 42 deletions.
1 change: 0 additions & 1 deletion cognite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,5 @@ tokio-util = { version = "0.7.10" }
wiremock = "0.6.0"

[dev-dependencies]
once_cell = { version = "1.19.0" }
uuid = { version = "1.10.0", features = ["v4"] }
wiremock = "0.6.0"
75 changes: 37 additions & 38 deletions cognite/src/api/data_ingestion/raw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::VecDeque;

use futures::stream::{try_unfold, SelectAll};
use futures::{StreamExt, TryStream, TryStreamExt};
use futures::{FutureExt, StreamExt, TryStream, TryStreamExt};

use crate::api::resource::Resource;
use crate::dto::items::Items;
Expand Down Expand Up @@ -225,10 +225,9 @@ impl RawResource {
table_name: &str,
params: Option<RetrieveRowsQuery>,
) -> Result<Vec<RawRow>> {
Ok(self
.retrieve_all_rows_stream(db_name, table_name, params)
self.retrieve_all_rows_stream(db_name, table_name, params)
.try_collect()
.await?)
.await
}

/// Retrieve all rows from a table, following cursors and reading from multiple streams in parallel.
Expand All @@ -244,11 +243,9 @@ impl RawResource {
table_name: &str,
params: RetrieveAllPartitionedQuery,
) -> Result<Vec<RawRow>> {
Ok(self
.retrieve_all_rows_partitioned_stream(db_name, table_name, params)
.await?
self.retrieve_all_rows_partitioned_stream(db_name, table_name, params)
.try_collect()
.await?)
.await
}

/// Retrieve all rows from a table, following cursors and reading from multiple streams in parallel.
Expand All @@ -258,40 +255,40 @@ impl RawResource {
/// * `db_name` - Database to retrieve rows from.
/// * `table_name` - Table to retrieve rows from.
/// * `params` - Optional filter parameters.
pub async fn retrieve_all_rows_partitioned_stream<'a>(
pub fn retrieve_all_rows_partitioned_stream<'a>(
&'a self,
db_name: &'a str,
table_name: &'a str,
params: RetrieveAllPartitionedQuery,
) -> Result<impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a>
{
let cursors = self
.retrieve_cursors_for_parallel_reads(
db_name,
table_name,
Some(RetrieveCursorsQuery {
min_last_updated_time: params.min_last_updated_time,
max_last_updated_time: params.max_last_updated_time,
number_of_cursors: params.number_of_cursors,
}),
)
.await?;

let mut streams = SelectAll::new();
for cursor in cursors {
let query = RetrieveRowsQuery {
limit: params.limit,
columns: params.columns.clone(),
cursor: Some(cursor),
) -> impl TryStream<Ok = RawRow, Error = crate::Error, Item = Result<RawRow>> + Send + 'a {
self.retrieve_cursors_for_parallel_reads(
db_name,
table_name,
Some(RetrieveCursorsQuery {
min_last_updated_time: params.min_last_updated_time,
max_last_updated_time: params.max_last_updated_time,
};
streams.push(
self.retrieve_all_rows_stream(db_name, table_name, Some(query))
.boxed(),
);
}
Ok(streams)
number_of_cursors: params.number_of_cursors,
}),
)
.into_stream()
.map_ok(move |cursors| {
let mut streams = SelectAll::new();
for cursor in cursors {
let query = RetrieveRowsQuery {
limit: params.limit,
columns: params.columns.clone(),
cursor: Some(cursor),
min_last_updated_time: params.min_last_updated_time,
max_last_updated_time: params.max_last_updated_time,
};
streams.push(
self.retrieve_all_rows_stream(db_name, table_name, Some(query))
.boxed(),
);
}
streams
})
.try_flatten()
}

/// Insert rows into a table.
Expand All @@ -308,11 +305,13 @@ impl RawResource {
&self,
db_name: &str,
table_name: &str,
ensure_parent: Option<bool>,
ensure_parent: bool,
rows: &[RawRowCreate],
) -> Result<()> {
let path = format!("raw/dbs/{db_name}/tables/{table_name}/rows");
let query = EnsureParentQuery { ensure_parent };
let query = EnsureParentQuery {
ensure_parent: Some(ensure_parent),
};
let items = Items::new(rows);
self.api_client
.post_with_query::<::serde_json::Value, _, EnsureParentQuery>(
Expand Down
5 changes: 2 additions & 3 deletions cognite/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{future::Future, time::Duration};
use std::{future::Future, sync::LazyLock, time::Duration};

#[cfg(test)]
use cognite::ClientConfig;
use cognite::CogniteClient;
use once_cell::sync::Lazy;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -35,7 +34,7 @@ pub fn get_client_for_mocking(api_base_url: &str, project_name: &str) -> Cognite
}

#[allow(dead_code)]
pub static PREFIX: Lazy<String> = Lazy::new(|| {
pub static PREFIX: LazyLock<String> = LazyLock::new(|| {
format!(
"rust-sdk-test-{}",
rand::thread_rng()
Expand Down
149 changes: 149 additions & 0 deletions cognite/tests/raw_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#![cfg(feature = "integration_tests")]

#[cfg(test)]
mod common;

use cognite::raw::*;
pub use common::*;

use futures::{StreamExt, TryStreamExt};
use serde_json::json;

#[tokio::test]
async fn create_delete_raw_rows_and_tables() {
    let client = get_client();
    let db_name = format!("{}-test-db-1", PREFIX.as_str());

    // Create the database and check that the API echoes it back.
    let created_dbs = client
        .raw
        .create_databases(&[Database {
            name: db_name.clone(),
        }])
        .await
        .unwrap();
    assert_eq!(created_dbs.len(), 1);
    let created_db = created_dbs.into_iter().next().unwrap();
    assert_eq!(created_db.name, db_name);

    // Create a single table in the new database (ensure_parent = false,
    // since the database was just created explicitly above).
    let created_tables = client
        .raw
        .create_tables(
            &db_name,
            false,
            &[Table {
                name: "test-table-1".to_owned(),
            }],
        )
        .await
        .unwrap();
    assert_eq!(created_tables.len(), 1);
    let created_table = created_tables.into_iter().next().unwrap();
    assert_eq!("test-table-1", created_table.name);

    // Insert one row with a couple of string columns.
    let new_rows = [RawRowCreate {
        key: "key-1".to_owned(),
        columns: json!({
            "hello": "123",
            "world": "321"
        }),
    }];
    client
        .raw
        .insert_rows(&db_name, "test-table-1", false, &new_rows)
        .await
        .unwrap();

    // Tear down: drop the table first, then the (now empty) database.
    client
        .raw
        .delete_tables(
            &db_name,
            &[Table {
                name: "test-table-1".to_owned(),
            }],
        )
        .await
        .unwrap();

    let delete_request = DeleteDatabasesRequest {
        items: vec![Database { name: db_name }],
        recursive: false,
    };
    client.raw.delete_databases(&delete_request).await.unwrap();
}

#[tokio::test]
async fn retrieve_raw_rows_stream() {
    let client = get_client();
    let db_name = format!("{}-test-db-2", PREFIX.as_str());

    // Populate 1000 rows. `ensure_parent = true` implicitly creates the
    // database and table if they do not exist yet.
    let seed_rows: Vec<_> = (0..1000)
        .map(|i| RawRowCreate {
            key: format!("key-{i}"),
            columns: json!({
                "value": i,
            }),
        })
        .collect();
    client
        .raw
        .insert_rows(&db_name, "test-table-1", true, &seed_rows)
        .await
        .unwrap();

    // Stream rows with a page limit of 200, but only take the first 300.
    let query = RetrieveRowsQuery {
        limit: Some(200),
        ..Default::default()
    };
    let partial: Vec<_> = client
        .raw
        .retrieve_all_rows_stream(&db_name, "test-table-1", Some(query))
        .take(300)
        .try_collect()
        .await
        .unwrap();
    assert_eq!(partial.len(), 300);

    // The partitioned stream (3 parallel cursors) should yield every row.
    let all_rows: Vec<_> = client
        .raw
        .retrieve_all_rows_partitioned_stream(
            &db_name,
            "test-table-1",
            RetrieveAllPartitionedQuery {
                limit: Some(200),
                number_of_cursors: Some(3),
                ..Default::default()
            },
        )
        .try_collect()
        .await
        .unwrap();
    assert_eq!(all_rows.len(), 1000);

    // Clean up: recursive delete removes the table along with the database.
    let delete_request = DeleteDatabasesRequest {
        items: vec![Database { name: db_name }],
        recursive: true,
    };
    client.raw.delete_databases(&delete_request).await.unwrap();
}

0 comments on commit 9d9ec67

Please sign in to comment.