From 9448ccf37ee2d6a1bb2e61966b160d386955cdbb Mon Sep 17 00:00:00 2001 From: Jacob Bandes-Storch Date: Fri, 28 Feb 2025 17:07:44 -0800 Subject: [PATCH] Expose server port (#255) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Changelog The server now exposes the port it's listening on, so you can pass a port of 0 and later determine the actual port used. ### Docs None ### Description Also tweaked a few memory orderings after discussing in slack – we can stick with the following defaults unless otherwise noted with a comment: - load: Acquire - store: Release - fetch_add/compare_exchange etc.: AcqRel --- c/include/foxglove-c/foxglove-c.h | 7 ++++ c/src/lib.rs | 14 ++++++++ cpp/examples/ws-server/src/main.cpp | 2 +- cpp/foxglove/include/foxglove/server.hpp | 4 +++ cpp/foxglove/src/server.cpp | 4 +++ cpp/foxglove/tests/test_server.cpp | 3 +- python/foxglove-sdk/CONTRIBUTING.md | 6 ++++ .../python/foxglove/_foxglove_py/__init__.pyi | 2 ++ .../python/foxglove/tests/test_server.py | 2 ++ python/foxglove-sdk/src/websocket_server.rs | 6 ++++ rust/foxglove/src/websocket.rs | 36 +++++++++++-------- rust/foxglove/src/websocket/tests.rs | 14 ++++---- rust/foxglove/src/websocket_server.rs | 12 +++++++ 13 files changed, 88 insertions(+), 24 deletions(-) diff --git a/c/include/foxglove-c/foxglove-c.h b/c/include/foxglove-c/foxglove-c.h index c90a72a7..8a4915e9 100644 --- a/c/include/foxglove-c/foxglove-c.h +++ b/c/include/foxglove-c/foxglove-c.h @@ -23,6 +23,8 @@ extern "C" { /** * Create and start a server. The server must later be freed with `foxglove_server_free`. * + * `port` may be 0, in which case an available port will be automatically selected. + * * # Safety * `name` and `host` must be null-terminated strings with valid UTF8. */ @@ -30,6 +32,11 @@ struct foxglove_websocket_server *foxglove_server_start(const char *name, const char *host, uint16_t port); +/** + * Get the port on which the server is listening. + */ +uint16_t foxglove_server_get_port(const struct foxglove_websocket_server *server); + /** * Free a server created via `foxglove_server_start`. * diff --git a/c/src/lib.rs b/c/src/lib.rs index b20fc9d5..d3376331 100644 --- a/c/src/lib.rs +++ b/c/src/lib.rs @@ -8,6 +8,8 @@ pub struct FoxgloveWebSocketServer(Option) -> u16 { + let Some(server) = server else { + panic!("Expected a non-null server"); + }; + let Some(ref handle) = server.0 else { + panic!("Server already stopped"); + }; + handle.port() +} + /// Free a server created via `foxglove_server_start`. /// /// If the server has not already been stopped, it will be stopped automatically. diff --git a/cpp/examples/ws-server/src/main.cpp b/cpp/examples/ws-server/src/main.cpp index e563956a..81e3529f 100644 --- a/cpp/examples/ws-server/src/main.cpp +++ b/cpp/examples/ws-server/src/main.cpp @@ -22,7 +22,7 @@ int main(int argc, const char* argv[]) { options.host = "127.0.0.1"; options.port = 8765; foxglove::WebSocketServer server{options}; - std::cerr << "Started server" << std::endl; + std::cerr << "Server listening on port " << server.port() << std::endl; std::atomic_bool done = false; sigintHandler = [&] { diff --git a/cpp/foxglove/include/foxglove/server.hpp b/cpp/foxglove/include/foxglove/server.hpp index 814f8e78..89d0c044 100644 --- a/cpp/foxglove/include/foxglove/server.hpp +++ b/cpp/foxglove/include/foxglove/server.hpp @@ -15,6 +15,10 @@ struct WebSocketServerOptions { class WebSocketServer { public: WebSocketServer(WebSocketServerOptions options); + + // Get the port on which the server is listening. + uint16_t port() const; + void stop(); private: diff --git a/cpp/foxglove/src/server.cpp b/cpp/foxglove/src/server.cpp index 24197a61..8aa0f525 100644 --- a/cpp/foxglove/src/server.cpp +++ b/cpp/foxglove/src/server.cpp @@ -11,4 +11,8 @@ void WebSocketServer::stop() { foxglove_server_stop(_impl.get()); } +uint16_t WebSocketServer::port() const { + return foxglove_server_get_port(_impl.get()); +} + } // namespace foxglove diff --git a/cpp/foxglove/tests/test_server.cpp b/cpp/foxglove/tests/test_server.cpp index e8501cc3..a0c95794 100644 --- a/cpp/foxglove/tests/test_server.cpp +++ b/cpp/foxglove/tests/test_server.cpp @@ -6,7 +6,8 @@ TEST_CASE("Start and stop server") { foxglove::WebSocketServerOptions options; options.name = "unit-test"; options.host = "127.0.0.1"; - options.port = 8765; + options.port = 0; foxglove::WebSocketServer server{options}; + REQUIRE(server.port() != 0); server.stop(); } diff --git a/python/foxglove-sdk/CONTRIBUTING.md b/python/foxglove-sdk/CONTRIBUTING.md index cafbaeca..c58bde5c 100644 --- a/python/foxglove-sdk/CONTRIBUTING.md +++ b/python/foxglove-sdk/CONTRIBUTING.md @@ -48,6 +48,12 @@ PEP8 check: poetry run flake8 . ``` +Run unit tests: + +```sh +cd python && poetry run python -m unittest +``` + ### Examples Examples exist in the `foxglove-sdk-examples` directotry. See each example's readme for usage. diff --git a/python/foxglove-sdk/python/foxglove/_foxglove_py/__init__.pyi b/python/foxglove-sdk/python/foxglove/_foxglove_py/__init__.pyi index aa12acaa..a44fc19e 100644 --- a/python/foxglove-sdk/python/foxglove/_foxglove_py/__init__.pyi +++ b/python/foxglove-sdk/python/foxglove/_foxglove_py/__init__.pyi @@ -39,6 +39,8 @@ class WebSocketServer: """ def __new__(cls) -> "WebSocketServer": ... + @property + def port(self) -> int: ... def stop(self) -> None: ... def clear_session(self, session_id: Optional[str] = None) -> None: ... def broadcast_time(self, timestamp_nanos: int) -> None: ... diff --git a/python/foxglove-sdk/python/foxglove/tests/test_server.py b/python/foxglove-sdk/python/foxglove/tests/test_server.py index 548200d5..28706fba 100644 --- a/python/foxglove-sdk/python/foxglove/tests/test_server.py +++ b/python/foxglove-sdk/python/foxglove/tests/test_server.py @@ -17,6 +17,8 @@ def test_server_interface(self) -> None: Exercise the server interface; will also be checked with mypy. """ server = start_server(port=0) + self.assertTrue(isinstance(server.port, int)) + self.assertNotEqual(server.port, 0) server.publish_status("test message", StatusLevel.Info, "some-id") server.broadcast_time(time.time_ns()) server.remove_status(["some-id"]) diff --git a/python/foxglove-sdk/src/websocket_server.rs b/python/foxglove-sdk/src/websocket_server.rs index a7f156de..0b284abd 100644 --- a/python/foxglove-sdk/src/websocket_server.rs +++ b/python/foxglove-sdk/src/websocket_server.rs @@ -385,6 +385,12 @@ impl PyWebSocketServer { } } + // Get the port on which the server is listening. + #[getter] + pub fn port(&self) -> u16 { + self.0.as_ref().map_or(0, |handle| handle.port()) + } + /// Sets a new session ID and notifies all clients, causing them to reset their state. /// If no session ID is provided, generates a new one based on the current timestamp. /// If the server has been stopped, this has no effect. diff --git a/rust/foxglove/src/websocket.rs b/rust/foxglove/src/websocket.rs index 8849e638..2c2317bb 100644 --- a/rust/foxglove/src/websocket.rs +++ b/rust/foxglove/src/websocket.rs @@ -16,8 +16,8 @@ use futures_util::{stream::SplitSink, SinkExt, StreamExt}; use serde::Serialize; use std::collections::hash_map::Entry; use std::collections::HashSet; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed}; -use std::sync::atomic::{AtomicBool, AtomicU32}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32}; use std::sync::Weak; use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, net::SocketAddr, sync::Arc}; @@ -211,6 +211,8 @@ pub(crate) struct Server { /// It's analogous to the mixin shared_from_this in C++. weak_self: Weak, started: AtomicBool, + /// Local port the server is listening on, once it has been started + port: AtomicU16, message_backlog_size: u32, runtime: Handle, /// May be provided by the caller @@ -340,7 +342,7 @@ impl ConnectedClient { } fn is_subscribed_to_connection_graph(&self) -> bool { - self.subscribed_to_connection_graph.load(Relaxed) + self.subscribed_to_connection_graph.load(Acquire) } /// Handle a text or binary message sent from the client. @@ -899,7 +901,7 @@ impl ConnectedClient { return; } - let is_subscribed = self.subscribed_to_connection_graph.load(Relaxed); + let is_subscribed = self.subscribed_to_connection_graph.load(Acquire); if is_subscribed { tracing::debug!( "Client {} is already subscribed to connection graph updates", @@ -933,7 +935,7 @@ impl ConnectedClient { let json_diff = connection_graph.update(current_graph); // Send the full diff to the client as the starting state self.send_control_msg(Message::text(json_diff)); - self.subscribed_to_connection_graph.store(true, Relaxed); + self.subscribed_to_connection_graph.store(true, Release); } fn on_connection_graph_unsubscribe(&self, server: Arc) { @@ -961,7 +963,7 @@ impl ConnectedClient { // Acquire the lock to sychronize with subscribe and with server.connection_graph_update. let _guard = server.connection_graph.lock(); - self.subscribed_to_connection_graph.store(false, Relaxed); + self.subscribed_to_connection_graph.store(false, Release); } /// Send an ad hoc error status message to the client, with the given message. @@ -1058,6 +1060,7 @@ impl Server { Server { weak_self, + port: AtomicU16::new(0), started: AtomicBool::new(false), message_backlog_size: opts .message_backlog_size @@ -1087,13 +1090,17 @@ impl Server { .expect("server cannot be dropped while in use") } + pub(crate) fn port(&self) -> u16 { + self.port.load(Acquire) + } + // Returns a handle to the async runtime that this server is using. pub fn runtime(&self) -> &Handle { &self.runtime } - // Spawn a task to accept all incoming connections and return - pub async fn start(&self, host: &str, port: u16) -> Result { + // Spawn a task to accept all incoming connections and return the server's local address + pub async fn start(&self, host: &str, port: u16) -> Result { if self.started.load(Acquire) { return Err(FoxgloveError::ServerAlreadyStarted); } @@ -1104,10 +1111,8 @@ impl Server { let listener = TcpListener::bind(&addr) .await .map_err(FoxgloveError::Bind)?; - let bound_addr = listener - .local_addr() - .map_err(|err| FoxgloveError::Unspecified(err.into()))? - .to_string(); + let local_addr = listener.local_addr().map_err(FoxgloveError::Bind)?; + self.port.store(local_addr.port(), Release); let cancellation_token = self.cancellation_token.clone(); let server = self.arc().clone(); @@ -1120,9 +1125,9 @@ impl Server { } }); - tracing::info!("Started server on {}", bound_addr); + tracing::info!("Started server on {}", local_addr); - Ok(bound_addr) + Ok(local_addr) } pub async fn stop(&self) { @@ -1134,6 +1139,7 @@ impl Server { return; } tracing::info!("Shutting down"); + self.port.store(0, Release); let clients = self.clients.get(); for client in clients.iter() { let mut sender = client.sender.lock().await; @@ -1300,7 +1306,7 @@ impl Server { } static CLIENT_ID: AtomicU32 = AtomicU32::new(1); - let id = ClientId(CLIENT_ID.fetch_add(1, Relaxed)); + let id = ClientId(CLIENT_ID.fetch_add(1, AcqRel)); let (data_tx, data_rx) = flume::bounded(self.message_backlog_size as usize); let (ctrl_tx, ctrl_rx) = flume::bounded(self.message_backlog_size as usize); diff --git a/rust/foxglove/src/websocket/tests.rs b/rust/foxglove/src/websocket/tests.rs index 8fe79dd9..fb55f1fc 100644 --- a/rust/foxglove/src/websocket/tests.rs +++ b/rust/foxglove/src/websocket/tests.rs @@ -304,8 +304,8 @@ async fn test_log_only_to_subscribers() { .await .expect("Failed to start server"); - let mut client1 = connect_client(addr.clone()).await; - let mut client2 = connect_client(addr.clone()).await; + let mut client1 = connect_client(addr).await; + let mut client2 = connect_client(addr).await; let mut client3 = connect_client(addr).await; // client1 subscribes to ch1; client2 subscribes to ch2; client3 unsubscribes from all @@ -648,7 +648,7 @@ async fn test_remove_status() { .await .expect("Failed to start server"); - let mut ws_client1 = connect_client(addr.clone()).await; + let mut ws_client1 = connect_client(addr).await; let mut ws_client2 = connect_client(addr).await; _ = ws_client1.next().await.expect("No serverInfo sent"); @@ -1095,7 +1095,7 @@ async fn test_services() { .await .expect("Failed to start server"); - let mut client1 = connect_client(addr.clone()).await; + let mut client1 = connect_client(addr).await; let _ = client1.next().await.expect("No serverInfo sent").unwrap(); let msg = client1 .next() @@ -1209,7 +1209,7 @@ async fn test_services() { ); // New client sees both services immediately. - let mut client2 = connect_client(addr.clone()).await; + let mut client2 = connect_client(addr).await; let _ = client2.next().await.expect("No serverInfo sent").unwrap(); let msg = client2 .next() @@ -1465,9 +1465,9 @@ async fn test_slow_client() { /// Connect to a server, ensuring the protocol header is set, and return the client WS stream pub async fn connect_client( - addr: String, + addr: SocketAddr, ) -> tokio_tungstenite::WebSocketStream> { - let mut request = format!("ws://{addr}/") + let mut request = format!("ws://{}/", addr) .into_client_request() .expect("Failed to build request"); diff --git a/rust/foxglove/src/websocket_server.rs b/rust/foxglove/src/websocket_server.rs index fa09b163..61bf15b0 100644 --- a/rust/foxglove/src/websocket_server.rs +++ b/rust/foxglove/src/websocket_server.rs @@ -53,6 +53,8 @@ impl WebSocketServer { /// Bind a TCP port. /// + /// `port` may be 0, in which case an available port will be automatically selected. + /// /// By default, the server will bind to `127.0.0.1:8765`. pub fn bind(mut self, host: impl Into, port: u16) -> Self { self.host = host.into(); @@ -212,6 +214,11 @@ impl WebSocketServerHandle { self.0.runtime() } + /// Returns the local port that the server is listening on. + pub fn port(&self) -> u16 { + self.0.port() + } + /// Advertises support for the provided services. /// /// These services will be available for clients to use until they are removed with @@ -289,6 +296,11 @@ impl WebSocketServerHandle { pub struct WebSocketServerBlockingHandle(WebSocketServerHandle); impl WebSocketServerBlockingHandle { + /// Returns the local port that the server is listening on. + pub fn port(&self) -> u16 { + self.0.port() + } + /// Advertises support for the provided services. /// /// These services will be available for clients to use until they are removed with