
Add File source with SQS notifications #5148

Merged 51 commits on Jul 31, 2024.

Commits:
3d847c5
Refactor file source
rdettai Jul 31, 2024
8f3268a
More explicit response for read_batch
rdettai Jul 31, 2024
430d1c6
Fix clippy
rdettai Jul 31, 2024
ff500bf
Remove batch logic from DocFileReader
rdettai Jul 31, 2024
69ad7fe
Address styling comments
rdettai Jul 31, 2024
25fa085
Replace FileSourceParams path with URI
rdettai Jul 31, 2024
5ead213
Additional URI related cleanup
rdettai Jul 31, 2024
30cb1e8
Fix unit tests to now use the URI
rdettai Jul 31, 2024
ffa4142
Add queue source with SQS implementation
rdettai Jul 31, 2024
ae907a9
Fix the publish_token strategy
rdettai Jul 31, 2024
5b0e35b
Fix never-used warnings
rdettai Jul 31, 2024
f7a9e57
Fix unit tests
rdettai Jul 31, 2024
d02b17c
Abort visibility task after acknowledging
rdettai Jul 31, 2024
b7d773a
Address smaller review comments
rdettai Jul 31, 2024
e59ce3c
Shorten visibility extension task
rdettai Jul 31, 2024
d941bf8
Fix pulsar tests
rdettai Jul 31, 2024
b9e0ffc
Adjustments after rebase
rdettai Jul 31, 2024
8faa7ad
Move object backed source to file source
rdettai Jul 31, 2024
bdcce45
Simpler flow for adhoc file processing
rdettai Jul 31, 2024
369e576
Fix tests and refactor batch creation to BatchReader
rdettai Jul 31, 2024
2fd6197
Add max_messages param to Queue.receive
rdettai Jul 31, 2024
48a6567
Move use_shard_api to the metastore crate
rdettai Jul 31, 2024
b9432b6
Dedup within batches
rdettai Jul 31, 2024
827a458
Improve visibility task
rdettai Jul 31, 2024
26273c4
Re-acquire partitions aggressively
rdettai Jul 31, 2024
84762b0
Address simpler review comments
rdettai Jul 31, 2024
afa7b0a
Add test for visibility actor (failing)
rdettai Jul 31, 2024
45e8b97
Fix visibility actor drop
rdettai Jul 31, 2024
98e2646
Fix reader edge case
rdettai Jul 31, 2024
2785c76
Add end to end tests
rdettai Jul 31, 2024
a83cba7
Improve integration test scenario
rdettai Jul 31, 2024
6394d27
Chunk receive future to avoid hanging actor
rdettai Jul 31, 2024
193b305
Improve error handling
rdettai Jul 31, 2024
e4b6ef7
Fix flaky test
rdettai Jul 31, 2024
c89b9e1
New SourceConfig format with notifications field
rdettai Jul 31, 2024
fe49873
Improvements to error handling
rdettai Jul 31, 2024
727d4ef
Clarification of Queue contract
rdettai Jul 31, 2024
da80fe4
Address new round of review comments
rdettai Jul 31, 2024
ebc95a4
Remove SqsSource for now
rdettai Jul 31, 2024
36c6862
Fix panic
rdettai Jul 31, 2024
2f6bcdd
Revert to forbidding any adhoc file source through the API
rdettai Jul 31, 2024
80bdaa3
Add docs
rdettai Jul 31, 2024
a0f773c
Fix panic on empty file
rdettai Jul 31, 2024
8de7747
Documentation improvements
rdettai Jul 31, 2024
6870e56
Improve documentation
rdettai Jul 31, 2024
1bfcb9b
Improve error handling code and associated docs
rdettai Jul 31, 2024
c4fae3c
Nitpick and TODO cleanup
rdettai Jul 31, 2024
00323ff
Add tip about ingests directly from object stores
rdettai Jul 31, 2024
10b6a06
Ack notifications of undesired type
rdettai Jul 31, 2024
86a8770
Add docs about situations where messages require a DLQ
rdettai Jul 31, 2024
35b5e40
Fix integ test after rebase
rdettai Jul 31, 2024
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -27,7 +27,7 @@ networks:

services:
localstack:
image: localstack/localstack:${LOCALSTACK_VERSION:-2.3.2}
image: localstack/localstack:${LOCALSTACK_VERSION:-3.5.0}
container_name: localstack
ports:
- "${MAP_HOST_LOCALSTACK:-127.0.0.1}:4566:4566"
@@ -37,7 +37,7 @@ services:
- all
- localstack
environment:
SERVICES: kinesis,s3
SERVICES: kinesis,s3,sqs
PERSISTENCE: 1
volumes:
- .localstack:/etc/localstack/init/ready.d
134 changes: 134 additions & 0 deletions docs/assets/sqs-file-source.tf
@@ -0,0 +1,134 @@
terraform {
required_version = "1.7.5"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.39.1"
}
}
}

provider "aws" {
region = "us-east-1"
default_tags {
tags = {
provisioner = "terraform"
author = "Quickwit"
}
}
}

locals {
sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
source_bucket_name = "qw-tuto-source-bucket"
}

resource "aws_s3_bucket" "file_source" {
bucket_prefix = local.source_bucket_name
force_destroy = true
}

data "aws_iam_policy_document" "sqs_notification" {
statement {
effect = "Allow"

principals {
type = "*"
identifiers = ["*"]
}

actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [aws_s3_bucket.file_source.arn]
}
}
}


resource "aws_sqs_queue" "s3_events" {
name = local.sqs_notification_queue_name
policy = data.aws_iam_policy_document.sqs_notification.json

redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
maxReceiveCount = 5
})
}

resource "aws_sqs_queue" "s3_events_deadletter" {
name = "${local.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
queue_url = aws_sqs_queue.s3_events_deadletter.id

redrive_allow_policy = jsonencode({
redrivePermission = "byQueue",
sourceQueueArns = [aws_sqs_queue.s3_events.arn]
})
}

resource "aws_s3_bucket_notification" "bucket_notification" {
bucket = aws_s3_bucket.file_source.id

queue {
queue_arn = aws_sqs_queue.s3_events.arn
events = ["s3:ObjectCreated:*"]
}
}

data "aws_iam_policy_document" "quickwit_node" {
statement {
effect = "Allow"
actions = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueAttributes",
]
resources = [aws_sqs_queue.s3_events.arn]
}
statement {
effect = "Allow"
actions = ["s3:GetObject"]
resources = ["${aws_s3_bucket.file_source.arn}/*"]
}
}

resource "aws_iam_user" "quickwit_node" {
name = "quickwit-filesource-tutorial"
path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
name = "quickwit-filesource-tutorial"
user = aws_iam_user.quickwit_node.name
policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
user = aws_iam_user.quickwit_node.name
}

output "source_bucket_name" {
value = aws_s3_bucket.file_source.bucket

}

output "notification_queue_url" {
value = aws_sqs_queue.s3_events.id
}

output "quickwit_node_access_key_id" {
value = aws_iam_access_key.quickwit_node.id
sensitive = true
}

output "quickwit_node_secret_access_key" {
value = aws_iam_access_key.quickwit_node.secret
sensitive = true
}
55 changes: 51 additions & 4 deletions docs/configuration/source-config.md
@@ -29,15 +29,62 @@ The source type designates the kind of source being configured. As of version 0.

The source parameters indicate how to connect to a data store and are specific to the source type.

### File source (CLI only)
### File source

A file source reads data from a local file. The file must consist of JSON objects separated by a newline (NDJSON).
As of version 0.5, a file source can only be ingested with the [CLI command](/docs/reference/cli.md#tool-local-ingest). Compressed files (bz2, gzip, ...) and remote files (Amazon S3, HTTP, ...) are not supported.
A file source reads data from files containing JSON objects separated by newlines (NDJSON). Gzip compression is supported provided that the file name ends with the `.gz` suffix.
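As an aside, the NDJSON-plus-gzip layout this source expects can be produced with a short script. This is an illustrative sketch (the record contents and the `docs.ndjson.gz` name are hypothetical, not part of Quickwit):

```python
import gzip
import json

records = [{"id": 1, "msg": "hello"}, {"id": 2, "msg": "world"}]

# One JSON object per line: the NDJSON layout the file source expects.
ndjson = "".join(json.dumps(rec) + "\n" for rec in records)

# Gzip-compress the payload; the source detects gzip via the `.gz`
# suffix, so this buffer would be uploaded as e.g. `docs.ndjson.gz`.
compressed = gzip.compress(ndjson.encode("utf-8"))

# Round-trip check: decompressing yields the original lines back.
lines = gzip.decompress(compressed).decode("utf-8").splitlines()
```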

#### Ingest a single file (CLI only)

To ingest a specific file, run the indexing directly in an ad hoc CLI process with:

```bash
./quickwit tool local-ingest --index <index> --input-path <input-path>
```

Both local files and object store URIs are supported, provided that the environment is configured with the appropriate permissions. A tutorial is available [here](/docs/ingest-data/ingest-local-file.md).

#### Notification based file ingestion (beta)

Quickwit can automatically ingest all new files that are uploaded to an S3 bucket. This requires creating and configuring an [SQS notification queue](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html). A complete example can be found [in this tutorial](/docs/ingest-data/sqs-files.md).


The `notifications` parameter takes an array of notification settings. Currently, one notifier can be configured per source, and only the SQS notification `type` is supported.

Required fields for the SQS `notifications` parameter items:
- `type`: `sqs`
- `queue_url`: complete URL of the SQS queue (e.g. `https://sqs.us-east-1.amazonaws.com/123456789012/queue-name`)
- `message_type`: format of the message payload, either
  - `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
  - `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
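To illustrate the difference between the two payload formats, here is a minimal sketch of extracting the object URI from either message type. This is not Quickwit's actual implementation; the function name and the sample bucket/key are hypothetical:

```python
import json


def object_uri_from_message(body: str, message_type: str) -> str:
    """Extract an `s3://bucket/key` URI from an SQS message body.

    Illustrative only: Quickwit's internal parsing may differ.
    """
    if message_type == "raw_uri":
        # The message body is already the object URI.
        return body.strip()
    if message_type == "s3_notification":
        # S3 event notifications wrap records in a JSON envelope.
        event = json.loads(body)
        record = event["Records"][0]
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        return f"s3://{bucket}/{key}"
    raise ValueError(f"unsupported message_type: {message_type}")


raw = object_uri_from_message("s3://mybucket/mykey", "raw_uri")
notif_body = json.dumps(
    {"Records": [{"s3": {"bucket": {"name": "mybucket"},
                         "object": {"key": "mykey"}}}]}
)
parsed = object_uri_from_message(notif_body, "s3_notification")
```

Both calls resolve to the same object URI; the `s3_notification` form simply arrives wrapped in the standard S3 event envelope.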

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

```bash
./quickwit tool local-ingest --input-path <INPUT_PATH>
cat << EOF > source-config.yaml
version: 0.8
source_id: my-sqs-file-source
source_type: file
num_pipelines: 2
params:
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
EOF
./quickwit source create --index my-index --source-config source-config.yaml
```

:::note

- Quickwit does not automatically delete the source files after a successful ingestion. You can use [S3 object expiration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html) to configure how long they should be retained in the bucket.
- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other events are acknowledged by the source without further processing, and a warning is logged.
- We strongly recommend using a [dead letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) to receive all messages that couldn't be processed by the file source. A `maxReceiveCount` of 5 is a good default value. Here are some common situations where the notification message ends up in the dead letter queue:
  - the notification message could not be parsed (e.g. it is not a valid S3 notification)
  - the file was not found
  - the file is corrupted (e.g. unexpected compression)

:::
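The `maxReceiveCount` behavior described in the note can be sketched as a toy simulation. This is hypothetical illustration code, not AWS or Quickwit internals:

```python
from collections import defaultdict


class QueueWithDlq:
    """Toy model of SQS redrive: after `max_receive_count` receives
    without a successful delete, the next delivery attempt moves the
    message to the dead letter queue. Real SQS semantics are richer
    (visibility timeouts, per-receive accounting), but the counting
    logic is the same idea.
    """

    def __init__(self, max_receive_count: int = 5):
        self.max_receive_count = max_receive_count
        self.receive_counts = defaultdict(int)
        self.dead_letter = []

    def receive(self, message_id: str) -> bool:
        """Return True if delivered, False if redriven to the DLQ."""
        self.receive_counts[message_id] += 1
        if self.receive_counts[message_id] > self.max_receive_count:
            self.dead_letter.append(message_id)
            return False
        return True


q = QueueWithDlq(max_receive_count=5)
# A message that keeps failing (e.g. a corrupted file) is delivered
# 5 times, then lands in the DLQ on the sixth attempt.
deliveries = [q.receive("msg-1") for _ in range(6)]
```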

### Ingest API source

An ingest API source reads data from the [Ingest API](/docs/reference/rest-api.md#ingest-data-into-an-index). This source is automatically created at the index creation and cannot be deleted nor disabled.
6 changes: 6 additions & 0 deletions docs/ingest-data/ingest-local-file.md
@@ -72,6 +72,12 @@ Clearing local cache directory...
✔ Documents successfully indexed.
```

:::tip

Object store URIs like `s3://mybucket/mykey.json` are also supported as `--input-path`, provided that your environment is configured with the appropriate permissions.

:::

## Tear down resources (optional)

That's it! You can now tear down the resources you created. You can do so by running the following command: