diff --git a/docker-compose.yml b/docker-compose.yml index 45aa468d296..eafe854900d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,8 +39,6 @@ services: environment: SERVICES: kinesis,s3,sqs PERSISTENCE: 1 - # avoid using the localstack.cloud domain in the generated queue URLs - SQS_ENDPOINT_STRATEGY: path volumes: - .localstack:/etc/localstack/init/ready.d - localstack_data:/var/lib/localstack diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 359c9c57a4e..29906d39cc6 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -934,6 +934,20 @@ mod tests { "Only one notification can be specified for now" ); } + { + let json = r#" + { + "notifications": [ + { + "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue", + "message_type": "s3_notification" + } + ] + } + "#; + let error = serde_json::from_str::(json).unwrap_err(); + assert!(error.to_string().contains("missing field `type`")); + } } #[test] diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index 51f14886f2d..e93a4035f3f 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -23,17 +23,18 @@ use std::time::Duration; use itertools::Itertools; use quickwit_actors::{ActorExitStatus, Mailbox}; -use quickwit_common::rate_limited_warn; +use quickwit_common::rate_limited_error; use quickwit_config::{FileSourceMessageType, FileSourceSqs}; use quickwit_metastore::checkpoint::SourceCheckpoint; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::SourceType; use quickwit_storage::StorageResolver; use serde::Serialize; +use tracing::info; use ulid::Ulid; use super::local_state::QueueLocalState; -use super::message::{MessageType, ReadyMessage}; +use super::message::{MessageType, PreProcessingError, ReadyMessage}; use super::shared_state::{checkpoint_messages, QueueSharedState}; use super::visibility::{spawn_visibility_task, VisibilitySettings}; use super::{Queue, QueueReceiver}; @@ -156,23 +157,30 @@ impl QueueCoordinator { .receive(1, self.visible_settings.deadline_for_receive) .await?; - let mut invalid_messages = Vec::new(); - let preprocessed_messages = raw_messages - .into_iter() - .map(|msg| msg.pre_process(self.message_type)) - .filter_map(|res| res.map_err(|err| invalid_messages.push(err)).ok()) - .collect::>(); - if !invalid_messages.is_empty() { - self.observable_state.num_messages_failed_preprocessing += - invalid_messages.len() as u64; - rate_limited_warn!( + let mut format_errors = Vec::new(); + let mut discardable_ack_ids = Vec::new(); + let mut preprocessed_messages = Vec::new(); + for message in raw_messages { + match message.pre_process(self.message_type) { + Ok(preprocessed_message) => preprocessed_messages.push(preprocessed_message), + Err(PreProcessingError::UnexpectedFormat(err)) => format_errors.push(err), + Err(PreProcessingError::Discardable { ack_id, reason }) => { + info!(reason, "acknowledge message without processing"); + discardable_ack_ids.push(ack_id) + } + } + } + if !format_errors.is_empty() { + self.observable_state.num_messages_failed_preprocessing += format_errors.len() as u64; + rate_limited_error!( limit_per_min = 10, - count = invalid_messages.len(), - last_err = ?invalid_messages.last().unwrap(), - "invalid messages skipped, use a dead letter queue to limit retries" + count = format_errors.len(), + last_err = ?format_errors.last().unwrap(), + "invalid message(s) not processed, use a dead letter queue to limit retries" ); } if preprocessed_messages.is_empty() { + self.queue.acknowledge(&discardable_ack_ids).await?; return Ok(()); } @@ -218,10 +226,11 @@ impl QueueCoordinator { self.local_state.set_ready_for_read(ready_messages); // Acknowledge messages that already have been processed - let ack_ids = already_completed + let mut ack_ids = already_completed .iter() .map(|msg| msg.metadata.ack_id.clone()) .collect::>(); + ack_ids.append(&mut discardable_ack_ids); self.queue.acknowledge(&ack_ids).await?; Ok(()) @@ -258,7 +267,7 @@ impl QueueCoordinator { } Err(err) => { self.observable_state.num_messages_failed_opening += 1; - rate_limited_warn!( + rate_limited_error!( limit_per_min = 5, err = ?err, "failed to start processing message" diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/message.rs b/quickwit/quickwit-indexing/src/source/queue_sources/message.rs index f6cd952daab..21028e4b705 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/message.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/message.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use core::fmt; use std::io::read_to_string; use std::str::FromStr; use std::time::Instant; @@ -27,6 +28,7 @@ use quickwit_metastore::checkpoint::PartitionId; use quickwit_proto::types::Position; use quickwit_storage::{OwnedBytes, StorageResolver}; use serde_json::Value; +use thiserror::Error; use super::visibility::VisibilityTaskHandle; use crate::source::doc_file_reader::ObjectUriBatchReader; @@ -61,14 +63,39 @@ pub struct RawMessage { pub payload: OwnedBytes, } +impl fmt::Debug for RawMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawMessage") + .field("metadata", &self.metadata) + .field("payload", &"") + .finish() + } +} + +#[derive(Error, Debug)] +pub enum PreProcessingError { + /// The message can be acknowledged without processing + #[error("skippable message: {reason}")] + Discardable { + reason: &'static str, + ack_id: String, + }, + #[error("unexpected message format: {0}")] + UnexpectedFormat(#[from] anyhow::Error), +} + impl RawMessage { - pub fn pre_process(self, message_type: MessageType) -> anyhow::Result { + pub fn pre_process( + self, + message_type: MessageType, + ) -> Result { let payload = match message_type { - MessageType::S3Notification => { - PreProcessedPayload::ObjectUri(uri_from_s3_notification(&self.payload)?) - } + MessageType::S3Notification => PreProcessedPayload::ObjectUri( + uri_from_s3_notification(&self.payload, &self.metadata.ack_id)?, + ), MessageType::RawUri => { - PreProcessedPayload::ObjectUri(Uri::from_str(&read_to_string(self.payload)?)?) + let payload_str = read_to_string(self.payload).context("failed to read payload")?; + PreProcessedPayload::ObjectUri(Uri::from_str(&payload_str)?) } MessageType::RawData => unimplemented!(), }; @@ -108,15 +135,22 @@ impl PreProcessedMessage { } } -fn uri_from_s3_notification(message: &OwnedBytes) -> anyhow::Result { - let value: Value = serde_json::from_slice(message.as_slice())?; +fn uri_from_s3_notification(message: &OwnedBytes, ack_id: &str) -> Result { + let value: Value = + serde_json::from_slice(message.as_slice()).context("invalid JSON message")?; + if matches!(value["Event"].as_str(), Some("s3:TestEvent")) { + return Err(PreProcessingError::Discardable { + reason: "S3 test event", + ack_id: ack_id.to_string(), + }); + } let key = value["Records"][0]["s3"]["object"]["key"] .as_str() - .context("Invalid S3 notification")?; + .context("invalid S3 notification: Records[0].s3.object.key not found")?; let bucket = value["Records"][0]["s3"]["bucket"]["name"] .as_str() - .context("Invalid S3 notification")?; - Uri::from_str(&format!("s3://{}/{}", bucket, key)) + .context("invalid S3 notification: Records[0].s3.bucket.name not found".to_string())?; + Uri::from_str(&format!("s3://{}/{}", bucket, key)).map_err(|e| e.into()) } /// A message for which we know as much of the global processing status as @@ -172,7 +206,7 @@ mod tests { use super::*; #[test] - fn test_uri_from_s3_notification() { + fn test_uri_from_s3_notification_valid() { let test_message = r#" { "Records": [ @@ -214,10 +248,13 @@ mod tests { ] }"#; let actual_uri = - uri_from_s3_notification(&OwnedBytes::new(test_message.as_bytes())).unwrap(); + uri_from_s3_notification(&OwnedBytes::new(test_message.as_bytes()), "myackid").unwrap(); let expected_uri = Uri::from_str("s3://mybucket/logs.json").unwrap(); assert_eq!(actual_uri, expected_uri); + } + #[test] + fn test_uri_from_s3_notification_invalid() { let invalid_message = r#"{ "Records": [ { @@ -229,7 +266,31 @@ mod tests { } ] }"#; - let result = uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes())); - assert!(result.is_err()); + let result = + uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid"); + assert!(matches!( + result, + Err(PreProcessingError::UnexpectedFormat(_)) + )); + } + + #[test] + fn test_uri_from_s3_notification_skippable() { + let invalid_message = r#"{ + "Service":"Amazon S3", + "Event":"s3:TestEvent", + "Time":"2014-10-13T15:57:02.089Z", + "Bucket":"bucketname", + "RequestId":"5582815E1AEA5ADF", + "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE" + }"#; + let result = + uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid"); + if let Err(PreProcessingError::Discardable { reason, ack_id }) = result { + assert_eq!(reason, "S3 test event"); + assert_eq!(ack_id, "myackid"); + } else { + panic!("Expected skippable error"); + } } } diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs index 5b998effd9c..edd13ef8c0d 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs @@ -72,7 +72,7 @@ impl Queue for SqsQueue { // state that it starts when the message is returned. let initial_deadline = Instant::now() + suggested_deadline; let clamped_max_messages = std::cmp::min(max_messages, 10) as i32; - let receive_res = aws_retry(&self.receive_retries, || async { + let receive_output = aws_retry(&self.receive_retries, || async { self.sqs_client .receive_message() .queue_url(&self.queue_url) @@ -83,19 +83,7 @@ impl Queue for SqsQueue { .send() .await }) - .await; - - let receive_output = match receive_res { - Ok(output) => output, - Err(err) => { - rate_limited_error!( - limit_per_min = 10, - first_err = ?err, - "failed to receive messages from SQS", - ); - return Ok(Vec::new()); - } - }; + .await?; receive_output .messages @@ -465,4 +453,19 @@ mod localstack_tests { .unwrap(); assert_eq!(in_flight_count, 0); } + + #[tokio::test] + async fn test_receive_wrong_queue() { + let client = test_helpers::get_localstack_sqs_client().await.unwrap(); + let queue_url = test_helpers::create_queue(&client, "test-receive-existing-msg").await; + let bad_queue_url = format!("{}wrong", queue_url); + let queue = Arc::new(SqsQueue::try_new(bad_queue_url).await.unwrap()); + tokio::time::timeout( + Duration::from_millis(500), + queue.clone().receive(5, Duration::from_secs(60)), + ) + .await + .unwrap() + .unwrap_err(); + } }