Skip to content

Commit

Permalink
ci
Browse files Browse the repository at this point in the history
  • Loading branch information
SirCipher committed May 20, 2024
1 parent 58f2112 commit f1136e9
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 36 deletions.
36 changes: 18 additions & 18 deletions .azure/azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,35 @@ stages:

- template: azure-install-sccache.yml

# - stage: Lint
# dependsOn: setup
# jobs:
# - template: azure-lint.yml
# parameters:
# name: lint_crates
# rust: $(RUST_VERSION)
# displayName: All crates

# - stage: Test
# # dependsOn: Lint
# jobs:
# - template: azure-test-stable.yml
# parameters:
# rust: $(RUST_VERSION)
# displayName: All crates
- stage: Lint
dependsOn: setup
jobs:
- template: azure-lint.yml
parameters:
name: lint_crates
rust: $(RUST_VERSION)
displayName: All crates

- stage: Test
dependsOn: Setup
jobs:
- template: azure-test-stable.yml
parameters:
rust: $(RUST_VERSION)
displayName: All crates


- stage: autobahnclient
displayName: Autobahn Client
# dependsOn: Lint
dependsOn: Setup
jobs:
- template: azure-autobahn-client.yml
parameters:
rust: $(RUST_VERSION)

- stage: autobahnserver
displayName: Autobahn Server
# dependsOn: Lint
dependsOn: Setup
jobs:
- template: azure-autobahn-server.yml
parameters:
Expand Down
6 changes: 3 additions & 3 deletions ratchet_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ rust-version = "1.65"

[features]
default = []
split = ["futures", "futures-util"]
split = ["futures"]
fixture = []

[dependencies]
ratchet_ext = { version = "0.4.2", path = "../ratchet_ext" }
url = { workspace = true }
http = { workspace = true }
tokio = { workspace = true, features = ["rt", "net", "io-util"] }
tokio-util = { workspace = true, features = ["codec"] }
tokio-util = { workspace = true, features = ["codec", "compat"] }
futures = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }
futures-util = { workspace = true }
derive_more = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true }
Expand Down
17 changes: 16 additions & 1 deletion ratchet_core/src/framed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,17 @@ impl FramedRead {
DecodeResult::Incomplete(count) => {
let len = read_buffer.len();
read_buffer.resize(len + count, 0u8);
io.read_exact(&mut read_buffer[len..]).await?;

let read = io.read(&mut read_buffer[len..]).await?;

if read == 0 {
return Err(Error::with_cause(
ErrorKind::IO,
std::io::Error::from(std::io::ErrorKind::UnexpectedEof),
));
}

read_buffer.truncate(len + read);
}
DecodeResult::Finished(header, payload) => {
return Ok((header, payload));
Expand Down Expand Up @@ -508,6 +518,11 @@ where
self.flags.contains(CodecFlags::ROLE)
}

pub async fn flush(&mut self) -> Result<(), Error> {
self.io.flush().await?;
Ok(())
}

pub async fn write<A, F>(
&mut self,
opcode: OpCode,
Expand Down
5 changes: 1 addition & 4 deletions ratchet_core/src/framed/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ async fn close() {
)
.await;

let mut frame = vec![136, 126, 1, 0];
frame.extend_from_slice(&[0; 256]);

let buffer = BytesMut::from_iter(frame);
let buffer = BytesMut::from_iter(vec![136, 2, 0, 0]);
let mut framed = FramedIo::new(EmptyIo, buffer, Role::Client, usize::MAX, 0);

let decode_result = framed.read_next(&mut BytesMut::default(), &mut NoExt).await;
Expand Down
8 changes: 8 additions & 0 deletions ratchet_core/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ where
.await
}

pub async fn flush(&mut self) -> Result<(), Error> {
if self.is_closed() {
return Err(Error::with_cause(ErrorKind::Close, CloseCause::Error));
}

self.framed.flush().await
}

/// Close this WebSocket with the reason provided.
///
/// If the WebSocket is already closed then `Ok(())` is returned.
Expand Down
3 changes: 2 additions & 1 deletion ratchet_deflate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ impl ExtensionDecoder for DeflateDecoder {
decompress.reset(false);
}

header.rsv1 = true;
*compressed = false;

Ok(())
}
}
4 changes: 2 additions & 2 deletions ratchet_rs/autobahn/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ async fn kill_container() {
.args(["kill", "fuzzingserver"])
.stdin(Stdio::null())
.spawn()
.expect("Failed to kill any lingering container")
.expect("Failed to spawn command to kill any lingering test container")
.wait()
.await
.expect("Failed to kill any lingering container");
.expect("Failed to kill any lingering test container");
}

fn docker_command() -> Result<Command> {
Expand Down
5 changes: 1 addition & 4 deletions ratchet_rs/autobahn/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::env::current_dir;
use std::process::Stdio;

use anyhow::{bail, Context, Result};
use tokio::process::Command;
Expand Down Expand Up @@ -31,9 +30,7 @@ fn docker_command() -> Result<Command> {
])
// spec is now available at this directory due to how the host directory was mounted
.arg("autobahn/fuzzingclient.json")
.current_dir(pwd)
.stdout(Stdio::piped())
.stderr(Stdio::inherit());
.current_dir(pwd);

Ok(cmd)
}
Expand Down
7 changes: 5 additions & 2 deletions ratchet_rs/examples/autobahn-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ use bytes::BytesMut;
use ratchet_deflate::{Deflate, DeflateExtProvider};
use ratchet_rs::UpgradedClient;
use ratchet_rs::{Error, Message, PayloadType, ProtocolRegistry, WebSocketConfig};
use tokio::io::{BufReader, BufWriter};
use tokio::net::TcpStream;

const AGENT: &str = "Ratchet";

async fn subscribe(url: &str) -> Result<UpgradedClient<TcpStream, Deflate>, Error> {
async fn subscribe(
url: &str,
) -> Result<UpgradedClient<BufReader<BufWriter<TcpStream>>, Deflate>, Error> {
let stream = TcpStream::connect("127.0.0.1:9001").await.unwrap();
stream.set_nodelay(true).unwrap();

ratchet_rs::subscribe_with(
WebSocketConfig::default(),
stream,
BufReader::new(BufWriter::new(stream)),
url,
&DeflateExtProvider::default(),
ProtocolRegistry::default(),
Expand Down
5 changes: 4 additions & 1 deletion ratchet_rs/examples/autobahn-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use bytes::BytesMut;
use log::trace;
use ratchet_rs::deflate::DeflateExtProvider;
use ratchet_rs::{Error, Message, PayloadType, ProtocolRegistry, WebSocketConfig};
use tokio::io::{BufReader, BufWriter};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
Expand All @@ -30,7 +31,7 @@ async fn main() {

async fn run(stream: TcpStream) -> Result<(), Error> {
let mut websocket = ratchet_rs::accept_with(
stream,
BufReader::new(BufWriter::new(stream)),
WebSocketConfig::default(),
DeflateExtProvider::default(),
ProtocolRegistry::default(),
Expand All @@ -48,10 +49,12 @@ async fn run(stream: TcpStream) -> Result<(), Error> {
Message::Text => {
let _s = String::from_utf8(buf.to_vec())?;
websocket.write(&mut buf, PayloadType::Text).await?;
websocket.flush().await?;
buf.clear();
}
Message::Binary => {
websocket.write(&mut buf, PayloadType::Binary).await?;
websocket.flush().await?;
buf.clear();
}
Message::Ping(_) | Message::Pong(_) => {}
Expand Down

0 comments on commit f1136e9

Please sign in to comment.