Skip to content

Commit

Permalink
feat: upgrade pgwire to 0.19 (#3157)
Browse files Browse the repository at this point in the history
* feat: upgrade pgwire to 0.19

* fix: update pgwire to 0.19.1
  • Loading branch information
sunng87 authored Jan 15, 2024
1 parent 6f07d69 commit 1294d6f
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 18 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = { git = "https://github.com/MichaelScofield/opensrv.git", rev = "1676c1d" }
opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = "0.18"
pgwire = "0.19.1"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
pprof = { version = "0.13", features = [
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/postgres/auth_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl StartupHandler for PostgresServerHandler {
let login_info = LoginInfo::from_client_info(client);

// do authenticate
let auth_result = self.login_verifier.auth(&login_info, pwd.password()).await;
let auth_result = self.login_verifier.auth(&login_info, &pwd.password).await;

if let Ok(Some(user_info)) = auth_result {
self.session.set_user_info(user_info);
Expand Down
17 changes: 7 additions & 10 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ fn output_to_query_response<'a>(
field_format: &Format,
) -> PgWireResult<Response<'a>> {
match output {
Ok(Output::AffectedRows(rows)) => Ok(Response::Execution(Tag::new_for_execution(
"OK",
Some(rows),
))),
Ok(Output::AffectedRows(rows)) => Ok(Response::Execution(Tag::new("OK").with_rows(rows))),
Ok(Output::Stream(record_stream)) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema, field_format)
Expand Down Expand Up @@ -212,7 +209,7 @@ impl ExtendedQueryHandler for PostgresServerHandler {
.with_label_values(&[crate::metrics::METRIC_POSTGRES_EXTENDED_QUERY, db.as_str()])
.start_timer();

let sql_plan = portal.statement().statement();
let sql_plan = &portal.statement.statement;

let output = if let Some(plan) = &sql_plan.plan {
let plan = plan
Expand All @@ -231,7 +228,7 @@ impl ExtendedQueryHandler for PostgresServerHandler {
self.query_handler.do_query(&sql, query_ctx).await.remove(0)
};

output_to_query_response(output, portal.result_column_format())
output_to_query_response(output, &portal.result_column_format)
}

async fn do_describe<C>(
Expand All @@ -244,7 +241,7 @@ impl ExtendedQueryHandler for PostgresServerHandler {
{
let (param_types, sql_plan, format) = match target {
StatementOrPortal::Statement(stmt) => {
let sql_plan = stmt.statement();
let sql_plan = &stmt.statement;
if let Some(plan) = &sql_plan.plan {
let param_types = plan
.get_param_types()
Expand All @@ -255,14 +252,14 @@ impl ExtendedQueryHandler for PostgresServerHandler {

(Some(types), sql_plan, &Format::UnifiedBinary)
} else {
let param_types = Some(stmt.parameter_types().clone());
let param_types = Some(stmt.parameter_types.clone());
(param_types, sql_plan, &Format::UnifiedBinary)
}
}
StatementOrPortal::Portal(portal) => (
None,
portal.statement().statement(),
portal.result_column_format(),
&portal.statement.statement,
&portal.result_column_format,
),
};

Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
// the index is managed from portal's parameters count so it's safe to
// unwrap here.
let param_type = portal.statement().parameter_types().get(idx).unwrap();
let param_type = portal.statement.parameter_types.get(idx).unwrap();
match param_type {
&Type::VARCHAR | &Type::TEXT => Ok(format!(
"'{}'",
Expand Down Expand Up @@ -218,7 +218,7 @@ pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWir

pub(super) fn invalid_parameter_error(msg: &str, detail: Option<&str>) -> PgWireError {
let mut error_info = ErrorInfo::new("ERROR".to_owned(), "22023".to_owned(), msg.to_owned());
error_info.set_detail(detail.map(|s| s.to_owned()));
error_info.detail = detail.map(|s| s.to_owned());
PgWireError::UserError(Box::new(error_info))
}

Expand Down Expand Up @@ -246,7 +246,7 @@ pub(super) fn parameters_to_scalar_values(
let param_count = portal.parameter_len();
let mut results = Vec::with_capacity(param_count);

let client_param_types = portal.statement().parameter_types();
let client_param_types = &portal.statement.parameter_types;
let param_types = plan
.get_param_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Expand Down

0 comments on commit 1294d6f

Please sign in to comment.