diff --git a/agones/Cargo.toml b/agones/Cargo.toml index e79de498ea..6eb3769db0 100644 --- a/agones/Cargo.toml +++ b/agones/Cargo.toml @@ -25,9 +25,10 @@ readme = "README.md" [dependencies] base64.workspace = true +futures.workspace = true k8s-openapi.workspace = true kube = { workspace = true, features = ["openssl-tls", "client", "derive", "runtime"] } quilkin = { path = "../" } +serial_test = "2.0.0" tokio.workspace = true -futures.workspace = true tracing.workspace = true diff --git a/agones/src/lib.rs b/agones/src/lib.rs index b9941ff214..5b96e0a9d5 100644 --- a/agones/src/lib.rs +++ b/agones/src/lib.rs @@ -17,30 +17,37 @@ use std::{ collections::BTreeMap, env, - time::{SystemTime, UNIX_EPOCH}, + net::SocketAddr, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::{ api::{ - apps::v1::Deployment, + apps::v1::{Deployment, DeploymentSpec}, core::v1::{ ConfigMap, Container, EnvVar, Event, HTTPGetAction, Namespace, Pod, PodSpec, PodTemplateSpec, Probe, ResourceRequirements, ServiceAccount, VolumeMount, }, - rbac::v1::{RoleBinding, RoleRef, Subject}, + core::v1::{ContainerPort, Node}, + rbac::{ + v1::PolicyRule, + v1::{ClusterRole, RoleBinding, RoleRef, Subject}, + }, }, apimachinery::pkg::{ - api::resource::Quantity, apis::meta::v1::ObjectMeta, util::intstr::IntOrString, + api::resource::Quantity, + apis::meta::v1::{LabelSelector, ObjectMeta}, + util::intstr::IntOrString, }, chrono, }; use kube::{ api::{DeleteParams, ListParams, LogParams, PostParams}, - runtime::wait::Condition, + runtime::wait::{await_condition, Condition}, Api, Resource, ResourceExt, }; -use tokio::sync::OnceCell; +use tokio::{sync::OnceCell, time::timeout}; use tracing::debug; use quilkin::config::providers::k8s::agones::{ @@ -49,8 +56,9 @@ use quilkin::config::providers::k8s::agones::{ }; mod pod; +mod provider; +mod relay; mod sidecar; -mod xds; #[allow(dead_code)] static CLIENT: OnceCell = OnceCell::const_new(); @@ -63,6 +71,9 @@ const DELETE_DELAY_SECONDS: &str = "DELETE_DELAY_SECONDS"; pub const GAMESERVER_IMAGE: &str = "us-docker.pkg.dev/agones-images/examples/simple-game-server:0.16"; +/// The dynamic metadata key for routing tokens +pub const TOKEN_KEY: &str = "quilkin.dev/tokens"; + #[derive(Clone)] pub struct Client { /// The Kubernetes client @@ -215,6 +226,230 @@ async fn add_agones_service_account(client: kube::Client, namespace: String) { let _ = role_bindings.create(&pp, &role_binding).await.unwrap(); } +/// Creates a Service account and related RBAC objects to enable a process to query Agones +/// and ConfigMap resources within a cluster +pub async fn create_agones_rbac_read_account( + client: &Client, + service_accounts: Api, + cluster_roles: Api, + role_bindings: Api, +) -> String { + let pp = PostParams::default(); + let rbac_name = "quilkin-agones"; + + // check if sevice account already exists, otherwise create it. + if service_accounts.get(rbac_name).await.is_ok() { + return rbac_name.into(); + } + + // create all the rbac rules + + let rbac_meta = ObjectMeta { + name: Some(rbac_name.into()), + ..Default::default() + }; + let service_account = ServiceAccount { + metadata: rbac_meta.clone(), + ..Default::default() + }; + service_accounts + .create(&pp, &service_account) + .await + .unwrap(); + + // Delete the cluster role if it already exists, since it's cluster wide. + match cluster_roles + .delete(rbac_name, &DeleteParams::default()) + .await + { + Ok(_) => {} + Err(err) => println!("Cluster role not found: {err}"), + }; + let cluster_role = ClusterRole { + metadata: rbac_meta.clone(), + rules: Some(vec![ + PolicyRule { + api_groups: Some(vec!["agones.dev".into()]), + resources: Some(vec!["gameservers".into()]), + verbs: ["get", "list", "watch"].map(String::from).to_vec(), + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".into()]), + resources: Some(vec!["configmaps".into()]), + verbs: ["get", "list", "watch"].map(String::from).to_vec(), + ..Default::default() + }, + ]), + ..Default::default() + }; + cluster_roles.create(&pp, &cluster_role).await.unwrap(); + + let binding = RoleBinding { + metadata: rbac_meta, + subjects: Some(vec![Subject { + kind: "User".into(), + name: format!("system:serviceaccount:{}:{rbac_name}", client.namespace), + api_group: Some("rbac.authorization.k8s.io".into()), + ..Default::default() + }]), + role_ref: RoleRef { + api_group: "rbac.authorization.k8s.io".into(), + kind: "ClusterRole".into(), + name: rbac_name.into(), + }, + }; + role_bindings.create(&pp, &binding).await.unwrap(); + rbac_name.into() +} + +/// Create a Deployment with a singular Quilkin proxy, and return it's address. +/// The `name` variable is used as role={name} for label lookup. +pub async fn quilkin_proxy_deployment( + client: &Client, + deployments: Api, + name: String, + host_port: u16, + management_server: String, +) -> SocketAddr { + let pp = PostParams::default(); + let mut container = quilkin_container( + client, + Some(vec![ + "proxy".into(), + format!("--management-server={management_server}"), + ]), + None, + ); + + // we'll use a host port, since spinning up a load balancer takes a long time. + // we know that port 7777 is open because this is an Agones cluster and it has associated + // firewall rules , and even if we conflict with a GameServer + // the k8s scheduler will move us to another node. + container.ports = Some(vec![ContainerPort { + container_port: 7777, + host_port: Some(host_port as i32), + protocol: Some("UDP".into()), + ..Default::default() + }]); + + let labels = BTreeMap::from([("role".to_string(), name.clone())]); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some(name), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_expressions: None, + match_labels: Some(labels.clone()), + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels.clone()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![container], + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + + let deployment = deployments.create(&pp, &deployment).await.unwrap(); + let name = deployment.name_unchecked(); + // should not be ready, since there are no endpoints, but let's wait 3 seconds, make sure it doesn't do something we don't expect + let result = timeout( + Duration::from_secs(3), + await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), + ) + .await; + assert!(result.is_err()); + + // get the address to send data to + let pods = client.namespaced_api::(); + let list = pods + .list(&ListParams { + label_selector: Some(format!("role={name}")), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(1, list.items.len()); + + let nodes: Api = Api::all(client.kubernetes.clone()); + let name = list.items[0] + .spec + .as_ref() + .unwrap() + .node_name + .as_ref() + .unwrap(); + let node = nodes.get(name.as_str()).await.unwrap(); + let external_ip = node + .status + .unwrap() + .addresses + .unwrap() + .iter() + .find(|addr| addr.type_ == "ExternalIP") + .unwrap() + .address + .clone(); + + SocketAddr::new(external_ip.parse().unwrap(), host_port) +} + +/// Create a Fleet, and pick on it's GameServers and add the token to it. +/// Returns the details of the GameServer that has been selected. +pub async fn create_tokenised_gameserver( + fleets: Api, + gameservers: Api, + token: &str, +) -> GameServer { + let pp = PostParams::default(); + + // create a fleet so we can ensure that a packet is going to the GameServer we expect, and not + // any other. + let fleet = fleet(); + let fleet = fleets.create(&pp, &fleet).await.unwrap(); + let name = fleet.name_unchecked(); + timeout( + Duration::from_secs(30), + await_condition(fleets.clone(), name.as_str(), is_fleet_ready()), + ) + .await + .expect("Fleet should be ready") + .unwrap(); + + let lp = ListParams { + label_selector: Some(format!("agones.dev/fleet={}", fleet.name_unchecked())), + ..Default::default() + }; + let list = gameservers.list(&lp).await.unwrap(); + + let mut gs = list.items[0].clone(); + // add routing label to the GameServer + assert_eq!(3, token.as_bytes().len()); + gs.metadata + .annotations + .get_or_insert(Default::default()) + .insert( + TOKEN_KEY.into(), + base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token), + ); + gameservers + .replace(gs.name_unchecked().as_str(), &pp, &gs) + .await + .unwrap(); + gs +} + /// Returns a test GameServer with the UDP test binary that is used for /// Agones e2e tests. pub fn game_server() -> GameServer { @@ -371,7 +606,7 @@ pub fn quilkin_container( ..Default::default() }), initial_delay_seconds: Some(3), - period_seconds: Some(2), + period_seconds: Some(1), ..Default::default() }), ..Default::default() @@ -404,6 +639,30 @@ pub fn quilkin_config_map(config: &str) -> ConfigMap { } } +/// Return a ConfigMap that has a standard testing Token Router configuration +pub async fn create_token_router_config(config_maps: &Api) -> ConfigMap { + let pp = PostParams::default(); + + let config = r#" +version: v1alpha1 +filters: + - name: quilkin.filters.capture.v1alpha1.Capture # Capture and remove the authentication token + config: + suffix: + size: 3 + remove: true + - name: quilkin.filters.token_router.v1alpha1.TokenRouter +"#; + let mut config_map = quilkin_config_map(config); + config_map + .metadata + .labels + .get_or_insert(Default::default()) + .insert("quilkin.dev/configmap".into(), "true".into()); + + config_maps.create(&pp, &config_map).await.unwrap() +} + /// Convenience function to return the address with the first port of GameServer pub fn gameserver_address(gs: &GameServer) -> String { let status = gs.status.as_ref().unwrap(); diff --git a/agones/src/provider.rs b/agones/src/provider.rs new file mode 100644 index 0000000000..74c9d77217 --- /dev/null +++ b/agones/src/provider.rs @@ -0,0 +1,254 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, time::Duration}; + + use k8s_openapi::{ + api::{ + apps::v1::{Deployment, DeploymentSpec}, + core::v1::{ + ConfigMap, PodSpec, PodTemplateSpec, Service, ServiceAccount, ServicePort, + ServiceSpec, + }, + rbac::v1::{ClusterRole, RoleBinding}, + }, + apimachinery::pkg::{ + apis::meta::v1::{LabelSelector, ObjectMeta}, + util::intstr::IntOrString, + }, + }; + use kube::{ + api::{DeleteParams, PostParams}, + runtime::wait::await_condition, + Api, ResourceExt, + }; + use serial_test::serial; + use tokio::time::timeout; + + use quilkin::{ + config::providers::k8s::agones::{Fleet, GameServer}, + test_utils::TestHelper, + }; + + use crate::{ + create_agones_rbac_read_account, create_token_router_config, create_tokenised_gameserver, + debug_pods, is_deployment_ready, quilkin_container, quilkin_proxy_deployment, Client, + TOKEN_KEY, + }; + + const PROXY_DEPLOYMENT: &str = "quilkin-xds-proxies"; + + #[tokio::test] + #[serial] + /// Test for Agones Provider integration. Since this will look at all GameServers in the namespace + /// for this test, we should only run Agones integration test in a serial manner, since they + /// could easily collide with each other. + async fn agones_token_router() { + let client = Client::new().await; + + let deployments: Api = client.namespaced_api(); + let fleets: Api = client.namespaced_api(); + let gameservers: Api = client.namespaced_api(); + let config_maps: Api = client.namespaced_api(); + + let pp = PostParams::default(); + let dp = DeleteParams::default(); + + let config_map = create_token_router_config(&config_maps).await; + + agones_control_plane(&client, deployments.clone()).await; + let proxy_address = quilkin_proxy_deployment( + &client, + deployments.clone(), + PROXY_DEPLOYMENT.into(), + 7005, + "http://quilkin-manage-agones:7800".into(), + ) + .await; + + let token = "456"; // NDU2 + let gs = create_tokenised_gameserver(fleets, gameservers.clone(), token).await; + let gs_address = crate::gameserver_address(&gs); + // and allocate it such that we have an endpoint. + // let's allocate this specific game server + let mut t = TestHelper::default(); + let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; + socket + .send_to("ALLOCATE".as_bytes(), gs_address) + .await + .unwrap(); + + let response = timeout(Duration::from_secs(30), rx.recv()) + .await + .expect("should receive packet from GameServer") + .unwrap(); + assert_eq!("ACK: ALLOCATE\n", response); + + // Proxy Deployment should be ready, since there is now an endpoint + if timeout( + Duration::from_secs(30), + await_condition(deployments.clone(), PROXY_DEPLOYMENT, is_deployment_ready()), + ) + .await + .is_err() + { + debug_pods(&client, format!("role={PROXY_DEPLOYMENT}")).await; + panic!("Quilkin proxy deployment should be ready"); + } + + // keep trying to send the packet to the proxy until it works, since distributed systems are eventually consistent. + let mut response: String = "not-found".into(); + for i in 0..30 { + println!("Connection Attempt: {i}"); + + // returns the nae of the GameServer. This proves we are routing the the allocated + // GameServer with the correct token attached. + socket + .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) + .await + .unwrap(); + + let result = timeout(Duration::from_secs(1), rx.recv()).await; + if let Ok(Some(value)) = result { + response = value; + break; + } + } + assert_eq!(format!("NAME: {}\n", gs.name_unchecked()), response); + + // let's remove the token from the gameserver, which should remove access. + let mut gs = gameservers.get(gs.name_unchecked().as_str()).await.unwrap(); + let name = gs.name_unchecked(); + gs.metadata + .annotations + .as_mut() + .map(|annotations| annotations.remove(TOKEN_KEY).unwrap()); + gameservers.replace(name.as_str(), &pp, &gs).await.unwrap(); + // now we should send a packet, and not get a response. + let mut failed = false; + for i in 0..30 { + println!("Disconnection Attempt: {i}"); + socket + .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) + .await + .unwrap(); + + let result = timeout(Duration::from_secs(1), rx.recv()).await; + if result.is_err() { + failed = true; + break; + } + } + if !failed { + debug_pods(&client, format!("role={PROXY_DEPLOYMENT}")).await; + debug_pods(&client, "role=xds".into()).await; + } + assert!(failed, "Packet should have failed"); + + // cleanup + config_maps + .delete(&config_map.name_unchecked(), &dp) + .await + .unwrap(); + } + + /// Creates Quilkin xDS management instance that is in the mode to watch Agones GameServers + /// in this test namespace + async fn agones_control_plane(client: &Client, deployments: Api) { + let services: Api = client.namespaced_api(); + let service_accounts: Api = client.namespaced_api(); + let cluster_roles: Api = Api::all(client.kubernetes.clone()); + let role_bindings: Api = client.namespaced_api(); + let pp = PostParams::default(); + + let rbac_name = + create_agones_rbac_read_account(client, service_accounts, cluster_roles, role_bindings) + .await; + + // Setup the xDS Agones provider server + let args = [ + "manage", + "agones", + "--config-namespace", + client.namespace.as_str(), + "--gameservers-namespace", + client.namespace.as_str(), + ] + .map(String::from) + .to_vec(); + let labels = BTreeMap::from([("role".to_string(), "xds".to_string())]); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some("quilkin-manage-agones".into()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_expressions: None, + match_labels: Some(labels.clone()), + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels.clone()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![quilkin_container(client, Some(args), None)], + service_account_name: Some(rbac_name), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + + let deployment = deployments.create(&pp, &deployment).await.unwrap(); + + let service = Service { + metadata: ObjectMeta { + name: Some("quilkin-manage-agones".into()), + ..Default::default() + }, + spec: Some(ServiceSpec { + selector: Some(labels), + ports: Some(vec![ServicePort { + protocol: Some("TCP".into()), + port: 7800, + target_port: Some(IntOrString::Int(7800)), + ..Default::default() + }]), + ..Default::default() + }), + ..Default::default() + }; + services.create(&pp, &service).await.unwrap(); + + // make sure the deployment and service are ready + let name = deployment.name_unchecked(); + timeout( + Duration::from_secs(30), + await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), + ) + .await + .expect("xDS provider deployment should be ready") + .unwrap(); + } +} diff --git a/agones/src/relay.rs b/agones/src/relay.rs new file mode 100644 index 0000000000..5081ef0e43 --- /dev/null +++ b/agones/src/relay.rs @@ -0,0 +1,318 @@ +/* + * Copyright 2023 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, time::Duration}; + + use k8s_openapi::{ + api::{ + apps::v1::{Deployment, DeploymentSpec}, + core::v1::{ + ConfigMap, PodSpec, PodTemplateSpec, Service, ServiceAccount, ServicePort, + ServiceSpec, + }, + rbac::v1::{ClusterRole, RoleBinding}, + }, + apimachinery::pkg::{ + apis::meta::v1::{LabelSelector, ObjectMeta}, + util::intstr::IntOrString, + }, + }; + use kube::{ + api::{DeleteParams, PostParams}, + runtime::wait::await_condition, + Api, ResourceExt, + }; + use serial_test::serial; + use tokio::time::timeout; + + use quilkin::{ + config::providers::k8s::agones::{Fleet, GameServer}, + test_utils::TestHelper, + }; + + use crate::{ + create_agones_rbac_read_account, create_token_router_config, create_tokenised_gameserver, + debug_pods, is_deployment_ready, quilkin_container, quilkin_proxy_deployment, Client, + TOKEN_KEY, + }; + + // TOXO: add documentation to Agent and Relay on config options. + + #[tokio::test] + #[serial] + /// Test for Agones Provider integration. Since this will look at all GameServers in the namespace + /// for this test, we should only run Agones integration test in a serial manner, since they + /// could easily collide with each other. + async fn agones_token_router() { + let client = Client::new().await; + let config_maps: Api = client.namespaced_api(); + let deployments: Api = client.namespaced_api(); + let fleets: Api = client.namespaced_api(); + let gameservers: Api = client.namespaced_api(); + + let pp = PostParams::default(); + let dp = DeleteParams::default(); + + let config_map = create_token_router_config(&config_maps).await; + agones_agent_deployment(&client, deployments.clone()).await; + + let relay_proxy_name = "quilkin-relay-proxy"; + let proxy_address = quilkin_proxy_deployment( + &client, + deployments.clone(), + relay_proxy_name.into(), + 7005, + "http://quilkin-relay-agones:7800".into(), + ) + .await; + + let token = "789"; + let gs = create_tokenised_gameserver(fleets, gameservers.clone(), token).await; + let gs_address = crate::gameserver_address(&gs); + + let mut t = TestHelper::default(); + let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; + socket + .send_to("ALLOCATE".as_bytes(), gs_address) + .await + .unwrap(); + + let response = timeout(Duration::from_secs(30), rx.recv()) + .await + .expect("should receive packet from GameServer") + .unwrap(); + assert_eq!("ACK: ALLOCATE\n", response); + + // Proxy Deployment should be ready, since there is now an endpoint + if timeout( + Duration::from_secs(30), + await_condition(deployments.clone(), relay_proxy_name, is_deployment_ready()), + ) + .await + .is_err() + { + debug_pods(&client, format!("role={relay_proxy_name}")).await; + panic!("Quilkin proxy deployment should be ready"); + } + + // keep trying to send the packet to the proxy until it works, since distributed systems are eventually consistent. + let mut response: String = "not-found".into(); + for i in 0..30 { + println!("Connection Attempt: {i}"); + + // returns the nae of the GameServer. This proves we are routing the the allocated + // GameServer with the correct token attached. + socket + .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) + .await + .unwrap(); + + let result = timeout(Duration::from_secs(1), rx.recv()).await; + if let Ok(Some(value)) = result { + response = value; + break; + } + } + assert_eq!(format!("NAME: {}\n", gs.name_unchecked()), response); + + // let's remove the token from the gameserver, which should remove access. + let mut gs = gameservers.get(gs.name_unchecked().as_str()).await.unwrap(); + let name = gs.name_unchecked(); + gs.metadata + .annotations + .as_mut() + .map(|annotations| annotations.remove(TOKEN_KEY).unwrap()); + gameservers.replace(name.as_str(), &pp, &gs).await.unwrap(); + // now we should send a packet, and not get a response. + let mut failed = false; + for i in 0..30 { + println!("Disconnection Attempt: {i}"); + socket + .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) + .await + .unwrap(); + + let result = timeout(Duration::from_secs(1), rx.recv()).await; + if result.is_err() { + failed = true; + break; + } + } + if !failed { + debug_pods(&client, format!("role={relay_proxy_name}")).await; + debug_pods(&client, "role=xds".into()).await; + } + assert!(failed, "Packet should have failed"); + + // cleanup + config_maps + .delete(&config_map.name_unchecked(), &dp) + .await + .unwrap(); + } + + /// Deploys the Agent and Relay Server Deployents and Services + async fn agones_agent_deployment(client: &Client, deployments: Api) { + let service_accounts: Api = client.namespaced_api(); + let cluster_roles: Api = Api::all(client.kubernetes.clone()); + let role_bindings: Api = client.namespaced_api(); + let services: Api = client.namespaced_api(); + + let pp = PostParams::default(); + + let rbac_name = + create_agones_rbac_read_account(client, service_accounts, cluster_roles, role_bindings) + .await; + + // Setup the relay + let args = [ + "relay", + "agones", + "--config-namespace", + client.namespace.as_str(), + ] + .map(String::from) + .to_vec(); + let labels = BTreeMap::from([("role".to_string(), "relay".to_string())]); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some("quilkin-relay-agones".into()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_expressions: None, + match_labels: Some(labels.clone()), + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels.clone()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![quilkin_container(client, Some(args), None)], + service_account_name: Some(rbac_name.clone()), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + let relay_deployment = deployments.create(&pp, &deployment).await.unwrap(); + + // relay service + let service = Service { + metadata: ObjectMeta { + name: Some("quilkin-relay-agones".into()), + ..Default::default() + }, + spec: Some(ServiceSpec { + selector: Some(labels), + ports: Some(vec![ + ServicePort { + name: Some("ads".into()), + protocol: Some("TCP".into()), + port: 7800, + target_port: Some(IntOrString::Int(7800)), + ..Default::default() + }, + ServicePort { + name: Some("cpds".into()), + protocol: Some("TCP".into()), + port: 7900, + target_port: Some(IntOrString::Int(7900)), + ..Default::default() + }, + ]), + ..Default::default() + }), + ..Default::default() + }; + services.create(&pp, &service).await.unwrap(); + + let name = relay_deployment.name_unchecked(); + let result = timeout( + Duration::from_secs(30), + await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), + ) + .await; + if result.is_err() { + debug_pods(client, "role=relay".into()).await; + + panic!("Relay Deployment should be ready"); + } + result.unwrap().expect("Should have a relay deployment"); + + // agent deployment + let args = [ + "agent", + "--relay", + "http://quilkin-relay-agones:7900", + "agones", + "--config-namespace", + client.namespace.as_str(), + "--gameservers-namespace", + client.namespace.as_str(), + ] + .map(String::from) + .to_vec(); + let labels = BTreeMap::from([("role".to_string(), "agent".to_string())]); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some("quilkin-agones-agent".into()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_expressions: None, + match_labels: Some(labels.clone()), + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels.clone()), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![quilkin_container(client, Some(args), None)], + service_account_name: Some(rbac_name), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + let agent_deployment = deployments.create(&pp, &deployment).await.unwrap(); + let name = agent_deployment.name_unchecked(); + let result = timeout( + Duration::from_secs(30), + await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), + ) + .await; + if result.is_err() { + debug_pods(client, "role=agent".into()).await; + panic!("Agent Deployment should be ready"); + } + result.unwrap().expect("Should have an agent deployment"); + } +} diff --git a/agones/src/xds.rs b/agones/src/xds.rs deleted file mode 100644 index 48689dfdec..0000000000 --- a/agones/src/xds.rs +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#[cfg(test)] -mod tests { - use std::net::SocketAddr; - use std::{collections::BTreeMap, time::Duration}; - - use k8s_openapi::{ - api::{ - apps::v1::{Deployment, DeploymentSpec}, - core::v1::{ - ConfigMap, ContainerPort, Node, Pod, PodSpec, PodTemplateSpec, Service, - ServiceAccount, ServicePort, ServiceSpec, - }, - rbac::v1::{ClusterRole, PolicyRule, RoleBinding, RoleRef, Subject}, - }, - apimachinery::pkg::{ - apis::meta::v1::{LabelSelector, ObjectMeta}, - util::intstr::IntOrString, - }, - }; - use kube::{ - api::{DeleteParams, ListParams, PostParams}, - runtime::wait::await_condition, - Api, ResourceExt, - }; - use tokio::time::timeout; - - use quilkin::{ - config::providers::k8s::agones::{Fleet, GameServer}, - test_utils::TestHelper, - }; - - use crate::{ - debug_pods, fleet, is_deployment_ready, is_fleet_ready, quilkin_config_map, - quilkin_container, Client, - }; - - const PROXY_DEPLOYMENT: &str = "quilkin-proxies"; - - #[tokio::test] - /// Test for Agones integration. Since this will look at all GameServers in the namespace - /// for this test, we should only have single Agones integration test in this suite, since they - /// could easily collide with each other. - async fn agones_token_router() { - let client = Client::new().await; - - let deployments: Api = client.namespaced_api(); - let fleets: Api = client.namespaced_api(); - let gameservers: Api = client.namespaced_api(); - let config_maps: Api = client.namespaced_api(); - - let pp = PostParams::default(); - - let config = r#" -version: v1alpha1 -filters: - - name: quilkin.filters.capture.v1alpha1.Capture # Capture and remove the authentication token - config: - suffix: - size: 3 - remove: true - - name: quilkin.filters.token_router.v1alpha1.TokenRouter -"#; - let mut config_map = quilkin_config_map(config); - config_map - .metadata - .labels - .get_or_insert(Default::default()) - .insert("quilkin.dev/configmap".into(), "true".into()); - - config_maps.create(&pp, &config_map).await.unwrap(); - - agones_control_plane(&client, deployments.clone()).await; - let proxy_address = quilkin_proxy_deployment(&client, deployments.clone()).await; - - // create a fleet so we can ensure that a packet is going to the GameServer we expect, and not - // any other. - let fleet = fleet(); - let fleet = fleets.create(&pp, &fleet).await.unwrap(); - let name = fleet.name_unchecked(); - timeout( - Duration::from_secs(30), - await_condition(fleets.clone(), name.as_str(), is_fleet_ready()), - ) - .await - .expect("Fleet should be ready") - .unwrap(); - - let lp = ListParams { - label_selector: Some(format!("agones.dev/fleet={}", fleet.name_unchecked())), - ..Default::default() - }; - let list = gameservers.list(&lp).await.unwrap(); - - // let's allocate this specific game server - let mut t = TestHelper::default(); - let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await; - - let mut gs = list.items[0].clone(); - let gs_address = crate::gameserver_address(&gs); - - // add routing label to the GameServer - let token = "456"; // NDU2 - assert_eq!(3, token.as_bytes().len()); - let token_key = "quilkin.dev/tokens"; - gs.metadata - .annotations - .get_or_insert(Default::default()) - .insert( - token_key.into(), - base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token), - ); - gameservers - .replace(gs.name_unchecked().as_str(), &pp, &gs) - .await - .unwrap(); - // and allocate it such that we have an endpoint. - socket - .send_to("ALLOCATE".as_bytes(), gs_address) - .await - .unwrap(); - - let response = timeout(Duration::from_secs(30), rx.recv()) - .await - .expect("should receive packet from GameServer") - .unwrap(); - assert_eq!("ACK: ALLOCATE\n", response); - - // Proxy Deployment should be ready, since there is now an endpoint - if timeout( - Duration::from_secs(30), - await_condition(deployments.clone(), PROXY_DEPLOYMENT, is_deployment_ready()), - ) - .await - .is_err() - { - debug_pods(&client, "role=proxy".into()).await; - panic!("Quilkin proxy deployment should be ready"); - } - - // keep trying to send the packet to the proxy until it works, since distributed systems are eventually consistent. - let mut response: String = "not-found".into(); - for i in 0..30 { - println!("Connection Attempt: {i}"); - - // returns the nae of the GameServer. This proves we are routing the the allocated - // GameServer with the correct token attached. - socket - .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) - .await - .unwrap(); - - let result = timeout(Duration::from_secs(1), rx.recv()).await; - if let Ok(Some(value)) = result { - response = value; - break; - } - } - assert_eq!(format!("NAME: {}\n", gs.name_unchecked()), response); - - // let's remove the token from the gameserver, which should remove access. - let mut gs = gameservers.get(gs.name_unchecked().as_str()).await.unwrap(); - let name = gs.name_unchecked(); - gs.metadata - .annotations - .as_mut() - .map(|annotations| annotations.remove(token_key).unwrap()); - gameservers.replace(name.as_str(), &pp, &gs).await.unwrap(); - // now we should send a packet, and not get a response. - let mut failed = false; - for i in 0..30 { - println!("Disconnection Attempt: {i}"); - socket - .send_to(format!("GAMESERVER{token}").as_bytes(), proxy_address) - .await - .unwrap(); - - let result = timeout(Duration::from_secs(1), rx.recv()).await; - if result.is_err() { - failed = true; - break; - } - } - if !failed { - debug_pods(&client, "role=proxy".into()).await; - debug_pods(&client, "role=xds".into()).await; - } - assert!(failed, "Packet should have failed"); - } - - /// Creates Quilkin xDS management instance that is in the mode to watch Agones GameServers - /// in this test namespace - async fn agones_control_plane(client: &Client, deployments: Api) { - let services: Api = client.namespaced_api(); - let service_accounts: Api = client.namespaced_api(); - let cluster_roles: Api = Api::all(client.kubernetes.clone()); - let role_bindings: Api = client.namespaced_api(); - let pp = PostParams::default(); - - // create all the rbac rules - let rbac_name = "quilkin-agones"; - let rbac_meta = ObjectMeta { - name: Some(rbac_name.into()), - ..Default::default() - }; - let service_account = ServiceAccount { - metadata: rbac_meta.clone(), - ..Default::default() - }; - service_accounts - .create(&pp, &service_account) - .await - .unwrap(); - - // Delete the cluster role if it already exists, since it's cluster wide. - match cluster_roles - .delete(rbac_name, &DeleteParams::default()) - .await - { - Ok(_) => {} - Err(err) => println!("Cluster role not found: {err}"), - }; - let cluster_role = ClusterRole { - metadata: rbac_meta.clone(), - rules: Some(vec![ - PolicyRule { - api_groups: Some(vec!["agones.dev".into()]), - resources: Some(vec!["gameservers".into()]), - verbs: ["get", "list", "watch"].map(String::from).to_vec(), - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["".into()]), - resources: Some(vec!["configmaps".into()]), - verbs: ["get", "list", "watch"].map(String::from).to_vec(), - ..Default::default() - }, - ]), - ..Default::default() - }; - cluster_roles.create(&pp, &cluster_role).await.unwrap(); - - let binding = RoleBinding { - metadata: rbac_meta, - subjects: Some(vec![Subject { - kind: "User".into(), - name: format!("system:serviceaccount:{}:{rbac_name}", client.namespace), - api_group: Some("rbac.authorization.k8s.io".into()), - ..Default::default() - }]), - role_ref: RoleRef { - api_group: "rbac.authorization.k8s.io".into(), - kind: "ClusterRole".into(), - name: rbac_name.into(), - }, - }; - role_bindings.create(&pp, &binding).await.unwrap(); - - // Setup the xDS Agones provider server - let args = [ - "manage", - "agones", - "--config-namespace", - client.namespace.as_str(), - "--gameservers-namespace", - client.namespace.as_str(), - ] - .map(String::from) - .to_vec(); - let mut container = quilkin_container(client, Some(args), None); - container.ports = Some(vec![ContainerPort { - container_port: 7777, - ..Default::default() - }]); - let labels = BTreeMap::from([("role".to_string(), "xds".to_string())]); - let deployment = Deployment { - metadata: ObjectMeta { - name: Some("quilkin-manage-agones".into()), - labels: Some(labels.clone()), - ..Default::default() - }, - spec: Some(DeploymentSpec { - replicas: Some(1), - selector: LabelSelector { - match_expressions: None, - match_labels: Some(labels.clone()), - }, - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: Some(labels.clone()), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![container], - service_account_name: Some(rbac_name.into()), - ..Default::default() - }), - }, - ..Default::default() - }), - ..Default::default() - }; - - let deployment = deployments.create(&pp, &deployment).await.unwrap(); - - let service = Service { - metadata: ObjectMeta { - name: Some("quilkin-manage-agones".into()), - ..Default::default() - }, - spec: Some(ServiceSpec { - selector: Some(labels), - ports: Some(vec![ServicePort { - protocol: Some("TCP".into()), - port: 7800, - target_port: Some(IntOrString::Int(7800)), - ..Default::default() - }]), - ..Default::default() - }), - ..Default::default() - }; - services.create(&pp, &service).await.unwrap(); - - // make sure the deployment and service are ready - let name = deployment.name_unchecked(); - timeout( - Duration::from_secs(30), - await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), - ) - .await - .expect("xDS provider deployment should be ready") - .unwrap(); - } - - /// create a Deployment with a singular Quilkin proxy, that is configured - /// to be attached to the Quilkin Agones xDS server in `agones_control_plane()`. - async fn quilkin_proxy_deployment(client: &Client, deployments: Api) -> SocketAddr { - let pp = PostParams::default(); - let mut container = quilkin_container( - client, - Some(vec![ - "proxy".into(), - "--management-server=http://quilkin-manage-agones:7800".into(), - ]), - None, - ); - - // we'll use a host port, since spinning up a load balancer takes a long time. - // we know that port 7777 is open because this is an Agones cluster and it has associated - // firewall rules , and even if we conflict with a GameServer - // the k8s scheduler will move us to another node. - let host_port: u16 = 7005; - container.ports = Some(vec![ContainerPort { - container_port: 7777, - host_port: Some(host_port as i32), - protocol: Some("UDP".into()), - ..Default::default() - }]); - - let labels = BTreeMap::from([("role".to_string(), "proxy".to_string())]); - let deployment = Deployment { - metadata: ObjectMeta { - name: Some(PROXY_DEPLOYMENT.into()), - labels: Some(labels.clone()), - ..Default::default() - }, - spec: Some(DeploymentSpec { - replicas: Some(1), - selector: LabelSelector { - match_expressions: None, - match_labels: Some(labels.clone()), - }, - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: Some(labels.clone()), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![container], - ..Default::default() - }), - }, - ..Default::default() - }), - ..Default::default() - }; - - let deployment = deployments.create(&pp, &deployment).await.unwrap(); - let name = deployment.name_unchecked(); - // should not be ready, since there are no endpoints, but let's wait 3 seconds, make sure it doesn't do something we don't expect - let result = timeout( - Duration::from_secs(3), - await_condition(deployments.clone(), name.as_str(), is_deployment_ready()), - ) - .await; - assert!(result.is_err()); - - // get the address to send data to - let pods = client.namespaced_api::(); - let list = pods - .list(&ListParams { - label_selector: Some("role=proxy".into()), - ..Default::default() - }) - .await - .unwrap(); - assert_eq!(1, list.items.len()); - - let nodes: Api = Api::all(client.kubernetes.clone()); - let name = list.items[0] - .spec - .as_ref() - .unwrap() - .node_name - .as_ref() - .unwrap(); - let node = nodes.get(name.as_str()).await.unwrap(); - let external_ip = node - .status - .unwrap() - .addresses - .unwrap() - .iter() - .find(|addr| addr.type_ == "ExternalIP") - .unwrap() - .address - .clone(); - - SocketAddr::new(external_ip.parse().unwrap(), host_port) - } -} diff --git a/src/cli.rs b/src/cli.rs index ebaba32e35..96111a70dd 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -108,8 +108,8 @@ pub enum Commands { impl Commands { pub fn admin_mode(&self) -> Option { match self { - Self::Proxy(_) | Self::Agent(_) => Some(Mode::Proxy), - Self::Relay(_) | Self::Manage(_) => Some(Mode::Xds), + Self::Proxy(_) => Some(Mode::Proxy), + Self::Relay(_) | Self::Manage(_) | Self::Agent(_) => Some(Mode::Xds), Self::GenerateConfigSchema(_) | Self::Qcmp(_) => None, } }