Skip to content

Commit

Permalink
Support interop between v8 and v9+ (#1016)
Browse files Browse the repository at this point in the history
* Support interop between v8 and v9+

* Ignore clippy lint

* Reduce flakiness

* Fix unwrap

* Remove dbg

* Flatten + untagged enums don't really work

* Revert "Flatten + untagged enums don't really work"

Agent configs are broken, but we never use them, so just backing this out for now.

* Workaround serde deficiency

* Enable CI logging

* Sigh

* Disable ansi when outputting json to make output less terrible

* Sigh

* Show output on success to see what it is actually supposed to look like

* ?

* Fix relay selector

* Check if it is just a deployment count issue

* Disable liveness probe

* Derp

* Debug broken listener

* Do extremely inefficient parsing and reserialization

* Sigh

* Show callstack of failure

* Fix serialization

* Revert debug changes
  • Loading branch information
Jake-Shadle authored Sep 17, 2024
1 parent 3392942 commit 3363452
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 74 deletions.
2 changes: 1 addition & 1 deletion build/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ endif
# Set this value if you want to use an external registry
REPOSITORY ?= ""
IMAGE_TAG ?= ${REPOSITORY}quilkin:$(package_version)
PREV_IMAGE_TAG ?= us-docker.pkg.dev/quilkin/release/quilkin:0.9.0
PREV_IMAGE_TAG ?= us-docker.pkg.dev/quilkin/release/quilkin:0.8.0
MINIKUBE_PROFILE ?= quilkin
CARGO_TARGET_DIR ?= /workspace/target/build-image
common_rust_args := -v $(project_path):/workspace -w /workspace \
Expand Down
5 changes: 3 additions & 2 deletions crates/agones/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod tests {
};

const PROXY_DEPLOYMENT: &str = "quilkin-xds-proxies";
const SLOW: Duration = Duration::from_secs(60);

#[tokio::test]
#[serial]
Expand Down Expand Up @@ -94,15 +95,15 @@ mod tests {
.await
.unwrap();

let response = timeout(Duration::from_secs(30), rx.recv())
let response = timeout(SLOW, rx.recv())
.await
.expect("should receive packet from GameServer")
.unwrap();
assert_eq!("ACK: ALLOCATE\n", response);

// Proxy Deployment should be ready, since there is now an endpoint
if timeout(
Duration::from_secs(30),
SLOW,
await_condition(deployments.clone(), PROXY_DEPLOYMENT, is_deployment_ready()),
)
.await
Expand Down
5 changes: 4 additions & 1 deletion crates/agones/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ mod tests {
.is_err()
{
debug_pods(&client, format!("role={relay_proxy_name}")).await;
debug_pods(&client, "role=relay".into()).await;
debug_pods(&client, "role=agent".into()).await;
panic!("Quilkin proxy deployment should be ready");
}

Expand Down Expand Up @@ -170,7 +172,8 @@ mod tests {
}
if !failed {
debug_pods(&client, format!("role={relay_proxy_name}")).await;
debug_pods(&client, "role=xds".into()).await;
debug_pods(&client, "role=relay".into()).await;
debug_pods(&client, "role=agent".into()).await;
}
assert!(failed, "Packet should have failed");

Expand Down
54 changes: 50 additions & 4 deletions crates/xds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,17 @@ impl Drop for DeltaSubscription {
impl AdsClient {
/// Attempts to start a new delta stream to the xDS management server, if the
/// management server does not support delta xDS we return the client as an error
#[allow(clippy::type_complexity)]
pub async fn delta_subscribe<C: crate::config::Configuration>(
self,
config: Arc<C>,
is_healthy: Arc<AtomicBool>,
notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
resources: impl IntoIterator<Item = (&'static str, Vec<String>)>,
resources: &'static [(&'static str, &'static [(&'static str, Vec<String>)])],
) -> Result<DeltaSubscription, Self> {
let resource_subscriptions: Vec<_> = resources.into_iter().collect();

let identifier = String::from(&*self.identifier);

let (mut ds, stream) = match DeltaClientStream::connect(
let (mut ds, mut stream) = match DeltaClientStream::connect(
self.client.clone(),
identifier.clone(),
)
Expand All @@ -433,6 +432,49 @@ impl AdsClient {
}
};

async fn handle_first_response(
stream: &mut tonic::Streaming<DeltaDiscoveryResponse>,
resources: &'static [(&'static str, &'static [(&'static str, Vec<String>)])],
) -> eyre::Result<&'static [(&'static str, Vec<String>)]> {
let resource_subscriptions = if let Some(first) = stream.message().await? {
let mut rsubs = None;
if first.type_url == "ignore-me" {
if !first.system_version_info.is_empty() {
rsubs = resources.iter().find_map(|(vers, subs)| {
(*vers == first.system_version_info).then_some(subs)
});
}
} else {
tracing::warn!("expected `ignore-me` response from management server");
}

if let Some(subs) = rsubs {
subs
} else {
let Some(subs) = resources
.iter()
.find_map(|(vers, subs)| vers.is_empty().then_some(subs))
else {
eyre::bail!("failed to find fallback resource subscription set");
};

subs
}
} else {
eyre::bail!("expected at least one response from the management server");
};

Ok(resource_subscriptions)
}

