Skip to content

Commit

Permalink
pgwire 0.23 (#1900)
Browse files Browse the repository at this point in the history
`cargo outdated` seems to have issues checking workspace dependencies
  • Loading branch information
serprex authored Jul 3, 2024
1 parent e438e30 commit 84c4573
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
4 changes: 2 additions & 2 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rust_decimal = { version = "1", default-features = false, features = [
] }
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" }
tracing = "0.1"
pgwire = { version = "0.22", default-features = false, features = [
pgwire = { version = "0.23", default-features = false, features = [
"scram",
"server-api-ring",
] }
47 changes: 37 additions & 10 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ use peerdb_parser::{NexusParsedStatement, NexusQueryParser, NexusStatement};
use pgwire::{
api::{
auth::{
scram::{gen_salted_password, MakeSASLScramAuthStartupHandler},
scram::{gen_salted_password, SASLScramAuthStartupHandler},
AuthSource, LoginInfo, Password, ServerParameterProvider,
},
copy::NoopCopyHandler,
portal::Portal,
query::{ExtendedQueryHandler, SimpleQueryHandler},
results::{
DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag,
},
stmt::StoredStatement,
ClientInfo, MakeHandler, Type,
ClientInfo, PgWireHandlerFactory, Type,
},
error::{ErrorInfo, PgWireError, PgWireResult},
tokio::process_socket,
Expand All @@ -49,7 +50,7 @@ use tracing_subscriber::{fmt, prelude::*, EnvFilter};

mod cursor;

struct FixedPasswordAuthSource {
pub struct FixedPasswordAuthSource {
password: String,
}

Expand Down Expand Up @@ -1141,17 +1142,45 @@ async fn run_migrations<'a>(config: &CatalogConfig<'a>) -> anyhow::Result<()> {
Err(anyhow::anyhow!("Failed to connect to catalog"))
}

pub struct Handlers {
authenticator: Arc<SASLScramAuthStartupHandler<FixedPasswordAuthSource, NexusServerParameterProvider>>,
nexus: Arc<NexusBackend>,
}

impl PgWireHandlerFactory for Handlers {
type StartupHandler = SASLScramAuthStartupHandler<FixedPasswordAuthSource, NexusServerParameterProvider>;
type SimpleQueryHandler = NexusBackend;
type ExtendedQueryHandler = NexusBackend;
type CopyHandler = NoopCopyHandler;

fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.nexus.clone()
}

fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
self.nexus.clone()
}

fn startup_handler(&self) -> Arc<Self::StartupHandler> {
self.authenticator.clone()
}

fn copy_handler(&self) -> Arc<Self::CopyHandler> {
Arc::new(NoopCopyHandler)
}
}

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();

let args = Args::parse();
let _guard = setup_tracing(args.log_dir.as_ref().map(|s| &s[..]));

let authenticator = MakeSASLScramAuthStartupHandler::new(
let authenticator = Arc::new(SASLScramAuthStartupHandler::new(
Arc::new(FixedPasswordAuthSource::new(args.peerdb_password.clone())),
Arc::new(NexusServerParameterProvider),
);
));
let catalog_config = get_catalog_config(&args);

run_migrations(&catalog_config).await?;
Expand Down Expand Up @@ -1184,7 +1213,7 @@ pub async fn main() -> anyhow::Result<()> {
let conn_flow_handler = flow_handler.clone();
let conn_peer_conns = peer_conns.clone();
let peerdb_fdw_mode = args.peerdb_fwd_mode == "true";
let authenticator_ref = authenticator.make();
let authenticator = authenticator.clone();
let pg_config = catalog_config.to_postgres_config();

tokio::task::spawn(async move {
Expand All @@ -1193,7 +1222,7 @@ pub async fn main() -> anyhow::Result<()> {
let conn_uuid = uuid::Uuid::new_v4();
let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns);

let processor = Arc::new(NexusBackend::new(
let nexus = Arc::new(NexusBackend::new(
Arc::new(catalog),
tracker,
conn_flow_handler,
Expand All @@ -1202,9 +1231,7 @@ pub async fn main() -> anyhow::Result<()> {
process_socket(
socket,
None,
authenticator_ref,
processor.clone(),
processor,
Arc::new(Handlers { nexus, authenticator }),
)
.await
}
Expand Down

0 comments on commit 84c4573

Please sign in to comment.