Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) feat: add support for processing handshake packets async via vacation #99

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
further updates to match new vacation api
  • Loading branch information
jlizen committed Dec 30, 2024
commit 70d80d274f1a82665ef720a70c4d92e03ce7fbae
6 changes: 3 additions & 3 deletions src/common/async_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use super::{Stream, TlsState};
/// Full result of sync closure
type SessionResult<S> = Result<S, (Option<S>, io::Error)>;
/// Executor result wrapping sync closure result
type SyncExecutorResult<S> = Result<SessionResult<S>, vacation::Error>;
type ExecutorResult<S> = Result<SessionResult<S>, vacation::Error>;
/// Future wrapping waiting on executor
type SessionFuture<S> = Box<dyn Future<Output = SyncExecutorResult<S>> + Unpin + Send>;
type SessionFuture<S> = Box<dyn Future<Output = ExecutorResult<S>> + Unpin + Send>;

pin_project! {
/// Session is off doing compute-heavy sync work, such as initializing the session or processing handshake packets.
Expand Down Expand Up @@ -55,7 +55,7 @@ where

// TODO: if we ever start also delegating non-handshake byte processing, make this chance of blocking
// variable and set by caller
let future = vacation::execute_sync(closure, vacation::ChanceOfBlocking::High);
let future = vacation::execute(closure, vacation::ChanceOfBlocking::High);

Self {
future: Box::new(Box::pin(future)),
Expand Down
33 changes: 4 additions & 29 deletions tests/async_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,24 @@

use std::io::{self, ErrorKind, Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;

use futures_util::{future::Future, ready};
use rustls::pki_types::ServerName;
use rustls::{self, ClientConfig, ServerConnection, Stream};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_rustls::{client::TlsStream, TlsConnector};
use vacation::{global_sync_strategy_builder, CustomExecutorSyncClosure};

struct Read1<T>(T);

impl<T: AsyncRead + Unpin> Future for Read1<T> {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut buf = [0];
let mut buf = ReadBuf::new(&mut buf);

ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf))?;

if buf.filled().is_empty() {
Poll::Ready(Ok(()))
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
use vacation::{init, CustomClosure};

/// returns rx to listen on to confirm layer was hit
fn async_session_executor() -> (tokio::sync::mpsc::Receiver<()>, Arc<AtomicBool>) {
let (result_tx, result_rx) = tokio::sync::mpsc::channel(10);
let fail = Arc::new(AtomicBool::new(false));

let fail_cloned = fail.clone();
let closure: CustomExecutorSyncClosure = Box::new(move |f| {
let closure: CustomClosure = Box::new(move |f| {
let tx = result_tx.clone();
let fail = fail_cloned.clone();

Expand All @@ -61,9 +38,7 @@ fn async_session_executor() -> (tokio::sync::mpsc::Receiver<()>, Arc<AtomicBool>
})
});

global_sync_strategy_builder()
.initialize_custom_executor(closure)
.unwrap();
init().custom_executor(closure).install().unwrap();

(result_rx, fail)
}
Expand Down
Loading