// Pick the subscription set that matches the version reported in the
// management server's first response.
let resource_subscriptions = match handle_first_response(&mut stream, resources).await {
    Ok(rs) => rs,
    Err(error) => {
        tracing::error!(%error, "failed to acquire matching resource subscriptions based on response from management server");
        // Hand the client back to the caller as the error value.
        return Err(self);
    }
};

// Send requests for our resource subscriptions, in this first request we
// won't have any resources, but if we reconnect to management servers in
// the future we'll send the resources we already have locally to hopefully
Expand All @@ -454,6 +496,7 @@ impl AdsClient {
async move {
tracing::trace!("starting xDS delta stream task");
let mut stream = stream;
let mut resource_subscriptions = resource_subscriptions;

loop {
tracing::trace!("creating discovery response handler");
Expand Down Expand Up @@ -504,6 +547,9 @@ impl AdsClient {

(ds, stream) =
DeltaClientStream::connect(new_client, identifier.clone()).await?;

resource_subscriptions = handle_first_response(&mut stream, resources).await?;

ds.refresh(&identifier, resource_subscriptions.to_vec(), &local)
.await?;
}
Expand Down
5 changes: 4 additions & 1 deletion crates/xds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ pub trait Configuration: Send + Sync + Sized + 'static {
subscribed: crate::server::ControlPlane<Self>,
) -> impl std::future::Future<Output = ()> + Send + 'static;

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)>;
fn interested_resources(
&self,
server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)>;
}

pub struct DeltaDiscoveryRes {
Expand Down
67 changes: 46 additions & 21 deletions crates/xds/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::{
net::TcpListener,
};

const VERSION_INFO: &str = "9";

pub struct ControlPlane<C> {
pub config: Arc<C>,
pub idle_request_interval: Duration,
Expand Down Expand Up @@ -218,8 +220,7 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
control_plane: Some(control_plane_id.clone()),
type_url: type_url.into(),
removed_resources,
// Only used for debugging, not really useful
system_version_info: String::new(),
system_version_info: VERSION_INFO.into(),
};

tracing::trace!(
Expand Down Expand Up @@ -248,14 +249,22 @@ impl<C: crate::config::Configuration> ControlPlane<C> {
control_plane: None,
type_url: message.type_url,
removed_resources: Vec::new(),
// Only used for debugging, not really useful
system_version_info: String::new(),
system_version_info: VERSION_INFO.into(),
}
} else {
tracing::debug!(client = %node_id, resource_type = %message.type_url, "initial delta response");

let type_url = message.type_url.clone();
responder(Some(message), &type_url, &mut client_tracker)?.unwrap()
responder(Some(message), &type_url, &mut client_tracker)?.unwrap_or(
DeltaDiscoveryResponse {
resources: Vec::new(),
nonce: String::new(),
control_plane: None,
type_url,
removed_resources: Vec::new(),
system_version_info: VERSION_INFO.into(),
},
)
}
};

Expand Down Expand Up @@ -406,17 +415,25 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for

tracing::info!("control plane discovery delta stream attempt");
let mut responses = responses.into_inner();
let Some(identifier) = responses

