Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extracts core WebSocket upgrade logic for integration into other libraries #45

Merged
merged 16 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions ratchet_core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@

use crate::errors::Error;
use crate::ext::NoExtProvider;
use crate::handshake::{ProtocolRegistry, UpgradedServer};
use crate::handshake::{SubprotocolRegistry, UpgradedServer};
use crate::{subscribe_with, TryIntoRequest, UpgradedClient, WebSocketConfig, WebSocketStream};
use ratchet_ext::ExtensionProvider;
use std::borrow::Cow;

/// A builder to construct WebSocket clients.
///
Expand All @@ -28,15 +27,15 @@ use std::borrow::Cow;
pub struct WebSocketClientBuilder<E> {
config: Option<WebSocketConfig>,
extension: E,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
}

impl Default for WebSocketClientBuilder<NoExtProvider> {
fn default() -> Self {
WebSocketClientBuilder {
config: None,
extension: NoExtProvider,
subprotocols: ProtocolRegistry::default(),
subprotocols: SubprotocolRegistry::default(),
}
}
}
Expand Down Expand Up @@ -95,9 +94,9 @@ impl<E> WebSocketClientBuilder<E> {
pub fn subprotocols<I>(mut self, subprotocols: I) -> Result<Self, Error>
where
I: IntoIterator,
I::Item: Into<Cow<'static, str>>,
I::Item: Into<String>,
{
self.subprotocols = ProtocolRegistry::new(subprotocols)?;
self.subprotocols = SubprotocolRegistry::new(subprotocols)?;
Ok(self)
}
}
Expand All @@ -110,7 +109,7 @@ impl<E> WebSocketClientBuilder<E> {
#[derive(Debug)]
pub struct WebSocketServerBuilder<E> {
config: Option<WebSocketConfig>,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
extension: E,
}

Expand All @@ -119,7 +118,7 @@ impl Default for WebSocketServerBuilder<NoExtProvider> {
WebSocketServerBuilder {
config: None,
extension: NoExtProvider,
subprotocols: ProtocolRegistry::default(),
subprotocols: SubprotocolRegistry::default(),
}
}
}
Expand Down Expand Up @@ -168,9 +167,9 @@ impl<E> WebSocketServerBuilder<E> {
pub fn subprotocols<I>(mut self, subprotocols: I) -> Result<Self, Error>
where
I: IntoIterator,
I::Item: Into<Cow<'static, str>>,
I::Item: Into<String>,
{
self.subprotocols = ProtocolRegistry::new(subprotocols)?;
self.subprotocols = SubprotocolRegistry::new(subprotocols)?;
Ok(self)
}
}
18 changes: 10 additions & 8 deletions ratchet_core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::protocol::{CloseCodeParseErr, OpCodeParseErr};
use http::header::{HeaderName, InvalidHeaderValue};
use http::status::InvalidStatusCode;
use http::uri::InvalidUri;
use http::StatusCode;
use std::any::Any;
use std::error::Error as StdError;
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -156,9 +155,12 @@ pub enum HttpError {
#[error("Redirected: `{0}`")]
Redirected(String),
/// The peer returned with a status code other than 101.
#[error("Status code: `{0}`")]
Status(StatusCode),
/// An invalid HTTP version was received in a request.
#[error("Status code: `{0:?}`")]
Status(u16),
/// A request or response was missing its status code.
#[error("Missing status code")]
MissingStatus,
/// An invalid HTTP version was received in a request or response.
#[error("Invalid HTTP version: `{0:?}`")]
HttpVersion(String),
/// A request or response was missing an expected header.
Expand Down Expand Up @@ -272,14 +274,11 @@ pub enum CloseCause {
}

/// WebSocket protocol errors.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Error)]
#[derive(Clone, Debug, Eq, PartialEq, Error)]
pub enum ProtocolError {
/// Invalid encoding was received.
#[error("Not valid UTF-8 encoding")]
Encoding,
/// A peer selected a protocol that was not sent.
#[error("Received an unknown subprotocol")]
UnknownProtocol,
/// An invalid OpCode was received.
#[error("Bad OpCode: `{0}`")]
OpCode(OpCodeParseErr),
Expand Down Expand Up @@ -313,6 +312,9 @@ pub enum ProtocolError {
/// An invalid control frame was received.
#[error("Received an invalid control frame")]
InvalidControlFrame,
/// Failed to build subprotocol header.
#[error("Invalid subprotocol header: `{0}`")]
InvalidSubprotocolHeader(String),
}

impl From<FromUtf8Error> for Error {
Expand Down
5 changes: 2 additions & 3 deletions ratchet_core/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use crate::Error;
use bytes::BytesMut;
use http::{HeaderMap, HeaderValue};
use httparse::Header;
use ratchet_ext::{
Extension, ExtensionDecoder, ExtensionEncoder, ExtensionProvider, FrameHeader,
ReunitableExtension, RsvBits, SplittableExtension,
Expand Down Expand Up @@ -61,14 +60,14 @@ impl ExtensionProvider for NoExtProvider {

fn negotiate_client(
&self,
_headers: &[Header],
_headers: &HeaderMap,
) -> Result<Option<Self::Extension>, Self::Error> {
Ok(None)
}

fn negotiate_server(
&self,
_headers: &[Header],
_headers: &HeaderMap,
) -> Result<Option<(Self::Extension, HeaderValue)>, Self::Error> {
Ok(None)
}
Expand Down
8 changes: 3 additions & 5 deletions ratchet_core/src/handshake/client/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use ratchet_ext::ExtensionProvider;

use crate::errors::{Error, ErrorKind, HttpError};
use crate::handshake::client::Nonce;
use crate::handshake::{
apply_to, ProtocolRegistry, UPGRADE_STR, WEBSOCKET_STR, WEBSOCKET_VERSION_STR,
};
use crate::handshake::{SubprotocolRegistry, UPGRADE_STR, WEBSOCKET_STR, WEBSOCKET_VERSION_STR};

use base64::engine::general_purpose::STANDARD;
use log::error;
Expand Down Expand Up @@ -83,7 +81,7 @@ pub struct ValidatedRequest {
pub fn build_request<E>(
request: Request<()>,
extension: &E,
subprotocols: &ProtocolRegistry,
subprotocols: &SubprotocolRegistry,
) -> Result<ValidatedRequest, Error>
where
E: ExtensionProvider,
Expand Down Expand Up @@ -176,7 +174,7 @@ where
));
}

apply_to(subprotocols, &mut headers);
subprotocols.apply_to(&mut headers);

if headers.get(SEC_WEBSOCKET_KEY).is_some() {
error!("{} should not be set", SEC_WEBSOCKET_KEY);
Expand Down
98 changes: 54 additions & 44 deletions ratchet_core/src/handshake/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use crate::errors::{Error, ErrorKind, HttpError};
use crate::handshake::client::encoding::{build_request, encode_request};
use crate::handshake::io::BufferedIo;
use crate::handshake::{
negotiate_response, validate_header, validate_header_value, ParseResult, ProtocolRegistry,
StreamingParser, ACCEPT_KEY, BAD_STATUS_CODE, UPGRADE_STR, WEBSOCKET_STR,
validate_header, validate_header_value, ParseResult, StreamingParser, SubprotocolRegistry,
TryFromWrapper, ACCEPT_KEY, BAD_STATUS_CODE, UPGRADE_STR, WEBSOCKET_STR,
};
use crate::{
NoExt, NoExtProvider, Role, TryIntoRequest, WebSocket, WebSocketConfig, WebSocketStream,
};
use http::header::LOCATION;
use log::warn;
use ratchet_ext::ExtensionProvider;
use tokio_util::codec::Decoder;

Expand Down Expand Up @@ -80,7 +82,7 @@ where
&mut stream,
request.try_into_request()?,
NoExtProvider,
ProtocolRegistry::default(),
SubprotocolRegistry::default(),
&mut read_buffer,
)
.await?;
Expand All @@ -98,7 +100,7 @@ pub async fn subscribe_with<S, E, R>(
mut stream: S,
request: R,
extension: E,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
) -> Result<UpgradedClient<S, E::Extension>, Error>
where
S: WebSocketStream,
Expand Down Expand Up @@ -128,7 +130,7 @@ async fn exec_client_handshake<S, E>(
stream: &mut S,
request: Request<()>,
extension: E,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
buf: &mut BytesMut,
) -> Result<HandshakeResult<E::Extension>, Error>
where
Expand Down Expand Up @@ -163,14 +165,14 @@ where
struct ClientHandshake<'s, S, E> {
buffered: BufferedIo<'s, S>,
nonce: Nonce,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
extension: &'s E,
}

pub struct StreamingResponseParser<'b, E> {
nonce: &'b Nonce,
extension: &'b E,
subprotocols: &'b mut ProtocolRegistry,
subprotocols: &'b mut SubprotocolRegistry,
}

impl<'b, E> Decoder for StreamingResponseParser<'b, E>
Expand All @@ -188,11 +190,11 @@ where
} = self;

