From d916e2c4aa57b20b63155049af7f39751e0da46a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 10 Dec 2024 13:14:30 +0100 Subject: [PATCH] Ensures the affinity function is the same as in Quickwit 0.8 Closes #5576 --- quickwit/quickwit-common/src/lib.rs | 3 + .../quickwit-common/src/rendezvous_hasher.rs | 34 ++++-- .../src/socket_addr_legacy_hash.rs | 100 ++++++++++++++++++ .../quickwit-search/src/search_job_placer.rs | 11 +- 4 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 quickwit/quickwit-common/src/socket_addr_legacy_hash.rs diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index dff26829584..fbda5acbacb 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -48,6 +48,8 @@ pub mod tower; pub mod type_map; pub mod uri; +mod socket_addr_legacy_hash; + use std::env; use std::fmt::{Debug, Display}; use std::future::Future; @@ -58,6 +60,7 @@ pub use coolid::new_coolid; pub use kill_switch::KillSwitch; pub use path_hasher::PathHasher; pub use progress::{Progress, ProtectedZoneGuard}; +pub use socket_addr_legacy_hash::SocketAddrLegacyHash; pub use stream_utils::{BoxStream, ServiceStream}; use tracing::{error, info}; diff --git a/quickwit/quickwit-common/src/rendezvous_hasher.rs b/quickwit/quickwit-common/src/rendezvous_hasher.rs index 2d3c24efd3f..aadfd314768 100644 --- a/quickwit/quickwit-common/src/rendezvous_hasher.rs +++ b/quickwit/quickwit-common/src/rendezvous_hasher.rs @@ -43,6 +43,7 @@ mod tests { use std::net::SocketAddr; use super::*; + use crate::SocketAddrLegacyHash; fn test_socket_addr(last_byte: u8) -> SocketAddr { ([127, 0, 0, last_byte], 10_000u16).into() @@ -55,17 +56,38 @@ mod tests { let socket3 = test_socket_addr(3); let socket4 = test_socket_addr(4); - let mut socket_set1 = vec![socket4, socket3, socket1, socket2]; + let legacy_socket1 = SocketAddrLegacyHash(&socket1); + let legacy_socket2 = SocketAddrLegacyHash(&socket2); + let legacy_socket3 = SocketAddrLegacyHash(&socket3); + let legacy_socket4 = SocketAddrLegacyHash(&socket4); + + let mut socket_set1 = vec![ + legacy_socket4, + legacy_socket3, + legacy_socket1, + legacy_socket2, + ]; sort_by_rendez_vous_hash(&mut socket_set1, "key"); - let mut socket_set2 = vec![socket1, socket2, socket4]; + let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set2, "key"); - let mut socket_set3 = vec![socket1, socket4]; + let mut socket_set3 = vec![legacy_socket1, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set3, "key"); - assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]); - assert_eq!(socket_set2, &[socket1, socket2, socket4]); - assert_eq!(socket_set3, &[socket1, socket4]); + assert_eq!( + socket_set1, + &[ + legacy_socket1, + legacy_socket2, + legacy_socket3, + legacy_socket4 + ] + ); + assert_eq!( + socket_set2, + &[legacy_socket1, legacy_socket2, legacy_socket4] + ); + assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]); } } diff --git a/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs new file mode 100644 index 00000000000..48da5d05bc6 --- /dev/null +++ b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs @@ -0,0 +1,100 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::hash::Hasher; +use std::net::SocketAddr; + +/// Computes the hash of socket addr, the way it was done before Rust 1.81 +/// +/// In https://github.com/rust-lang/rust/commit/ba620344301aaa3b2733575a0696cdfd877edbdf +/// rustc change the implementation of Hash for IpAddr v4 and v6. +/// +/// The idea was to not hash an array of bytes but instead interpret it as a register +/// and hash this. +/// +/// This was done for performance reason, but this change the result of the hash function +/// used to compute affinity in quickwit. As a result, the switch would invalidate all +/// existing cache. +/// +/// In order to avoid this, we introduce the following function that reproduces the old +/// behavior. +#[repr(transparent)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr); + +impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> { + fn hash(&self, state: &mut H) { + std::mem::discriminant(self.0).hash(state); + match self.0 { + SocketAddr::V4(socket_addr_v4) => { + socket_addr_v4.ip().octets().hash(state); + socket_addr_v4.port().hash(state); + } + SocketAddr::V6(socket_addr_v6) => { + socket_addr_v6.ip().octets().hash(state); + socket_addr_v6.port().hash(state); + socket_addr_v6.flowinfo().hash(state); + socket_addr_v6.scope_id().hash(state); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddrV6; + + use super::*; + + fn sample_socket_addr_v4() -> SocketAddr { + "17.12.15.3:1834".parse().unwrap() + } + + fn sample_socket_addr_v6() -> SocketAddr { + let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap(); + socket_addr_v6.set_scope_id(4047u32); + socket_addr_v6.set_flowinfo(303u32); + socket_addr_v6.into() + } + + fn compute_hash(hashable: impl std::hash::Hash) -> u64 { + // I wish I could have used the sip hasher but we don't have the deps here and I did + // not want to move that code to quickwit-common. + // + // If test break because rust changed its default hasher, we can just update the tests in + // this file with the new values. + let mut hasher = siphasher::sip::SipHasher::default(); + hashable.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_legacy_hash_socket_addr_v4() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4())); + // This value is coming from using rust 1.80 to hash socket addr + assert_eq!(h, 8725442259486497862); + } + + #[test] + fn test_legacy_hash_socket_addr_v6() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6())); + // This value is coming from using rust 1.80 to hash socket addr + assert_eq!(h, 14277248675058176752); + } +} diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index eb15513a76c..62e7224eec4 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -27,6 +27,7 @@ use anyhow::bail; use async_trait::async_trait; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash}; +use quickwit_common::SocketAddrLegacyHash; use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use tracing::{info, warn}; @@ -77,7 +78,9 @@ impl EventSubscriber for SearchJobPlacer { for report_split in evt.report_splits { let node_addr = nodes .keys() - .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id)) + .max_by_key(|node_addr| { + node_affinity(SocketAddrLegacyHash(*node_addr), &report_split.split_id) + }) // This actually never happens thanks to the if-condition at the // top of this function. .expect("`nodes` should not be empty"); @@ -115,7 +118,7 @@ struct SocketAddrAndClient { impl Hash for SocketAddrAndClient { fn hash(&self, hasher: &mut H) { - self.socket_addr.hash(hasher); + SocketAddrLegacyHash(&self.socket_addr).hash(hasher); } } @@ -174,7 +177,7 @@ impl SearchJobPlacer { all_nodes.len() ); } - let mut candidate_nodes: Vec<_> = all_nodes + let mut candidate_nodes: Vec = all_nodes .into_iter() .map(|(grpc_addr, client)| CandidateNode { grpc_addr, @@ -259,7 +262,7 @@ struct CandidateNode { impl Hash for CandidateNode { fn hash(&self, state: &mut H) { - self.grpc_addr.hash(state); + SocketAddrLegacyHash(&self.grpc_addr).hash(state); } }