/// Extracts the control plane identifier and the reported
/// `system_version_info` from the first response on the stream.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the response carries no
/// `control_plane`, since the identifier is required in the first message.
fn handle_first_response(
    res: DeltaDiscoveryResponse,
) -> Result<(String, String), tonic::Status> {
    match res.control_plane {
        Some(cp) => Ok((cp.identifier, res.system_version_info)),
        None => Err(tonic::Status::invalid_argument(
            "DeltaDiscoveryResponse.control_plane.identifier is required in the first message",
        )),
    }
}

let first_response = responses
.next()
.await
.ok_or_else(|| tonic::Status::cancelled("received empty first response"))??
.control_plane
.map(|cp| cp.identifier)
else {
return Err(tonic::Status::invalid_argument(
"DeltaDiscoveryResponse.control_plane.identifier is required in the first message",
));
};
.ok_or_else(|| tonic::Status::cancelled("received empty first response"))??;

let (identifier, server_version) = handle_first_response(first_response)?;

tracing::info!(identifier, "new control plane delta discovery stream");
let config = self.config.clone();
Expand All @@ -429,12 +446,16 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
tracing::info!(identifier, "sending initial delta discovery request");

let local = Arc::new(crate::config::LocalVersions::new(
config.interested_resources().map(|(n, _)| n),
config.interested_resources(&server_version).map(|(n, _)| n),
));

ds.refresh(&identifier, config.interested_resources().collect(), &local)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
ds.refresh(
&identifier,
config.interested_resources(&server_version).collect(),
&local,
)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;

let mut response_stream = crate::config::handle_delta_discovery_responses(
identifier.clone(),
Expand All @@ -456,9 +477,13 @@ impl<C: crate::config::Configuration> AggregatedControlPlaneDiscoveryService for
.map_err(|_| tonic::Status::internal("this should not be reachable"))?;
} else {
tracing::trace!("exceeded idle interval, sending request");
ds.refresh(&identifier, config.interested_resources().collect(), &local)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
ds.refresh(
&identifier,
config.interested_resources(&server_version).collect(),
&local,
)
.await
.map_err(|error| tonic::Status::internal(error.to_string()))?;
}
}
}
Expand Down
49 changes: 28 additions & 21 deletions examples/xds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ impl xds::config::Configuration for ClientConfig {
unreachable!();
}

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)> {
fn interested_resources(
&self,
_server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)> {
[].into_iter()
}

Expand Down Expand Up @@ -145,13 +148,13 @@ impl xds::config::Configuration for ServerConfig {
}
}

Ok(DeltaDiscoveryRes {
resources,
removed,
})
Ok(DeltaDiscoveryRes { resources, removed })
}

fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)> {
fn interested_resources(
&self,
_server_version: &str,
) -> impl Iterator<Item = (&'static str, Vec<String>)> {
[(TYPE, Vec::new())].into_iter()
}

Expand All @@ -164,7 +167,9 @@ impl xds::config::Configuration for ServerConfig {

async move {
loop {
if item_watcher.recv().await.is_err() { break; };
if item_watcher.recv().await.is_err() {
break;
};
control_plane.push_update(TYPE);
}
}
Expand All @@ -189,33 +194,35 @@ async fn main() {
let relay_listener = xds::net::TcpListener::bind(None).unwrap();
let addr = relay_listener.local_addr();

let server = xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60)).management_server(relay_listener).unwrap();
let server =
xds::server::ControlPlane::from_arc(sc.clone(), std::time::Duration::from_secs(60))
.management_server(relay_listener)
.unwrap();

tokio::task::spawn(async move {
server.await
});
tokio::task::spawn(async move { server.await });

let client = xds::client::AdsClient::connect(
"client".into(),
vec![format!("http://{addr}").try_into().unwrap()],
)
.await.unwrap();
.await
.unwrap();

let (stx, srx) = tokio::sync::oneshot::channel();

tokio::task::spawn({
let cc = cc.clone();
async move {
let _stream = client
.delta_subscribe(
cc,
Arc::new(std::sync::atomic::AtomicBool::new(true)),
None,
[
(TYPE, Vec::new()),
],
)
.await.map_err(|_| "failed to subscribe").unwrap();
.delta_subscribe(
cc,
Arc::new(std::sync::atomic::AtomicBool::new(true)),
None,
&[("", &[(TYPE, Vec::new())])],
)
.await
.map_err(|_| "failed to subscribe")
.unwrap();

srx.await.unwrap();
}
Expand Down
Loading

0 comments on commit 3363452

Please sign in to comment.