Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close local shards and delete mrecordlog queues on ingester startup #3895

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = "0.4"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "ae1ca3f" }
guilload marked this conversation as resolved.
Show resolved Hide resolved
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use tower::{Layer, Service};

pub use crate::error::HelloError;
pub use crate::hello::*;
use crate::hello::{Hello, HelloRequest, HelloResponse};
guilload marked this conversation as resolved.
Show resolved Hide resolved

pub type HelloResult<T> = Result<T, HelloError>;

Expand Down
5 changes: 1 addition & 4 deletions quickwit/quickwit-common/src/tower/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ impl<R> fmt::Debug for BufferLayer<R> {

impl<R> Clone for BufferLayer<R> {
fn clone(&self) -> Self {
Self {
bound: self.bound,
_phantom: PhantomData,
}
*self
}
}

Expand Down
154 changes: 96 additions & 58 deletions quickwit/quickwit-common/src/tower/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ use std::cmp::{Eq, PartialEq};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use futures::{Stream, StreamExt};
use tokio::sync::RwLock;

use super::Change;

/// A pool of `V` values identified by `K` keys. The pool can be updated manually by calling the
/// `add/remove` methods or by listening to a stream of changes.
pub struct Pool<K, V> {
inner: Arc<RwLock<InnerPool<K, V>>>,
pool: Arc<RwLock<HashMap<K, V>>>,
}

impl<K, V> fmt::Debug for Pool<K, V>
Expand All @@ -49,26 +48,21 @@ where
impl<K, V> Clone for Pool<K, V> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
pool: self.pool.clone(),
}
}
}

impl<K, V> Default for Pool<K, V> {
impl<K, V> Default for Pool<K, V>
where K: Eq + PartialEq + Hash
{
fn default() -> Self {
let inner = InnerPool {
map: HashMap::new(),
};
Self {
inner: Arc::new(RwLock::new(inner)),
pool: Arc::new(RwLock::new(HashMap::default())),
}
}
}

struct InnerPool<K, V> {
map: HashMap<K, V>,
}

impl<K, V> Pool<K, V>
where
K: Eq + PartialEq + Hash + Clone + Send + Sync + 'static,
Expand All @@ -85,10 +79,10 @@ where
.for_each(|change| async {
match change {
Change::Insert(key, service) => {
pool.insert(key, service).await;
pool.insert(key, service);
}
Change::Remove(key) => {
pool.remove(&key).await;
pool.remove(&key);
}
}
})
Expand All @@ -98,59 +92,97 @@ where
}

