Skip to content

Commit

Permalink
Fix SenderLink closed state, if link is closed remotely (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jan 18, 2024
1 parent 53a3cf7 commit 3921603
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.0.1] - 2024-01-18

* Fix SenderLink closed state, if link is closed remotely

## [1.0.0] - 2024-01-09

* Release
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "1.0.0"
version = "1.0.1"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
1 change: 1 addition & 0 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl ReceiverLinkInner {
.get_mut()
.detach_receiver_link(self.handle, true, error, tx);
}
self.closed = true;
self.wake();

async move {
Expand Down
1 change: 1 addition & 0 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl SenderLinkInner {
}
}

self.closed = true;
self.error = Some(err);
self.on_close.notify();
}
Expand Down
75 changes: 74 additions & 1 deletion tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{cell::Cell, convert::TryFrom, rc::Rc, sync::Arc, sync::Mutex};
use ntex::server::test_server;
use ntex::service::{boxed, boxed::BoxService, fn_factory_with_config, fn_service};
use ntex::util::{Bytes, Either, Ready};
use ntex::{http::Uri, time::sleep, time::Millis};
use ntex::{http::Uri, rt, time::sleep, time::Millis};
use ntex_amqp::{client, error::LinkError, server, types, ControlFrame, ControlFrameKind};

async fn server(
Expand Down Expand Up @@ -191,3 +191,76 @@ async fn test_session_end() -> std::io::Result<()> {

Ok(())
}

#[ntex::test]
async fn test_link_detach() -> std::io::Result<()> {
let _ = env_logger::try_init();

let srv = test_server(move || {
server::Server::build(|con: server::Handshake| async move {
match con {
server::Handshake::Amqp(con) => {
let con = con.open().await.unwrap();
Ok(con.ack(()))
}
server::Handshake::Sasl(_) => Err(()),
}
})
.control(move |frm: ControlFrame| {
if let ControlFrameKind::AttachSender(_, ref link) = frm.kind() {
let link = link.clone();
rt::spawn(async move {
sleep(Millis(150)).await;
let _ = link.close().await;
});
}
Ready::<_, ()>::Ok(())
})
.finish(
server::Router::<()>::new()
.service(
"test",
fn_factory_with_config(|link: types::Link<()>| async move {
rt::spawn(async move {
sleep(Millis(150)).await;
let _ = link.receiver().close().await;
});

Ok::<_, LinkError>(boxed::service(fn_service(|_req| async move {
Ok::<_, LinkError>(types::Outcome::Accept)
})))
}),
)
.finish(),
)
});

let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();

let sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});

let session = sink.open_session().await.unwrap();
let link = session
.build_sender_link("test", "test")
.attach()
.await
.unwrap();

link.on_close().await;
assert!(link.is_closed());
assert!(!link.is_opened());

let link = session
.build_receiver_link("test", "test")
.attach()
.await
.unwrap();
sleep(Millis(350)).await;
assert!(link.is_closed());

Ok(())
}

0 comments on commit 3921603

Please sign in to comment.