Skip to content

Commit

Permalink
RUST-1454 Add a PooledConnection type (#1241)
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelatkinson authored Nov 13, 2024
1 parent 80bab06 commit 6b5089f
Show file tree
Hide file tree
Showing 11 changed files with 476 additions and 363 deletions.
16 changes: 8 additions & 8 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use crate::{
},
cmap::{
conn::{
pooled::PooledConnection,
wire::{next_request_id, Message},
PinnedConnectionHandle,
},
Connection,
ConnectionPool,
RawCommandResponse,
},
Expand Down Expand Up @@ -193,7 +193,7 @@ impl Client {
pub(crate) fn pin_connection_for_cursor(
&self,
spec: &CursorSpecification,
conn: &mut Connection,
conn: &mut PooledConnection,
) -> Result<Option<PinnedConnectionHandle>> {
if self.is_load_balanced() && spec.info.id != 0 {
Ok(Some(conn.pin()?))
Expand All @@ -205,7 +205,7 @@ impl Client {
fn pin_connection_for_session(
&self,
spec: &CursorSpecification,
conn: &mut Connection,
conn: &mut PooledConnection,
session: &mut ClientSession,
) -> Result<Option<PinnedConnectionHandle>> {
if let Some(handle) = session.transaction.pinned_connection() {
Expand Down Expand Up @@ -489,7 +489,7 @@ impl Client {
async fn execute_operation_on_connection<T: Operation>(
&self,
op: &mut T,
connection: &mut Connection,
connection: &mut PooledConnection,
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
retryability: Retryability,
Expand Down Expand Up @@ -904,7 +904,7 @@ impl Client {
/// Returns the retryability level for the execution of this operation on this connection.
fn get_retryability<T: Operation>(
&self,
conn: &Connection,
conn: &PooledConnection,
op: &T,
session: &Option<&mut ClientSession>,
) -> Result<Retryability> {
Expand Down Expand Up @@ -945,7 +945,7 @@ async fn get_connection<T: Operation>(
session: &Option<&mut ClientSession>,
op: &T,
pool: &ConnectionPool,
) -> Result<Connection> {
) -> Result<PooledConnection> {
let session_pinned = session
.as_ref()
.and_then(|s| s.transaction.pinned_connection());
Expand Down Expand Up @@ -995,7 +995,7 @@ impl Error {
/// ClientSession should be unpinned.
fn add_labels_and_update_pin(
&mut self,
conn: Option<&Connection>,
conn: Option<&PooledConnection>,
session: &mut Option<&mut ClientSession>,
retryability: Option<Retryability>,
) -> Result<()> {
Expand Down Expand Up @@ -1060,7 +1060,7 @@ impl Error {

struct ExecutionDetails<T: Operation> {
output: T::O,
connection: Connection,
connection: PooledConnection,
implicit_session: Option<ClientSession>,
}

Expand Down
13 changes: 7 additions & 6 deletions src/cmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ use std::time::Instant;
use derive_where::derive_where;

pub use self::conn::ConnectionInfo;
pub(crate) use self::{
conn::{Command, Connection, RawCommandResponse, StreamDescription},
status::PoolGenerationSubscriber,
worker::PoolGeneration,
};
use self::{
conn::pooled::PooledConnection,
connection_requester::ConnectionRequestResult,
establish::ConnectionEstablisher,
options::ConnectionPoolOptions,
};
pub(crate) use self::{
conn::{Command, Connection, RawCommandResponse, StreamDescription},
status::PoolGenerationSubscriber,
worker::PoolGeneration,
};
use crate::{
bson::oid::ObjectId,
error::{Error, Result},
Expand Down Expand Up @@ -120,7 +121,7 @@ impl ConnectionPool {
/// Checks out a connection from the pool. This method will yield until this thread is at the
/// front of the wait queue, and then will block again if no available connections are in the
/// pool and the total number of connections is not less than the max pool size.
pub(crate) async fn check_out(&self) -> Result<Connection> {
pub(crate) async fn check_out(&self) -> Result<PooledConnection> {
let time_started = Instant::now();
self.event_emitter.emit_event(|| {
ConnectionCheckoutStartedEvent {
Expand Down
Loading

0 comments on commit 6b5089f

Please sign in to comment.