/// Returns whether the pool is empty.
pub async fn is_empty(&self) -> bool {
self.inner.read().await.map.is_empty()
pub fn is_empty(&self) -> bool {
self.pool
.read()
.expect("lock should not be poisoned")
.is_empty()
}

/// Returns the number of values in the pool.
pub async fn len(&self) -> usize {
self.inner.read().await.map.len()
pub fn len(&self) -> usize {
self.pool.read().expect("lock should not be poisoned").len()
}

/// Returns all the keys in the pool.
pub async fn keys(&self) -> Vec<K> {
self.inner.read().await.map.keys().cloned().collect()
pub fn keys(&self) -> Vec<K> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.map(|(key, _)| key.clone())
.collect()
}

/// Returns all the values in the pool.
pub fn values(&self) -> Vec<V> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.map(|(_, value)| value.clone())
.collect()
}

/// Returns all the key-value pairs in the pool.
pub async fn all(&self) -> Vec<(K, V)> {
self.inner
pub fn pairs(&self) -> Vec<(K, V)> {
self.pool
.read()
.await
.map
.expect("lock should not be poisoned")
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}

/// Returns the value associated with the given key.
pub async fn get<Q>(&self, key: &Q) -> Option<V>
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
{
self.inner.read().await.map.get(key).cloned()
self.pool
.read()
.expect("lock should not be poisoned")
.contains_key(key)
}

/// Finds a key in the pool that satisfies the given predicate.
pub async fn find(&self, func: impl Fn(&K) -> bool) -> Option<K> {
self.inner
/// Returns the value associated with the given key.
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
{
self.pool
.read()
.await
.map
.keys()
.find(|k| func(k))
.expect("lock should not be poisoned")
.get(key)
.cloned()
}

/// Finds a key in the pool that satisfies the given predicate.
pub fn find(&self, func: impl Fn(&K, &V) -> bool) -> Option<(K, V)> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.find(|(key, value)| func(key, value))
.map(|(key, value)| (key.clone(), value.clone()))
}

/// Adds a value to the pool.
pub async fn insert(&self, key: K, service: V) {
self.inner.write().await.map.insert(key, service);
pub fn insert(&self, key: K, service: V) {
self.pool
.write()
.expect("lock should not be poisoned")
.insert(key, service);
}

/// Removes a value from the pool.
pub async fn remove(&self, key: &K) {
self.inner.write().await.map.remove(key);
pub fn remove(&self, key: &K) {
self.pool
.write()
.expect("lock should not be poisoned")
.remove(key);
}
}

Expand All @@ -159,11 +191,8 @@ where K: Eq + PartialEq + Hash
{
fn from_iter<I>(iter: I) -> Self
where I: IntoIterator<Item = (K, V)> {
let key_values = HashMap::from_iter(iter);
let inner = InnerPool { map: key_values };

Self {
inner: Arc::new(RwLock::new(inner)),
pool: Arc::new(RwLock::new(HashMap::from_iter(iter))),
}
}
}
Expand All @@ -180,39 +209,48 @@ mod tests {
async fn test_pool() {
let (change_stream_tx, change_stream_rx) = tokio::sync::mpsc::channel(10);
let change_stream = ReceiverStream::new(change_stream_rx);

let pool = Pool::default();
pool.listen_for_changes(change_stream);
assert!(pool.is_empty().await);
assert_eq!(pool.len().await, 0);

assert!(pool.is_empty());
assert_eq!(pool.len(), 0);

change_stream_tx.send(Change::Insert(1, 11)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert!(!pool.is_empty().await);
assert_eq!(pool.len().await, 1);
assert_eq!(pool.get(&1).await, Some(11));

assert!(!pool.is_empty());
assert_eq!(pool.len(), 1);

assert!(pool.contains_key(&1));
assert_eq!(pool.get(&1), Some(11));

change_stream_tx.send(Change::Insert(2, 21)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.len().await, 2);
assert_eq!(pool.get(&2).await, Some(21));

assert_eq!(pool.find(|k| *k == 1).await, Some(1));
assert_eq!(pool.len(), 2);
assert_eq!(pool.get(&2), Some(21));

assert_eq!(pool.find(|k, _| *k == 1), Some((1, 11)));

let mut all_nodes = pool.all().await;
all_nodes.sort();
assert_eq!(all_nodes, vec![(1, 11), (2, 21)]);
let mut pairs = pool.pairs();
pairs.sort();

assert_eq!(pairs, vec![(1, 11), (2, 21)]);

change_stream_tx.send(Change::Insert(1, 12)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.get(&1).await, Some(12));

assert_eq!(pool.get(&1), Some(12));

change_stream_tx.send(Change::Remove(1)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.len().await, 1);

assert_eq!(pool.len(), 1);

change_stream_tx.send(Change::Remove(2)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert!(pool.is_empty().await);
assert_eq!(pool.len().await, 0);

assert!(pool.is_empty());
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl IndexConfig {
default_search_fields: vec![
"body".to_string(),
r#"attributes.server"#.to_string(),
r#"attributes.server\.status"#.to_string(),
r"attributes.server\.status".to_string(),
],
};
IndexConfig {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
fnv = { workspace = true }
dyn-clone = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
Expand Down
Loading