Skip to content

Commit

Permalink
Expose server port (#255)
Browse files Browse the repository at this point in the history
### Changelog
The server now exposes the port it's listening on, so you can pass a
port of 0 and later determine the actual port used.

### Docs

None

### Description

Also tweaked a few memory orderings after discussing in slack – we can
stick with the following defaults unless otherwise noted with a comment:
- load: Acquire
- store: Release
- fetch_add/compare_exchange etc.: AcqRel
  • Loading branch information
jtbandes authored Mar 1, 2025
1 parent 16a3b62 commit 9448ccf
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 24 deletions.
7 changes: 7 additions & 0 deletions c/include/foxglove-c/foxglove-c.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ extern "C" {
/**
* Create and start a server. The server must later be freed with `foxglove_server_free`.
*
* `port` may be 0, in which case an available port will be automatically selected.
*
* # Safety
* `name` and `host` must be null-terminated strings with valid UTF8.
*/
struct foxglove_websocket_server *foxglove_server_start(const char *name,
const char *host,
uint16_t port);

/**
* Get the port on which the server is listening.
*/
uint16_t foxglove_server_get_port(const struct foxglove_websocket_server *server);

/**
* Free a server created via `foxglove_server_start`.
*
Expand Down
14 changes: 14 additions & 0 deletions c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub struct FoxgloveWebSocketServer(Option<foxglove::WebSocketServerBlockingHandl

/// Create and start a server. The server must later be freed with `foxglove_server_free`.
///
/// `port` may be 0, in which case an available port will be automatically selected.
///
/// # Safety
/// `name` and `host` must be null-terminated strings with valid UTF8.
#[unsafe(no_mangle)]
Expand All @@ -26,6 +28,18 @@ pub unsafe extern "C" fn foxglove_server_start(
))))
}

/// Get the port on which the server is listening.
#[unsafe(no_mangle)]
pub extern "C" fn foxglove_server_get_port(server: Option<&FoxgloveWebSocketServer>) -> u16 {
let Some(server) = server else {
panic!("Expected a non-null server");
};
let Some(ref handle) = server.0 else {
panic!("Server already stopped");
};
handle.port()
}

/// Free a server created via `foxglove_server_start`.
///
/// If the server has not already been stopped, it will be stopped automatically.
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/ws-server/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ int main(int argc, const char* argv[]) {
options.host = "127.0.0.1";
options.port = 8765;
foxglove::WebSocketServer server{options};
std::cerr << "Started server" << std::endl;
std::cerr << "Server listening on port " << server.port() << std::endl;

std::atomic_bool done = false;
sigintHandler = [&] {
Expand Down
4 changes: 4 additions & 0 deletions cpp/foxglove/include/foxglove/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ struct WebSocketServerOptions {
class WebSocketServer {
public:
WebSocketServer(WebSocketServerOptions options);

// Get the port on which the server is listening.
uint16_t port() const;

void stop();

private:
Expand Down
4 changes: 4 additions & 0 deletions cpp/foxglove/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ void WebSocketServer::stop() {
foxglove_server_stop(_impl.get());
}

uint16_t WebSocketServer::port() const {
return foxglove_server_get_port(_impl.get());
}

} // namespace foxglove
3 changes: 2 additions & 1 deletion cpp/foxglove/tests/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ TEST_CASE("Start and stop server") {
foxglove::WebSocketServerOptions options;
options.name = "unit-test";
options.host = "127.0.0.1";
options.port = 8765;
options.port = 0;
foxglove::WebSocketServer server{options};
REQUIRE(server.port() != 0);
server.stop();
}
6 changes: 6 additions & 0 deletions python/foxglove-sdk/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ PEP8 check:
poetry run flake8 .
```

Run unit tests:

```sh
cd python && poetry run python -m unittest
```

### Examples

Examples exist in the `foxglove-sdk-examples` directotry. See each example's readme for usage.
Expand Down
2 changes: 2 additions & 0 deletions python/foxglove-sdk/python/foxglove/_foxglove_py/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class WebSocketServer:
"""

def __new__(cls) -> "WebSocketServer": ...
@property
def port(self) -> int: ...
def stop(self) -> None: ...
def clear_session(self, session_id: Optional[str] = None) -> None: ...
def broadcast_time(self, timestamp_nanos: int) -> None: ...
Expand Down
2 changes: 2 additions & 0 deletions python/foxglove-sdk/python/foxglove/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def test_server_interface(self) -> None:
Exercise the server interface; will also be checked with mypy.
"""
server = start_server(port=0)
self.assertTrue(isinstance(server.port, int))
self.assertNotEqual(server.port, 0)
server.publish_status("test message", StatusLevel.Info, "some-id")
server.broadcast_time(time.time_ns())
server.remove_status(["some-id"])
Expand Down
6 changes: 6 additions & 0 deletions python/foxglove-sdk/src/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,12 @@ impl PyWebSocketServer {
}
}

// Get the port on which the server is listening.
#[getter]
pub fn port(&self) -> u16 {
self.0.as_ref().map_or(0, |handle| handle.port())
}

/// Sets a new session ID and notifies all clients, causing them to reset their state.
/// If no session ID is provided, generates a new one based on the current timestamp.
/// If the server has been stopped, this has no effect.
Expand Down
36 changes: 21 additions & 15 deletions rust/foxglove/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use serde::Serialize;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32};
use std::sync::Weak;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
Expand Down Expand Up @@ -211,6 +211,8 @@ pub(crate) struct Server {
/// It's analogous to the mixin shared_from_this in C++.
weak_self: Weak<Self>,
started: AtomicBool,
/// Local port the server is listening on, once it has been started
port: AtomicU16,
message_backlog_size: u32,
runtime: Handle,
/// May be provided by the caller
Expand Down Expand Up @@ -340,7 +342,7 @@ impl ConnectedClient {
}

fn is_subscribed_to_connection_graph(&self) -> bool {
self.subscribed_to_connection_graph.load(Relaxed)
self.subscribed_to_connection_graph.load(Acquire)
}

/// Handle a text or binary message sent from the client.
Expand Down Expand Up @@ -899,7 +901,7 @@ impl ConnectedClient {
return;
}

let is_subscribed = self.subscribed_to_connection_graph.load(Relaxed);
let is_subscribed = self.subscribed_to_connection_graph.load(Acquire);
if is_subscribed {
tracing::debug!(
"Client {} is already subscribed to connection graph updates",
Expand Down Expand Up @@ -933,7 +935,7 @@ impl ConnectedClient {
let json_diff = connection_graph.update(current_graph);
// Send the full diff to the client as the starting state
self.send_control_msg(Message::text(json_diff));
self.subscribed_to_connection_graph.store(true, Relaxed);
self.subscribed_to_connection_graph.store(true, Release);
}

fn on_connection_graph_unsubscribe(&self, server: Arc<Server>) {
Expand Down Expand Up @@ -961,7 +963,7 @@ impl ConnectedClient {

// Acquire the lock to sychronize with subscribe and with server.connection_graph_update.
let _guard = server.connection_graph.lock();
self.subscribed_to_connection_graph.store(false, Relaxed);
self.subscribed_to_connection_graph.store(false, Release);
}

/// Send an ad hoc error status message to the client, with the given message.
Expand Down Expand Up @@ -1058,6 +1060,7 @@ impl Server {

Server {
weak_self,
port: AtomicU16::new(0),
started: AtomicBool::new(false),
message_backlog_size: opts
.message_backlog_size
Expand Down Expand Up @@ -1087,13 +1090,17 @@ impl Server {
.expect("server cannot be dropped while in use")
}

pub(crate) fn port(&self) -> u16 {
self.port.load(Acquire)
}

// Returns a handle to the async runtime that this server is using.
pub fn runtime(&self) -> &Handle {
&self.runtime
}

// Spawn a task to accept all incoming connections and return
pub async fn start(&self, host: &str, port: u16) -> Result<String, FoxgloveError> {
// Spawn a task to accept all incoming connections and return the server's local address
pub async fn start(&self, host: &str, port: u16) -> Result<SocketAddr, FoxgloveError> {
if self.started.load(Acquire) {
return Err(FoxgloveError::ServerAlreadyStarted);
}
Expand All @@ -1104,10 +1111,8 @@ impl Server {
let listener = TcpListener::bind(&addr)
.await
.map_err(FoxgloveError::Bind)?;
let bound_addr = listener
.local_addr()
.map_err(|err| FoxgloveError::Unspecified(err.into()))?
.to_string();
let local_addr = listener.local_addr().map_err(FoxgloveError::Bind)?;
self.port.store(local_addr.port(), Release);

let cancellation_token = self.cancellation_token.clone();
let server = self.arc().clone();
Expand All @@ -1120,9 +1125,9 @@ impl Server {
}
});

tracing::info!("Started server on {}", bound_addr);
tracing::info!("Started server on {}", local_addr);

Ok(bound_addr)
Ok(local_addr)
}

pub async fn stop(&self) {
Expand All @@ -1134,6 +1139,7 @@ impl Server {
return;
}
tracing::info!("Shutting down");
self.port.store(0, Release);
let clients = self.clients.get();
for client in clients.iter() {
let mut sender = client.sender.lock().await;
Expand Down Expand Up @@ -1300,7 +1306,7 @@ impl Server {
}

static CLIENT_ID: AtomicU32 = AtomicU32::new(1);
let id = ClientId(CLIENT_ID.fetch_add(1, Relaxed));
let id = ClientId(CLIENT_ID.fetch_add(1, AcqRel));

let (data_tx, data_rx) = flume::bounded(self.message_backlog_size as usize);
let (ctrl_tx, ctrl_rx) = flume::bounded(self.message_backlog_size as usize);
Expand Down
14 changes: 7 additions & 7 deletions rust/foxglove/src/websocket/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ async fn test_log_only_to_subscribers() {
.await
.expect("Failed to start server");

let mut client1 = connect_client(addr.clone()).await;
let mut client2 = connect_client(addr.clone()).await;
let mut client1 = connect_client(addr).await;
let mut client2 = connect_client(addr).await;
let mut client3 = connect_client(addr).await;

// client1 subscribes to ch1; client2 subscribes to ch2; client3 unsubscribes from all
Expand Down Expand Up @@ -648,7 +648,7 @@ async fn test_remove_status() {
.await
.expect("Failed to start server");

let mut ws_client1 = connect_client(addr.clone()).await;
let mut ws_client1 = connect_client(addr).await;
let mut ws_client2 = connect_client(addr).await;

_ = ws_client1.next().await.expect("No serverInfo sent");
Expand Down Expand Up @@ -1095,7 +1095,7 @@ async fn test_services() {
.await
.expect("Failed to start server");

let mut client1 = connect_client(addr.clone()).await;
let mut client1 = connect_client(addr).await;
let _ = client1.next().await.expect("No serverInfo sent").unwrap();
let msg = client1
.next()
Expand Down Expand Up @@ -1209,7 +1209,7 @@ async fn test_services() {
);

// New client sees both services immediately.
let mut client2 = connect_client(addr.clone()).await;
let mut client2 = connect_client(addr).await;
let _ = client2.next().await.expect("No serverInfo sent").unwrap();
let msg = client2
.next()
Expand Down Expand Up @@ -1465,9 +1465,9 @@ async fn test_slow_client() {

/// Connect to a server, ensuring the protocol header is set, and return the client WS stream
pub async fn connect_client(
addr: String,
addr: SocketAddr,
) -> tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>> {
let mut request = format!("ws://{addr}/")
let mut request = format!("ws://{}/", addr)
.into_client_request()
.expect("Failed to build request");

Expand Down
12 changes: 12 additions & 0 deletions rust/foxglove/src/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ impl WebSocketServer {

/// Bind a TCP port.
///
/// `port` may be 0, in which case an available port will be automatically selected.
///
/// By default, the server will bind to `127.0.0.1:8765`.
pub fn bind(mut self, host: impl Into<String>, port: u16) -> Self {
self.host = host.into();
Expand Down Expand Up @@ -212,6 +214,11 @@ impl WebSocketServerHandle {
self.0.runtime()
}

/// Returns the local port that the server is listening on.
pub fn port(&self) -> u16 {
self.0.port()
}

/// Advertises support for the provided services.
///
/// These services will be available for clients to use until they are removed with
Expand Down Expand Up @@ -289,6 +296,11 @@ impl WebSocketServerHandle {
pub struct WebSocketServerBlockingHandle(WebSocketServerHandle);

impl WebSocketServerBlockingHandle {
/// Returns the local port that the server is listening on.
pub fn port(&self) -> u16 {
self.0.port()
}

/// Advertises support for the provided services.
///
/// These services will be available for clients to use until they are removed with
Expand Down

0 comments on commit 9448ccf

Please sign in to comment.