From c0742ffc2aa9ab414f80869ea2d384ff87f21b0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 9 Jan 2024 18:07:01 +0000 Subject: [PATCH] SendableStream: remove mutex, require sync Cursor is mutable so type system already knows it has exclusive access --- nexus/peer-bigquery/src/cursor.rs | 8 +++----- nexus/peer-cursor/src/lib.rs | 2 +- nexus/peer-snowflake/src/cursor.rs | 8 +++----- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/nexus/peer-bigquery/src/cursor.rs b/nexus/peer-bigquery/src/cursor.rs index 23812a382a..6b8e758a5e 100644 --- a/nexus/peer-bigquery/src/cursor.rs +++ b/nexus/peer-bigquery/src/cursor.rs @@ -1,5 +1,4 @@ use dashmap::DashMap; -use tokio::sync::Mutex; use futures::StreamExt; use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream}; @@ -10,7 +9,7 @@ use crate::BigQueryQueryExecutor; pub struct BigQueryCursor { position: usize, - stream: Mutex, + stream: SendableStream, schema: SchemaRef, } @@ -42,7 +41,7 @@ impl BigQueryCursorManager { // Create a new cursor let cursor = BigQueryCursor { position: 0, - stream: Mutex::new(stream), + stream, schema, }; @@ -75,9 +74,8 @@ impl BigQueryCursorManager { let prev_end = cursor.position; let mut cursor_position = cursor.position; { - let mut stream = cursor.stream.lock().await; while cursor_position - prev_end < count { - match stream.next().await { + match cursor.stream.next().await { Some(Ok(record)) => { records.push(record); cursor_position += 1; diff --git a/nexus/peer-cursor/src/lib.rs b/nexus/peer-cursor/src/lib.rs index 7d2525a7df..e4029ab003 100644 --- a/nexus/peer-cursor/src/lib.rs +++ b/nexus/peer-cursor/src/lib.rs @@ -23,7 +23,7 @@ pub trait RecordStream: Stream> { fn schema(&self) -> SchemaRef; } -pub type SendableStream = Pin>; +pub type SendableStream = Pin>; pub struct Records { pub records: Vec, diff --git a/nexus/peer-snowflake/src/cursor.rs b/nexus/peer-snowflake/src/cursor.rs index 475a2d7f35..ef247d8243 100644 --- a/nexus/peer-snowflake/src/cursor.rs +++ b/nexus/peer-snowflake/src/cursor.rs @@ -4,11 +4,10 @@ use futures::StreamExt; use peer_cursor::{QueryExecutor, QueryOutput, Records, SchemaRef, SendableStream}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::ast::Statement; -use tokio::sync::Mutex; pub struct SnowflakeCursor { position: usize, - stream: Mutex, + stream: SendableStream, schema: SchemaRef, } @@ -39,7 +38,7 @@ impl SnowflakeCursorManager { // Create a new cursor let cursor = SnowflakeCursor { position: 0, - stream: Mutex::new(stream), + stream, schema, }; @@ -72,9 +71,8 @@ impl SnowflakeCursorManager { let prev_end = cursor.position; let mut cursor_position = cursor.position; { - let mut stream = cursor.stream.lock().await; while cursor_position - prev_end < count { - match stream.next().await { + match cursor.stream.next().await { Some(Ok(record)) => { records.push(record); cursor_position += 1;