Skip to content

Commit

Permalink
Add test for visibility actor (failing)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 2, 2024
1 parent 2076261 commit 95beb16
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ mod tests {

#[tokio::test]
async fn test_process_empty_queue() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let batches = process_messages(&mut coordinator, queue, &[]).await;
Expand All @@ -380,7 +380,7 @@ mod tests {

#[tokio::test]
async fn test_process_one_small_message() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
Expand All @@ -394,7 +394,7 @@ mod tests {

#[tokio::test]
async fn test_process_one_big_message() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 1;
Expand All @@ -407,7 +407,7 @@ mod tests {

#[tokio::test]
async fn test_process_two_messages_different_compression() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
Expand All @@ -426,7 +426,7 @@ mod tests {

#[tokio::test]
async fn test_process_local_duplicate_message() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
Expand All @@ -447,7 +447,7 @@ mod tests {
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id();

let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests(
"test-index",
&[(
Expand All @@ -468,7 +468,7 @@ mod tests {

#[tokio::test]
async fn test_process_multiple_coordinator() {
let queue = Arc::new(MemoryQueueForTests::default());
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut proc_1 = setup_coordinator(queue.clone(), shared_state.clone());
let mut proc_2 = setup_coordinator(queue.clone(), shared_state.clone());
Expand Down
99 changes: 72 additions & 27 deletions quickwit/quickwit-indexing/src/source/queue_sources/memory_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::bail;
use async_trait::async_trait;
use quickwit_storage::OwnedBytes;
use ulid::Ulid;
Expand All @@ -47,56 +48,95 @@ impl fmt::Debug for InnerState {
}

/// A simple in-memory queue
#[derive(Clone, Default, Debug)]
#[derive(Clone, Debug)]
pub struct MemoryQueueForTests {
inner_state: Arc<Mutex<InnerState>>,
}

impl MemoryQueueForTests {
pub fn new() -> Self {
let inner_state = Arc::new(Mutex::new(InnerState::default()));
let inner_weak = Arc::downgrade(&inner_state);
tokio::spawn(async move {
loop {
if let Some(inner_state) = inner_weak.upgrade() {
let mut inner_state = inner_state.lock().unwrap();
let mut expired = Vec::new();
for (ack_id, msg) in inner_state.in_flight.iter() {
if msg.metadata.initial_deadline < Instant::now() {
expired.push(ack_id.clone());
}
}
for ack_id in expired {
let msg = inner_state.in_flight.remove(&ack_id).unwrap();
inner_state.in_queue.push_back(msg);
}
} else {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
MemoryQueueForTests {
inner_state: Arc::new(Mutex::new(InnerState::default())),
}
}

pub fn send_message(&self, payload: String, ack_id: &str) {
let message = RawMessage {
payload: OwnedBytes::new(payload.into_bytes()),
metadata: MessageMetadata {
ack_id: ack_id.to_string(),
delivery_attempts: 0,
initial_deadline: Instant::now() + Duration::from_secs(30),
initial_deadline: Instant::now(),
message_id: Ulid::new().to_string(),
},
};
self.inner_state.lock().unwrap().in_queue.push_back(message);
}

/// Returns the next visibility deadline for the message if it is in flight
pub fn next_visibility_deadline(&self, ack_id: &str) -> Option<Instant> {
let inner_state = self.inner_state.lock().unwrap();
inner_state
.in_flight
.get(ack_id)
.map(|msg| msg.metadata.initial_deadline)
}
}

#[async_trait]
impl Queue for MemoryQueueForTests {
async fn receive(
&self,
max_messages: usize,
_suggested_deadline: Duration,
suggested_deadline: Duration,
) -> anyhow::Result<Vec<RawMessage>> {
for _ in 0..3 {
{
let mut inner_state = self.inner_state.lock().unwrap();
let mut response = Vec::new();
while let Some(msg) = inner_state.in_queue.pop_front() {
let msg_cloned = RawMessage {
payload: msg.payload.clone(),
metadata: msg.metadata.clone(),
};
inner_state
.in_flight
.insert(msg.metadata.ack_id.clone(), msg_cloned);
response.push(msg);
if response.len() >= max_messages {
break;
}
}
if !response.is_empty() {
return Ok(response);
{
let mut inner_state = self.inner_state.lock().unwrap();
let mut response = Vec::new();
while let Some(mut msg) = inner_state.in_queue.pop_front() {
msg.metadata.delivery_attempts += 1;
msg.metadata.initial_deadline = Instant::now() + suggested_deadline;
let msg_cloned = RawMessage {
payload: msg.payload.clone(),
metadata: msg.metadata.clone(),
};
inner_state
.in_flight
.insert(msg.metadata.ack_id.clone(), msg_cloned);
response.push(msg);
if response.len() >= max_messages {
break;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
if !response.is_empty() {
return Ok(response);
}
}
// `sleep` to avoid using all the CPU when called in a loop
tokio::time::sleep(Duration::from_millis(50)).await;

Ok(vec![])
}

Expand All @@ -107,16 +147,21 @@ impl Queue for MemoryQueueForTests {
inner_state.acked.push(msg);
}
}

Ok(())
}

async fn modify_deadlines(
&self,
_ack_id: &str,
ack_id: &str,
suggested_deadline: Duration,
) -> anyhow::Result<Instant> {
// TODO implement deadlines
let mut inner_state = self.inner_state.lock().unwrap();
let in_flight = inner_state.in_flight.get_mut(ack_id);
if let Some(msg) = in_flight {
msg.metadata.initial_deadline = Instant::now() + suggested_deadline;
} else {
bail!("ack_id {} not found in in-flight", ack_id);
}
return Ok(Instant::now() + suggested_deadline);
}
}
Expand All @@ -126,7 +171,7 @@ mod tests {
use super::*;

fn prefilled_queue(nb_message: usize) -> MemoryQueueForTests {
let memory_queue = MemoryQueueForTests::default();
let memory_queue = MemoryQueueForTests::new();
for i in 0..nb_message {
let payload = format!("Test message {}", i);
let ack_id = i.to_string();
Expand Down
93 changes: 93 additions & 0 deletions quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,96 @@ impl Handler<RequestLastExtension> for VisibilityTask {
}
}
}

#[cfg(test)]
mod tests {
use quickwit_actors::Universe;
use tokio::sync::watch;

use super::*;
use crate::source::queue_sources::memory_queue::MemoryQueueForTests;

#[tokio::test]
async fn test_visibility_task_request_last_extension() {
// actor context
let universe = Universe::with_accelerated_time();
let (source_mailbox, _source_inbox) = universe.create_test_mailbox();
let (observable_state_tx, _observable_state_rx) = watch::channel(serde_json::Value::Null);
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
// queue with test message
let ack_id = "ack_id".to_string();
let queue = MemoryQueueForTests::new();
queue.send_message("test message".to_string(), &ack_id);
let initial_deadline = queue.receive(1, Duration::from_secs(1)).await.unwrap()[0]
.metadata
.initial_deadline;
// spawn task
let visibility_settings = VisibilitySettings {
deadline_for_default_extension: Duration::from_secs(1),
deadline_for_last_extension: Duration::from_secs(20),
deadline_for_receive: Duration::from_secs(1),
request_timeout: Duration::from_millis(100),
request_margin: Duration::from_millis(100),
};
let handle = spawn_visibility_task(
&ctx,
Arc::new(queue.clone()),
ack_id.clone(),
initial_deadline,
visibility_settings.clone(),
);
// assert that the background task performs extensions
assert!(!handle.extension_failed());
tokio::time::sleep_until(initial_deadline.into()).await;
assert!(initial_deadline < queue.next_visibility_deadline(&ack_id).unwrap());
assert!(!handle.extension_failed());
// request last extension
handle
.request_last_extension(Duration::from_secs(5))
.await
.unwrap();
assert!(
Instant::now() + Duration::from_secs(4)
< queue.next_visibility_deadline(&ack_id).unwrap()
);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_visibility_task_stop_on_drop() {
// actor context
let universe = Universe::with_accelerated_time();
let (source_mailbox, _source_inbox) = universe.create_test_mailbox();
let (observable_state_tx, _observable_state_rx) = watch::channel(serde_json::Value::Null);
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
// queue with test message
let ack_id = "ack_id".to_string();
let queue = MemoryQueueForTests::new();
queue.send_message("test message".to_string(), &ack_id);
let initial_deadline = queue.receive(1, Duration::from_secs(1)).await.unwrap()[0]
.metadata
.initial_deadline;
// spawn task
let visibility_settings = VisibilitySettings {
deadline_for_default_extension: Duration::from_secs(1),
deadline_for_last_extension: Duration::from_secs(20),
deadline_for_receive: Duration::from_secs(1),
request_timeout: Duration::from_millis(100),
request_margin: Duration::from_millis(100),
};
let handle = spawn_visibility_task(
&ctx,
Arc::new(queue.clone()),
ack_id.clone(),
initial_deadline,
visibility_settings.clone(),
);
// assert that visibility is not extended after drop
drop(handle);
tokio::time::sleep_until(initial_deadline.into()).await;
assert_eq!(queue.next_visibility_deadline(&ack_id), None);
universe.assert_quit().await;
}
}

0 comments on commit 95beb16

Please sign in to comment.