let mut headers = [httparse::EMPTY_HEADER; 32];
let mut response = Response::new(&mut headers);
let response = Response::new(&mut headers);

match try_parse_response(buf, &mut response, nonce, extension, subprotocols)? {
match try_parse_response(buf, response, nonce, extension, subprotocols)? {
ParseResult::Complete(result, count) => Ok(Some((result, count))),
ParseResult::Partial => {
ParseResult::Partial(response) => {
check_partial_response(&response)?;
Ok(None)
}
Expand All @@ -207,7 +209,7 @@ where
{
pub fn new(
socket: &'s mut S,
subprotocols: ProtocolRegistry,
subprotocols: SubprotocolRegistry,
extension: &'s E,
buf: &'s mut BytesMut,
) -> ClientHandshake<'s, S, E> {
Expand Down Expand Up @@ -303,84 +305,92 @@ fn check_partial_response(response: &Response) -> Result<(), Error> {
Ok(())
}
Some(code) => match StatusCode::try_from(code) {
Ok(code) => Err(Error::with_cause(ErrorKind::Http, HttpError::Status(code))),
Ok(code) => Err(Error::with_cause(
ErrorKind::Http,
HttpError::Status(code.as_u16()),
)),
Err(_) => Err(Error::with_cause(ErrorKind::Http, BAD_STATUS_CODE)),
},
None => Ok(()),
}
}

fn try_parse_response<'l, E>(
buffer: &'l [u8],
response: &mut Response<'_, 'l>,
fn try_parse_response<'b, E>(
buffer: &'b [u8],
mut response: Response<'b, 'b>,
expected_nonce: &Nonce,
extension: E,
subprotocols: &mut ProtocolRegistry,
) -> Result<ParseResult<HandshakeResult<E::Extension>>, Error>
subprotocols: &mut SubprotocolRegistry,
) -> Result<ParseResult<Response<'b, 'b>, HandshakeResult<E::Extension>>, Error>
where
E: ExtensionProvider,
{
match response.parse(buffer) {
Ok(Status::Complete(count)) => {
parse_response(response, expected_nonce, extension, subprotocols)
.map(|r| ParseResult::Complete(r, count))
}
Ok(Status::Partial) => Ok(ParseResult::Partial),
Ok(Status::Complete(count)) => parse_response(
TryFromWrapper(response).try_into()?,
expected_nonce,
extension,
subprotocols,
)
.map(|r| ParseResult::Complete(r, count)),
Ok(Status::Partial) => Ok(ParseResult::Partial(response)),
Err(e) => Err(e.into()),
}
}

