From e51b3f43d394bf9f90f2c528e2a4105929e09bb1 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 10 Dec 2024 13:14:30 +0100 Subject: [PATCH] Ensures the affinity function is the same as in Quickwit 0.8 Closes #5576 --- quickwit/quickwit-common/src/lib.rs | 4 + .../quickwit-common/src/rendezvous_hasher.rs | 20 ++-- .../src/socket_addr_legacy_hash.rs | 99 +++++++++++++++++++ .../quickwit-search/src/search_job_placer.rs | 10 +- 4 files changed, 122 insertions(+), 11 deletions(-) create mode 100644 quickwit/quickwit-common/src/socket_addr_legacy_hash.rs diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index dff26829584..199dcf5cc88 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -48,6 +48,10 @@ pub mod tower; pub mod type_map; pub mod uri; +mod socket_addr_legacy_hash; + +pub use socket_addr_legacy_hash::SocketAddrLegacyHash; + use std::env; use std::fmt::{Debug, Display}; use std::future::Future; diff --git a/quickwit/quickwit-common/src/rendezvous_hasher.rs b/quickwit/quickwit-common/src/rendezvous_hasher.rs index 2d3c24efd3f..68dfc7bb581 100644 --- a/quickwit/quickwit-common/src/rendezvous_hasher.rs +++ b/quickwit/quickwit-common/src/rendezvous_hasher.rs @@ -19,7 +19,6 @@ use std::cmp::Reverse; use std::hash::{Hash, Hasher}; - use siphasher::sip::SipHasher; /// Computes the affinity of a node for a given `key`. @@ -42,6 +41,8 @@ pub fn sort_by_rendez_vous_hash(nodes: &mut [T], key: U) { mod tests { use std::net::SocketAddr; + use crate::SocketAddrLegacyHash; + use super::*; fn test_socket_addr(last_byte: u8) -> SocketAddr { @@ -55,17 +56,22 @@ mod tests { let socket3 = test_socket_addr(3); let socket4 = test_socket_addr(4); - let mut socket_set1 = vec![socket4, socket3, socket1, socket2]; + let legacy_socket1 = SocketAddrLegacyHash(&socket1); + let legacy_socket2 = SocketAddrLegacyHash(&socket2); + let legacy_socket3 = SocketAddrLegacyHash(&socket3); + let legacy_socket4 = SocketAddrLegacyHash(&socket4); + + let mut socket_set1 = vec![legacy_socket4, legacy_socket3, legacy_socket1, legacy_socket2]; sort_by_rendez_vous_hash(&mut socket_set1, "key"); - let mut socket_set2 = vec![socket1, socket2, socket4]; + let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set2, "key"); - let mut socket_set3 = vec![socket1, socket4]; + let mut socket_set3 = vec![legacy_socket1, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set3, "key"); - assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]); - assert_eq!(socket_set2, &[socket1, socket2, socket4]); - assert_eq!(socket_set3, &[socket1, socket4]); + assert_eq!(socket_set1, &[legacy_socket1, legacy_socket2, legacy_socket3, legacy_socket4]); + assert_eq!(socket_set2, &[legacy_socket1, legacy_socket2, legacy_socket4]); + assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]); } } diff --git a/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs new file mode 100644 index 00000000000..cda848297a4 --- /dev/null +++ b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs @@ -0,0 +1,99 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::hash::Hasher; +use std::net::SocketAddr; + +/// Computes the hash of socket addr, the way it was done before Rust 1.81 +/// +/// In https://github.com/rust-lang/rust/commit/ba620344301aaa3b2733575a0696cdfd877edbdf +/// rustc change the implementation of Hash for IpAddr v4 and v6. +/// +/// The idea was to not hash an array of bytes but instead interpret it as a register +/// and hash this. +/// +/// This was done for performance reason, but this change the result of the hash function +/// used to compute affinity in quickwit. As a result, the switch would invalidate all +/// existing cache. +/// +/// In order to avoid this, we introduce the following function that reproduces the old +/// behavior. +#[repr(transparent)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr); + +impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> { + fn hash(&self, state: &mut H) { + std::mem::discriminant(self.0).hash(state); + match self.0 { + SocketAddr::V4(socket_addr_v4) => { + socket_addr_v4.ip().octets().hash(state); + socket_addr_v4.port().hash(state); + } + SocketAddr::V6(socket_addr_v6) => { + socket_addr_v6.ip().octets().hash(state); + socket_addr_v6.port().hash(state); + socket_addr_v6.flowinfo().hash(state); + socket_addr_v6.scope_id().hash(state); + }, + } + } +} + + + +#[cfg(test)] +mod tests { + use std::net::SocketAddrV6; + use super::*; + + fn sample_socket_addr_v4() -> SocketAddr { + "17.12.15.3:1834".parse().unwrap() + } + + fn sample_socket_addr_v6() -> SocketAddr { + let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap(); + socket_addr_v6.set_scope_id(4047u32); + socket_addr_v6.set_flowinfo(303u32); + socket_addr_v6.into() + } + + fn compute_hash(hashable: impl std::hash::Hash) -> u64 { + // I wish I could have used the sip hasher but we don't have the deps here and I did + // not want to move that code to quickwit-common. + // + // If test break because rust changed its default hasher, we can just update the tests in this + // file with the new values. + let mut hasher = siphasher::sip::SipHasher::default(); + hashable.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_legacy_hash_socket_addr_v4() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4())); + assert_eq!(h, 8725442259486497862); + } + + #[test] + fn test_legacy_hash_socket_addr_v6() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6())); + assert_eq!(h, 14277248675058176752); + } +} diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index eb15513a76c..f832ca1101a 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -30,6 +30,7 @@ use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use tracing::{info, warn}; +use quickwit_common::SocketAddrLegacyHash; use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS}; /// Job. @@ -64,6 +65,7 @@ pub struct SearchJobPlacer { searcher_pool: SearcherPool, } + #[async_trait] impl EventSubscriber for SearchJobPlacer { async fn handle_event(&mut self, evt: ReportSplitsRequest) { @@ -77,7 +79,7 @@ impl EventSubscriber for SearchJobPlacer { for report_split in evt.report_splits { let node_addr = nodes .keys() - .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id)) + .max_by_key(|node_addr| node_affinity(SocketAddrLegacyHash(*node_addr), &report_split.split_id)) // This actually never happens thanks to the if-condition at the // top of this function. .expect("`nodes` should not be empty"); @@ -115,7 +117,7 @@ struct SocketAddrAndClient { impl Hash for SocketAddrAndClient { fn hash(&self, hasher: &mut H) { - self.socket_addr.hash(hasher); + SocketAddrLegacyHash(&self.socket_addr).hash(hasher); } } @@ -174,7 +176,7 @@ impl SearchJobPlacer { all_nodes.len() ); } - let mut candidate_nodes: Vec<_> = all_nodes + let mut candidate_nodes: Vec = all_nodes .into_iter() .map(|(grpc_addr, client)| CandidateNode { grpc_addr, @@ -259,7 +261,7 @@ struct CandidateNode { impl Hash for CandidateNode { fn hash(&self, state: &mut H) { - self.grpc_addr.hash(state); + SocketAddrLegacyHash(&self.grpc_addr).hash(state); } }