Skip to content

Commit

Permalink
SendableStream: remove mutex, require sync
Browse files Browse the repository at this point in the history
Cursor is mutable so type system already knows it has exclusive access
  • Loading branch information
serprex committed Jan 9, 2024
1 parent 8e613b5 commit c0742ff
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 11 deletions.
8 changes: 3 additions & 5 deletions nexus/peer-bigquery/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use dashmap::DashMap;
use tokio::sync::Mutex;

use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
Expand All @@ -10,7 +9,7 @@ use crate::BigQueryQueryExecutor;

pub struct BigQueryCursor {
position: usize,
stream: Mutex<SendableStream>,
stream: SendableStream,
schema: SchemaRef,
}

Expand Down Expand Up @@ -42,7 +41,7 @@ impl BigQueryCursorManager {
// Create a new cursor
let cursor = BigQueryCursor {
position: 0,
stream: Mutex::new(stream),
stream,
schema,
};

Expand Down Expand Up @@ -75,9 +74,8 @@ impl BigQueryCursorManager {
let prev_end = cursor.position;
let mut cursor_position = cursor.position;
{
let mut stream = cursor.stream.lock().await;
while cursor_position - prev_end < count {
match stream.next().await {
match cursor.stream.next().await {
Some(Ok(record)) => {
records.push(record);
cursor_position += 1;
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait RecordStream: Stream<Item = PgWireResult<Record>> {
fn schema(&self) -> SchemaRef;
}

pub type SendableStream = Pin<Box<dyn RecordStream + Send>>;
pub type SendableStream = Pin<Box<dyn RecordStream + Send + Sync>>;

pub struct Records {
pub records: Vec<Record>,
Expand Down
8 changes: 3 additions & 5 deletions nexus/peer-snowflake/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use futures::StreamExt;
use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use sqlparser::ast::Statement;
use tokio::sync::Mutex;

pub struct SnowflakeCursor {
position: usize,
stream: Mutex<SendableStream>,
stream: SendableStream,
schema: SchemaRef,
}

Expand Down Expand Up @@ -39,7 +38,7 @@ impl SnowflakeCursorManager {
// Create a new cursor
let cursor = SnowflakeCursor {
position: 0,
stream: Mutex::new(stream),
stream,
schema,
};

Expand Down Expand Up @@ -72,9 +71,8 @@ impl SnowflakeCursorManager {
let prev_end = cursor.position;
let mut cursor_position = cursor.position;
{
let mut stream = cursor.stream.lock().await;
while cursor_position - prev_end < count {
match stream.next().await {
match cursor.stream.next().await {
Some(Ok(record)) => {
records.push(record);
cursor_position += 1;
Expand Down

0 comments on commit c0742ff

Please sign in to comment.