fn parse_response<E>(
response: &Response,
response: http::Response<()>,
expected_nonce: &Nonce,
extension: E,
subprotocols: &mut ProtocolRegistry,
subprotocols: &SubprotocolRegistry,
) -> Result<HandshakeResult<E::Extension>, Error>
where
E: ExtensionProvider,
{
match response.version {
if response.version() < Version::HTTP_11 {
// rfc6455 § 4.2.1.1: must be HTTP/1.1 or higher
Some(1) => {}
_ => {
return Err(Error::with_cause(
ErrorKind::Http,
HttpError::HttpVersion(format!("{:?}", Version::HTTP_10)),
))
}
return Err(Error::with_cause(
ErrorKind::Http,
HttpError::HttpVersion(format!("{:?}", Version::HTTP_10)),
));
}

let raw_status_code = response.code.ok_or_else(|| Error::new(ErrorKind::Http))?;
let status_code = StatusCode::from_u16(raw_status_code)?;
let status_code = response.status();
match status_code {
c if c == StatusCode::SWITCHING_PROTOCOLS => {}
c if c.is_redirection() => {
return match response.headers.iter().find(|h| h.name == header::LOCATION) {
Some(header) => {
return match response.headers().get(LOCATION) {
Some(value) => {
// the value _should_ be valid UTF-8
let location = String::from_utf8(header.value.to_vec())
let location = String::from_utf8(value.as_bytes().to_vec())
.map_err(|_| Error::new(ErrorKind::Http))?;
Err(Error::with_cause(
ErrorKind::Http,
HttpError::Redirected(location),
))
}
None => Err(Error::with_cause(ErrorKind::Http, HttpError::Status(c))),
None => {
warn!("Received a redirection status code with no location header");
Err(Error::with_cause(
ErrorKind::Http,
HttpError::Status(c.as_u16()),
))
}
};
}
status_code => {
return Err(Error::with_cause(
ErrorKind::Http,
HttpError::Status(status_code),
HttpError::Status(status_code.as_u16()),
))
}
}

validate_header_value(response.headers, header::UPGRADE, WEBSOCKET_STR)?;
validate_header_value(response.headers, header::CONNECTION, UPGRADE_STR)?;
validate_header_value(response.headers(), header::UPGRADE, WEBSOCKET_STR)?;
validate_header_value(response.headers(), header::CONNECTION, UPGRADE_STR)?;

validate_header(
response.headers,
response.headers(),
header::SEC_WEBSOCKET_ACCEPT,
|_name, actual| {
let mut digest = Sha1::new();
Expand All @@ -397,9 +407,9 @@ where
)?;

Ok(HandshakeResult {
subprotocol: negotiate_response(subprotocols, response)?,
subprotocol: subprotocols.validate_accepted_subprotocol(response.headers())?,
extension: extension
.negotiate_client(response.headers)
.negotiate_client(response.headers())
.map_err(|e| Error::with_cause(ErrorKind::Extension, e))?,
})
}
Loading
Loading