Skip to content

Commit

Permalink
Abstract and fix draining (istio#1176)
Browse files Browse the repository at this point in the history
* Abstract and fix draining

* Centralize draining logic in one helper function
* Fix inbound draining (HBONE). Before, we did not shut down the
  listener upon draining. This meant new connections would go to the old
ztunnel on a ztunnel restart.
* Simplify inbound draining; do not re-create the force shutdown logic,
  and instead let the common abstraction do it (which does it slightly
better)
* socks5: add proper draining with force shutdown. Remove double-spawn,
  which adds some complexity around the proxy_to_cancellable.

This is primarily tested in istio/istio#51710,
which sends a large stream of requests and restarts ztunnel and the
backend app (2 different tests). With this change, these tests pass.

It would be good to get more isolated tests in this repo in the future
as well

* Refactor out into own package

* Add tests for draining

* unclean but forceful shutdown

* fmt

* Fix flakes

* fix flake
  • Loading branch information
howardjohn authored Jul 5, 2024
1 parent 7a34469 commit c68a919
Show file tree
Hide file tree
Showing 31 changed files with 736 additions and 399 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ base64 = "0.22"
byteorder = "1.5"
bytes = { version = "1.5", features = ["serde"] }
chrono = "0.4"
drain = "0.1"
duration-str = "0.7"
futures = "0.3"
futures-core = "0.3"
Expand Down
10 changes: 0 additions & 10 deletions fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{signal, telemetry};

use base64::engine::general_purpose::STANDARD;
use bytes::Bytes;
use drain::Watch;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::{header::HeaderValue, header::CONTENT_TYPE, Request, Response};
Expand All @@ -36,6 +35,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use std::{net::SocketAddr, time::Duration};

use crate::drain::DrainWatcher;
use tokio::time;
use tracing::{error, info, warn};
use tracing_subscriber::filter;
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Service {
config: Arc<Config>,
proxy_state: DemandProxyState,
shutdown_trigger: signal::ShutdownTrigger,
drain_rx: Watch,
drain_rx: DrainWatcher,
cert_manager: Arc<SecretManager>,
) -> anyhow::Result<Self> {
Server::<State>::bind(
Expand Down
16 changes: 9 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use std::future::Future;

use crate::proxyfactory::ProxyFactory;

use crate::drain;
use anyhow::Context;
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

use anyhow::Context;
use prometheus_client::registry::Registry;
use tokio::task::JoinSet;
use tracing::{warn, Instrument};

Expand All @@ -45,7 +45,7 @@ pub async fn build_with_cert(
// Any component which wants time to gracefully exit should take in a drain_rx clone,
// await drain_rx.signaled(), then cleanup.
// Note: there is still a hard timeout if the draining takes too long
let (drain_tx, drain_rx) = drain::channel();
let (drain_tx, drain_rx) = drain::new();

// Register readiness tasks.
let ready = readiness::Ready::new();
Expand Down Expand Up @@ -320,7 +320,7 @@ fn init_inpod_proxy_mgr(
config: &config::Config,
proxy_gen: ProxyFactory,
ready: readiness::Ready,
drain_rx: drain::Watch,
drain_rx: drain::DrainWatcher,
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
let metrics = Arc::new(crate::inpod::metrics::Metrics::new(
registry.sub_registry_with_prefix("workload_manager"),
Expand Down Expand Up @@ -349,7 +349,7 @@ pub struct Bound {
pub udp_dns_proxy_address: Option<SocketAddr>,

pub shutdown: signal::Shutdown,
drain_tx: drain::Signal,
drain_tx: drain::DrainTrigger,
}

impl Bound {
Expand All @@ -359,7 +359,9 @@ impl Bound {

// Start a drain; this will attempt to end all connections
// or itself be interrupted by a stronger TERM signal, whichever comes first.
self.drain_tx.drain().await;
self.drain_tx
.start_drain_and_wait(drain::DrainMode::Graceful)
.await;

Ok(())
}
Expand Down
20 changes: 11 additions & 9 deletions src/dns/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use drain::Watch;
use hickory_proto::error::ProtoErrorKind;
use hickory_proto::op::ResponseCode;
use hickory_proto::rr::rdata::{A, AAAA, CNAME};
Expand Down Expand Up @@ -43,6 +42,7 @@ use crate::dns::metrics::{
};
use crate::dns::name_util::{has_domain, trim_domain};
use crate::dns::resolver::{Answer, Resolver};
use crate::drain::{DrainMode, DrainWatcher};
use crate::metrics::{DeferRecorder, IncrementRecorder, Recorder};
use crate::proxy::Error;
use crate::socket::to_canonical;
Expand All @@ -65,7 +65,7 @@ pub struct Server {
tcp_addr: SocketAddr,
udp_addr: SocketAddr,
server: ServerFuture<dns::handler::Handler>,
drain: Watch,
drain: DrainWatcher,
}

impl Server {
Expand All @@ -85,7 +85,7 @@ impl Server {
state: DemandProxyState,
forwarder: Arc<dyn Forwarder>,
metrics: Arc<Metrics>,
drain: Watch,
drain: DrainWatcher,
socket_factory: &(dyn SocketFactory + Send + Sync),
allow_unknown_source: bool,
) -> Result<Self, Error> {
Expand Down Expand Up @@ -171,9 +171,11 @@ impl Server {
}
}
}
_ = self.drain.signaled() => {
res = self.drain.wait_for_drain() => {
info!("shutting down the DNS server");
let _ = self.server.shutdown_gracefully().await;
if res.mode() == DrainMode::Graceful {
let _ = self.server.shutdown_gracefully().await;
}
}
}
info!("dns server drained");
Expand Down Expand Up @@ -875,7 +877,6 @@ mod tests {
use prometheus_client::registry::Registry;

use super::*;
use crate::strng;
use crate::test_helpers::dns::{
a, aaaa, cname, ip, ipv4, ipv6, n, new_message, new_tcp_client, new_udp_client, run_dns,
send_request, server_request,
Expand All @@ -887,6 +888,7 @@ mod tests {
use crate::xds::istio::workload::Service as XdsService;
use crate::xds::istio::workload::Workload as XdsWorkload;
use crate::xds::istio::workload::{IpFamilies, NetworkAddress as XdsNetworkAddress};
use crate::{drain, strng};
use crate::{metrics, test_helpers};

const NS1: &str = "ns1";
Expand Down Expand Up @@ -1308,7 +1310,7 @@ mod tests {
let domain = "cluster.local".to_string();
let state = state();
let forwarder = forwarder();
let (_signal, drain) = drain::channel();
let (_signal, drain) = drain::new();
let factory = crate::proxy::DefaultSocketFactory;
let proxy = Server::new(
domain,
Expand Down Expand Up @@ -1426,7 +1428,7 @@ mod tests {
.await
.unwrap(),
);
let (_signal, drain) = drain::channel();
let (_signal, drain) = drain::new();
let factory = crate::proxy::DefaultSocketFactory;
let server = Server::new(
domain,
Expand Down Expand Up @@ -1503,7 +1505,7 @@ mod tests {
ips: HashMap::from([(n("large.com."), new_large_response())]),
});
let domain = "cluster.local".to_string();
let (_signal, drain) = drain::channel();
let (_signal, drain) = drain::new();
let factory = crate::proxy::DefaultSocketFactory;
let server = Server::new(
domain,
Expand Down
Loading

0 comments on commit c68a919

Please sign in to comment.