Skip to content

Commit

Permalink
Ack notifications of undesired type
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 31, 2024
1 parent ab93cb7 commit fc31c8d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/source-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ EOF
:::note

- Quickwit does not automatically delete the source files after a successful ingestion. You can use [S3 object expiration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html) to configure how long they should be retained in the bucket.
- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other types of event are discarded by the source and an error log is printed.
- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other events are acknowledged by the source without further processing and a warning is logged.
- We strongly recommend using a [dead letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) to receive all messages that couldn't be processed by the file source. A `maxReceiveCount` of 5 is a good default value.

:::
Expand Down
2 changes: 1 addition & 1 deletion docs/ingest-data/sqs-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ resource "aws_s3_bucket_notification" "bucket_notification" {
:::note

Only events of type `s3:ObjectCreated:*` are supported. Other types (e.g.
`ObjectRemoved`) are discarded with an error log.
`ObjectRemoved`) are acknowledged and a warning is logged.

:::

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::SourceType;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use tracing::info;
use ulid::Ulid;

use super::helpers::QueueReceiver;
Expand Down Expand Up @@ -168,10 +167,7 @@ impl QueueCoordinator {
match message.pre_process(self.message_type) {
Ok(preprocessed_message) => preprocessed_messages.push(preprocessed_message),
Err(PreProcessingError::UnexpectedFormat(err)) => format_errors.push(err),
Err(PreProcessingError::Discardable { ack_id, reason }) => {
info!(reason, "acknowledge message without processing");
discardable_ack_ids.push(ack_id)
}
Err(PreProcessingError::Discardable { ack_id }) => discardable_ack_ids.push(ack_id),
}
}
if !format_errors.is_empty() {
Expand Down
33 changes: 17 additions & 16 deletions quickwit/quickwit-indexing/src/source/queue_sources/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ use std::str::FromStr;
use std::time::Instant;

use anyhow::Context;
use quickwit_common::rate_limited_warn;
use quickwit_common::uri::Uri;
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::types::Position;
use quickwit_storage::{OwnedBytes, StorageResolver};
use serde_json::Value;
use thiserror::Error;
use tracing::info;

use super::visibility::VisibilityTaskHandle;
use crate::source::doc_file_reader::ObjectUriBatchReader;
Expand Down Expand Up @@ -74,12 +76,8 @@ impl fmt::Debug for RawMessage {

#[derive(Error, Debug)]
pub enum PreProcessingError {
/// The message can be acknowledged without processing
#[error("skippable message: {reason}")]
Discardable {
reason: &'static str,
ack_id: String,
},
#[error("message can be acknowledged without processing")]
Discardable { ack_id: String },
#[error("unexpected message format: {0}")]
UnexpectedFormat(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -140,26 +138,30 @@ impl PreProcessedMessage {
fn uri_from_s3_notification(message: &[u8], ack_id: &str) -> Result<Uri, PreProcessingError> {
let value: Value = serde_json::from_slice(message).context("invalid JSON message")?;
if matches!(value["Event"].as_str(), Some("s3:TestEvent")) {
info!("discarding S3 test event");
return Err(PreProcessingError::Discardable {
reason: "S3 test event",
ack_id: ack_id.to_string(),
});
}
let event_name = value["Records"][0]["eventName"]
.as_str()
.context("invalid S3 notification: Records[0].eventName not found")?;
if !event_name.starts_with("ObjectCreated:") {
return Err(PreProcessingError::UnexpectedFormat(anyhow::anyhow!(
"only s3:ObjectCreated:* are supported, got {}",
event_name
)));
rate_limited_warn!(
limit_per_min = 5,
event = event_name,
"only s3:ObjectCreated:* events are supported"
);
return Err(PreProcessingError::Discardable {
ack_id: ack_id.to_string(),
});
}
let key = value["Records"][0]["s3"]["object"]["key"]
.as_str()
.context("invalid S3 notification: Records[0].s3.object.key not found")?;
let bucket = value["Records"][0]["s3"]["bucket"]["name"]
.as_str()
.context("invalid S3 notification: Records[0].s3.bucket.name not found".to_string())?;
.context("invalid S3 notification: Records[0].s3.bucket.name not found")?;
Uri::from_str(&format!("s3://{}/{}", bucket, key)).map_err(|e| e.into())
}

Expand Down Expand Up @@ -325,12 +327,12 @@ mod tests {
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
assert!(matches!(
result,
Err(PreProcessingError::UnexpectedFormat(_))
Err(PreProcessingError::Discardable { .. })
));
}

#[test]
fn test_uri_from_s3_notification_skippable() {
fn test_uri_from_s3_notification_discardable() {
let invalid_message = r#"{
"Service":"Amazon S3",
"Event":"s3:TestEvent",
Expand All @@ -341,8 +343,7 @@ mod tests {
}"#;
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
if let Err(PreProcessingError::Discardable { reason, ack_id }) = result {
assert_eq!(reason, "S3 test event");
if let Err(PreProcessingError::Discardable { ack_id }) = result {
assert_eq!(ack_id, "myackid");
} else {
panic!("Expected skippable error");
Expand Down

0 comments on commit fc31c8d

Please sign in to comment.