Skip to content

Commit

Permalink
change methods to take &self rather than &mut self
Browse files Browse the repository at this point in the history
  • Loading branch information
tamaroning committed Jun 6, 2024
1 parent 9ab3d80 commit 95d7f42
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
35 changes: 22 additions & 13 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use http::{
HeaderMap,
};
use reqwest::Client;
use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::row::SnowflakeColumnType;
Expand All @@ -19,11 +20,11 @@ const DEFAULT_TIMEOUT_SECONDS: u64 = 300;
pub struct QueryExecutor {
http: Client,
qrmk: String,
chunks: Vec<RawQueryResponseChunk>,
chunks: Mutex<Vec<RawQueryResponseChunk>>,
chunk_headers: HeaderMap,
column_types: Arc<Vec<SnowflakeColumnType>>,
column_indices: Arc<HashMap<String, usize>>,
row_set: Option<Vec<Vec<Option<String>>>>,
row_set: Mutex<Option<Vec<Vec<Option<String>>>>>,
}

impl QueryExecutor {
Expand Down Expand Up @@ -95,12 +96,14 @@ impl QueryExecutor {
let http = http.clone();
let qrmk = response.data.qrmk.unwrap_or_default();
let chunks = response.data.chunks.unwrap_or_default();
let chunks = Mutex::new(chunks);
let row_types = response.data.row_types.ok_or_else(|| {
Error::UnsupportedFormat("the response doesn't contain 'rowtype'".to_string())
})?;
let row_set = response.data.row_set.ok_or_else(|| {
Error::UnsupportedFormat("the response doesn't contain 'rowset'".to_string())
})?;
let row_set = Mutex::new(Some(row_set));

let column_indices = row_types
.iter()
Expand Down Expand Up @@ -131,26 +134,30 @@ impl QueryExecutor {
chunk_headers,
column_types,
column_indices,
row_set: Some(row_set),
row_set,
})
}

/// Check if there are no more rows to fetch
pub fn eof(&self) -> bool {
self.row_set.is_none() && self.chunks.is_empty()
pub async fn eof(&self) -> bool {
let row_set = &*self.row_set.lock().await;
let chunks = &*self.chunks.lock().await;
row_set.is_none() && chunks.is_empty()
}

/// Fetch a single chunk
pub async fn fetch_next_chunk(&mut self) -> Result<Option<Vec<SnowflakeRow>>> {
if let Some(row_set) = self.row_set.take() {
pub async fn fetch_next_chunk(&self) -> Result<Option<Vec<SnowflakeRow>>> {
let row_set = &mut *self.row_set.lock().await;
if let Some(row_set) = row_set.take() {
let rows = row_set.into_iter().map(|r| self.convert_row(r)).collect();
return Ok(Some(rows));
}

let http = self.http.clone();
let chunk_headers = self.chunk_headers.clone();
let qrmk = self.qrmk.clone();
let Some(chunk) = self.chunks.pop() else {
let chunks = &mut *self.chunks.lock().await;
let Some(chunk) = chunks.pop() else {
// Nothing to fetch
return Ok(None);
};
Expand All @@ -161,17 +168,19 @@ impl QueryExecutor {
}

/// Fetch all the remaining chunks at once
pub async fn fetch_all(&mut self) -> Result<Vec<SnowflakeRow>> {
pub async fn fetch_all(&self) -> Result<Vec<SnowflakeRow>> {
let mut rows = Vec::new();
if let Some(row_set) = self.row_set.take() {
let row_set = &mut *self.row_set.lock().await;
let chunks = &mut *self.chunks.lock().await;
if let Some(row_set) = row_set.take() {
rows.extend(row_set.into_iter().map(|r| self.convert_row(r)));
} else if self.chunks.is_empty() {
} else if chunks.is_empty() {
// Nothing to fetch
return Ok(vec![]);
}

let mut handles = Vec::with_capacity(self.chunks.len());
while let Some(chunk) = self.chunks.pop() {
let mut handles = Vec::with_capacity(chunks.len());
while let Some(chunk) = chunks.pop() {
let http = self.http.clone();
let chunk_headers = self.chunk_headers.clone();
let qrmk = self.qrmk.clone();
Expand Down
17 changes: 12 additions & 5 deletions src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,19 @@ impl SnowflakeDecode for NaiveDate {
fn try_decode(value: &Option<String>, _: &SnowflakeColumnType) -> Result<Self> {
let value = unwrap(value)?;
let days_since_epoch = value
.parse::<u64>()
.parse::<i64>()
.map_err(|_| Error::Decode(format!("'{value}' is not Date type")))?;
NaiveDate::from_ymd_opt(1970, 1, 1)
.unwrap_or_default()
.checked_add_days(Days::new(days_since_epoch))
.ok_or(Error::Decode(format!("'{value}' is not a valid date")))
let unix_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap_or_default();
if days_since_epoch >= 0 {
unix_epoch
.checked_add_days(Days::new(days_since_epoch as u64))
.ok_or(Error::Decode(format!("'{value}' is not a valid date")))
} else {
let d = days_since_epoch.abs() as u64;
unix_epoch
.checked_sub_days(Days::new(d))
.ok_or(Error::Decode(format!("'{value}' is not a valid date")))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct SnowflakeSession {

impl SnowflakeSession {
pub async fn query<Q: Into<QueryRequest>>(&self, request: Q) -> Result<Vec<SnowflakeRow>> {
let mut executor = QueryExecutor::create(self, request).await?;
let executor = QueryExecutor::create(self, request).await?;
executor.fetch_all().await
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test-chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_query_executor() -> Result<()> {
let query =
"SELECT SEQ8() AS SEQ, RANDSTR(1000, RANDOM()) AS RAND FROM TABLE(GENERATOR(ROWCOUNT=>10000))";

let mut executor = session.execute(query).await?;
let executor = session.execute(query).await?;
let mut rows = Vec::with_capacity(10000);
while let Some(mut r) = executor.fetch_next_chunk().await? {
rows.append(&mut r);
Expand Down

0 comments on commit 95d7f42

Please sign in to comment.