Skip to content

Commit

Permalink
Ensures the affinity function is the same as in Quickwit 0.8
Browse files Browse the repository at this point in the history
Closes #5576
  • Loading branch information
fulmicoton committed Dec 10, 2024
1 parent 7ec03f9 commit e51b3f4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 11 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ pub mod tower;
pub mod type_map;
pub mod uri;

mod socket_addr_legacy_hash;

pub use socket_addr_legacy_hash::SocketAddrLegacyHash;

use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
Expand Down
20 changes: 13 additions & 7 deletions quickwit/quickwit-common/src/rendezvous_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::cmp::Reverse;
use std::hash::{Hash, Hasher};

use siphasher::sip::SipHasher;

/// Computes the affinity of a node for a given `key`.
Expand All @@ -42,6 +41,8 @@ pub fn sort_by_rendez_vous_hash<T: Hash, U: Hash>(nodes: &mut [T], key: U) {
mod tests {
use std::net::SocketAddr;

use crate::SocketAddrLegacyHash;

use super::*;

fn test_socket_addr(last_byte: u8) -> SocketAddr {
Expand All @@ -55,17 +56,22 @@ mod tests {
let socket3 = test_socket_addr(3);
let socket4 = test_socket_addr(4);

let mut socket_set1 = vec![socket4, socket3, socket1, socket2];
let legacy_socket1 = SocketAddrLegacyHash(&socket1);
let legacy_socket2 = SocketAddrLegacyHash(&socket2);
let legacy_socket3 = SocketAddrLegacyHash(&socket3);
let legacy_socket4 = SocketAddrLegacyHash(&socket4);

let mut socket_set1 = vec![legacy_socket4, legacy_socket3, legacy_socket1, legacy_socket2];
sort_by_rendez_vous_hash(&mut socket_set1, "key");

let mut socket_set2 = vec![socket1, socket2, socket4];
let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set2, "key");

let mut socket_set3 = vec![socket1, socket4];
let mut socket_set3 = vec![legacy_socket1, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set3, "key");

assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]);
assert_eq!(socket_set2, &[socket1, socket2, socket4]);
assert_eq!(socket_set3, &[socket1, socket4]);
assert_eq!(socket_set1, &[legacy_socket1, legacy_socket2, legacy_socket3, legacy_socket4]);
assert_eq!(socket_set2, &[legacy_socket1, legacy_socket2, legacy_socket4]);
assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]);
}
}
99 changes: 99 additions & 0 deletions quickwit/quickwit-common/src/socket_addr_legacy_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::hash::Hasher;
use std::net::SocketAddr;

/// Computes the hash of socket addr, the way it was done before Rust 1.81
///
/// In https://github.com/rust-lang/rust/commit/ba620344301aaa3b2733575a0696cdfd877edbdf
/// rustc change the implementation of Hash for IpAddr v4 and v6.
///
/// The idea was to not hash an array of bytes but instead interpret it as a register
/// and hash this.
///
/// This was done for performance reason, but this change the result of the hash function
/// used to compute affinity in quickwit. As a result, the switch would invalidate all
/// existing cache.
///
/// In order to avoid this, we introduce the following function that reproduces the old
/// behavior.
#[repr(transparent)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr);

impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
std::mem::discriminant(self.0).hash(state);
match self.0 {
SocketAddr::V4(socket_addr_v4) => {
socket_addr_v4.ip().octets().hash(state);
socket_addr_v4.port().hash(state);
}
SocketAddr::V6(socket_addr_v6) => {
socket_addr_v6.ip().octets().hash(state);
socket_addr_v6.port().hash(state);
socket_addr_v6.flowinfo().hash(state);
socket_addr_v6.scope_id().hash(state);
},
}
}
}



#[cfg(test)]
mod tests {
use std::net::SocketAddrV6;
use super::*;

fn sample_socket_addr_v4() -> SocketAddr {
"17.12.15.3:1834".parse().unwrap()
}

fn sample_socket_addr_v6() -> SocketAddr {
let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap();
socket_addr_v6.set_scope_id(4047u32);
socket_addr_v6.set_flowinfo(303u32);
socket_addr_v6.into()
}

fn compute_hash(hashable: impl std::hash::Hash) -> u64 {
// I wish I could have used the sip hasher but we don't have the deps here and I did
// not want to move that code to quickwit-common.
//
// If test break because rust changed its default hasher, we can just update the tests in this
// file with the new values.
let mut hasher = siphasher::sip::SipHasher::default();
hashable.hash(&mut hasher);
hasher.finish()
}

#[test]
fn test_legacy_hash_socket_addr_v4() {
let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4()));
assert_eq!(h, 8725442259486497862);
}

#[test]
fn test_legacy_hash_socket_addr_v6() {
let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6()));
assert_eq!(h, 14277248675058176752);
}
}
10 changes: 6 additions & 4 deletions quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use tracing::{info, warn};

use quickwit_common::SocketAddrLegacyHash;
use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS};

/// Job.
Expand Down Expand Up @@ -64,6 +65,7 @@ pub struct SearchJobPlacer {
searcher_pool: SearcherPool,
}


#[async_trait]
impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
async fn handle_event(&mut self, evt: ReportSplitsRequest) {
Expand All @@ -77,7 +79,7 @@ impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
for report_split in evt.report_splits {
let node_addr = nodes
.keys()
.max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id))
.max_by_key(|node_addr| node_affinity(SocketAddrLegacyHash(*node_addr), &report_split.split_id))
// This actually never happens thanks to the if-condition at the
// top of this function.
.expect("`nodes` should not be empty");
Expand Down Expand Up @@ -115,7 +117,7 @@ struct SocketAddrAndClient {

impl Hash for SocketAddrAndClient {
fn hash<H: Hasher>(&self, hasher: &mut H) {
self.socket_addr.hash(hasher);
SocketAddrLegacyHash(&self.socket_addr).hash(hasher);
}
}

Expand Down Expand Up @@ -174,7 +176,7 @@ impl SearchJobPlacer {
all_nodes.len()
);
}
let mut candidate_nodes: Vec<_> = all_nodes
let mut candidate_nodes: Vec<CandidateNode> = all_nodes
.into_iter()
.map(|(grpc_addr, client)| CandidateNode {
grpc_addr,
Expand Down Expand Up @@ -259,7 +261,7 @@ struct CandidateNode {

impl Hash for CandidateNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.grpc_addr.hash(state);
SocketAddrLegacyHash(&self.grpc_addr).hash(state);
}
}

Expand Down

0 comments on commit e51b3f4

Please sign in to comment.