Skip to content

Commit

Permalink
consolidate cursor manager code
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Apr 23, 2024
1 parent 523003b commit 49c3081
Show file tree
Hide file tree
Showing 19 changed files with 56 additions and 320 deletions.
5 changes: 2 additions & 3 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ resolver = "2"

[workspace.dependencies]
chrono = { version = "0.4", default-features = false, features = ["serde", "std"] }
dashmap = "5.0"
rust_decimal = { version = "1", default-features = false, features = ["tokio-pg"] }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "mysql" }
tracing = "0.1"
pgwire = "0.19"
2 changes: 1 addition & 1 deletion nexus/flow-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
serde_json = "1.0"
anyhow = "1.0"
tracing = "0.1"
tracing.workspace = true
tonic-health = "0.11"
pt = { path = "../pt" }
catalog = { path = "../catalog" }
2 changes: 1 addition & 1 deletion nexus/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ pt = { path = "../pt" }
rand = "0.8"
sqlparser.workspace = true
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing.workspace = true
3 changes: 1 addition & 2 deletions nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ edition = "2021"
anyhow = "1.0"
async-trait = "0.1"
chrono.workspace = true
dashmap = "5.0"
futures = { version = "0.3.28", features = ["executor"] }
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
Expand All @@ -20,7 +19,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "0.11"
sqlparser.workspace = true
tracing = "0.1"
tracing.workspace = true
tokio = { version = "1.0", features = ["full"] }
gcp-bigquery-client = "0.18"
uuid = { version = "1.0", features = ["serde", "v4"] }
Expand Down
127 changes: 0 additions & 127 deletions nexus/peer-bigquery/src/cursor.rs

This file was deleted.

13 changes: 5 additions & 8 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use std::time::Duration;

use anyhow::Context;
use cursor::BigQueryCursorManager;
use gcp_bigquery_client::{
model::{query_request::QueryRequest, query_response::ResultSet},
Client,
};
use peer_connections::PeerConnectionTracker;
use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
use peer_cursor::{CursorManager, CursorModification, QueryExecutor, QueryOutput, Schema};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pt::peerdb_peers::BigqueryConfig;
use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
use stream::{BqRecordStream, BqSchema};

mod ast;
mod cursor;
mod stream;

pub struct BigQueryQueryExecutor {
Expand All @@ -23,7 +21,7 @@ pub struct BigQueryQueryExecutor {
dataset_id: String,
peer_connections: PeerConnectionTracker,
client: Box<Client>,
cursor_manager: BigQueryCursorManager,
cursor_manager: CursorManager,
}

pub async fn bq_client_from_config(config: &BigqueryConfig) -> anyhow::Result<Client> {
Expand Down Expand Up @@ -53,15 +51,14 @@ impl BigQueryQueryExecutor {
peer_connections: PeerConnectionTracker,
) -> anyhow::Result<Self> {
let client = bq_client_from_config(config).await?;
let client = Box::new(client);
let cursor_manager = BigQueryCursorManager::new();

Ok(Self {
peer_name,
project_id: config.project_id.clone(),
dataset_id: config.dataset_id.clone(),
peer_connections,
client,
cursor_manager,
client: Box::new(client),
cursor_manager: Default::default(),
})
}

Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-connections/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ tokio-postgres = { version = "0.7.6", features = [
"with-serde_json-1",
"with-uuid-1",
] }
tracing = "0.1"
tracing.workspace = true
uuid = { version = "1.0" }
2 changes: 2 additions & 0 deletions nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
dashmap.workspace = true
futures = "0.3"
pgwire.workspace = true
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
tracing.workspace = true
value = { path = "../value" }
9 changes: 9 additions & 0 deletions nexus/peer-cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use sqlparser::ast::Statement;
use value::Value;

pub mod util;
mod manager;

pub use manager::CursorManager;

pub type Schema = Arc<Vec<FieldInfo>>;

Expand Down Expand Up @@ -46,3 +49,9 @@ pub trait QueryExecutor: Send + Sync {
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;
}

pub struct Cursor {
position: usize,
stream: SendableStream,
schema: Schema,
}
Loading

0 comments on commit 49c3081

Please sign in to comment.