diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index dff26829584..fbda5acbacb 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -48,6 +48,8 @@ pub mod tower;
pub mod type_map;
pub mod uri;
+mod socket_addr_legacy_hash;
+
use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
@@ -58,6 +60,7 @@ pub use coolid::new_coolid;
pub use kill_switch::KillSwitch;
pub use path_hasher::PathHasher;
pub use progress::{Progress, ProtectedZoneGuard};
+pub use socket_addr_legacy_hash::SocketAddrLegacyHash;
pub use stream_utils::{BoxStream, ServiceStream};
use tracing::{error, info};
diff --git a/quickwit/quickwit-common/src/rendezvous_hasher.rs b/quickwit/quickwit-common/src/rendezvous_hasher.rs
index 2d3c24efd3f..aadfd314768 100644
--- a/quickwit/quickwit-common/src/rendezvous_hasher.rs
+++ b/quickwit/quickwit-common/src/rendezvous_hasher.rs
@@ -43,6 +43,7 @@ mod tests {
use std::net::SocketAddr;
use super::*;
+ use crate::SocketAddrLegacyHash;
fn test_socket_addr(last_byte: u8) -> SocketAddr {
([127, 0, 0, last_byte], 10_000u16).into()
@@ -55,17 +56,38 @@ mod tests {
let socket3 = test_socket_addr(3);
let socket4 = test_socket_addr(4);
- let mut socket_set1 = vec![socket4, socket3, socket1, socket2];
+ let legacy_socket1 = SocketAddrLegacyHash(&socket1);
+ let legacy_socket2 = SocketAddrLegacyHash(&socket2);
+ let legacy_socket3 = SocketAddrLegacyHash(&socket3);
+ let legacy_socket4 = SocketAddrLegacyHash(&socket4);
+
+ let mut socket_set1 = vec![
+ legacy_socket4,
+ legacy_socket3,
+ legacy_socket1,
+ legacy_socket2,
+ ];
sort_by_rendez_vous_hash(&mut socket_set1, "key");
- let mut socket_set2 = vec![socket1, socket2, socket4];
+ let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set2, "key");
- let mut socket_set3 = vec![socket1, socket4];
+ let mut socket_set3 = vec![legacy_socket1, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set3, "key");
- assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]);
- assert_eq!(socket_set2, &[socket1, socket2, socket4]);
- assert_eq!(socket_set3, &[socket1, socket4]);
+ assert_eq!(
+ socket_set1,
+ &[
+ legacy_socket1,
+ legacy_socket2,
+ legacy_socket3,
+ legacy_socket4
+ ]
+ );
+ assert_eq!(
+ socket_set2,
+ &[legacy_socket1, legacy_socket2, legacy_socket4]
+ );
+ assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]);
}
}
diff --git a/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs
new file mode 100644
index 00000000000..0adadf8f2e4
--- /dev/null
+++ b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs
@@ -0,0 +1,100 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::hash::Hasher;
+use std::net::SocketAddr;
+
+/// Computes the hash of socket addr, the way it was done before Rust 1.81
+///
+/// In
+/// rustc change the implementation of Hash for IpAddr v4 and v6.
+///
+/// The idea was to not hash an array of bytes but instead interpret it as a register
+/// and hash this.
+///
+/// This was done for performance reason, but this change the result of the hash function
+/// used to compute affinity in quickwit. As a result, the switch would invalidate all
+/// existing cache.
+///
+/// In order to avoid this, we introduce the following function that reproduces the old
+/// behavior.
+#[repr(transparent)]
+#[derive(Debug, Eq, PartialEq, Copy, Clone)]
+pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr);
+
+impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> {
+ fn hash(&self, state: &mut H) {
+ std::mem::discriminant(self.0).hash(state);
+ match self.0 {
+ SocketAddr::V4(socket_addr_v4) => {
+ socket_addr_v4.ip().octets().hash(state);
+ socket_addr_v4.port().hash(state);
+ }
+ SocketAddr::V6(socket_addr_v6) => {
+ socket_addr_v6.ip().octets().hash(state);
+ socket_addr_v6.port().hash(state);
+ socket_addr_v6.flowinfo().hash(state);
+ socket_addr_v6.scope_id().hash(state);
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::net::SocketAddrV6;
+
+ use super::*;
+
+ fn sample_socket_addr_v4() -> SocketAddr {
+ "17.12.15.3:1834".parse().unwrap()
+ }
+
+ fn sample_socket_addr_v6() -> SocketAddr {
+ let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap();
+ socket_addr_v6.set_scope_id(4047u32);
+ socket_addr_v6.set_flowinfo(303u32);
+ socket_addr_v6.into()
+ }
+
+ fn compute_hash(hashable: impl std::hash::Hash) -> u64 {
+ // I wish I could have used the sip hasher but we don't have the deps here and I did
+ // not want to move that code to quickwit-common.
+ //
+ // If test break because rust changed its default hasher, we can just update the tests in
+ // this file with the new values.
+ let mut hasher = siphasher::sip::SipHasher::default();
+ hashable.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ #[test]
+ fn test_legacy_hash_socket_addr_v4() {
+ let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4()));
+ // This value is coming from using rust 1.80 to hash socket addr
+ assert_eq!(h, 8725442259486497862);
+ }
+
+ #[test]
+ fn test_legacy_hash_socket_addr_v6() {
+ let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6()));
+ // This value is coming from using rust 1.80 to hash socket addr
+ assert_eq!(h, 14277248675058176752);
+ }
+}
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index eb15513a76c..7dd21f3d875 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -27,6 +27,7 @@ use anyhow::bail;
use async_trait::async_trait;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
+use quickwit_common::SocketAddrLegacyHash;
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use tracing::{info, warn};
@@ -77,7 +78,9 @@ impl EventSubscriber for SearchJobPlacer {
for report_split in evt.report_splits {
let node_addr = nodes
.keys()
- .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id))
+ .max_by_key(|node_addr| {
+ node_affinity(SocketAddrLegacyHash(node_addr), &report_split.split_id)
+ })
// This actually never happens thanks to the if-condition at the
// top of this function.
.expect("`nodes` should not be empty");
@@ -115,7 +118,7 @@ struct SocketAddrAndClient {
impl Hash for SocketAddrAndClient {
fn hash(&self, hasher: &mut H) {
- self.socket_addr.hash(hasher);
+ SocketAddrLegacyHash(&self.socket_addr).hash(hasher);
}
}
@@ -174,7 +177,7 @@ impl SearchJobPlacer {
all_nodes.len()
);
}
- let mut candidate_nodes: Vec<_> = all_nodes
+ let mut candidate_nodes: Vec = all_nodes
.into_iter()
.map(|(grpc_addr, client)| CandidateNode {
grpc_addr,
@@ -259,7 +262,7 @@ struct CandidateNode {
impl Hash for CandidateNode {
fn hash(&self, state: &mut H) {
- self.grpc_addr.hash(state);
+ SocketAddrLegacyHash(&self.grpc_addr).hash(state);
}
}