Skip to content

Commit

Permalink
Merge branch 'main' into trinity/cast-updated-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Jun 7, 2024
2 parents 2b9857f + 1fd7d0b commit 82b9928
Show file tree
Hide file tree
Showing 39 changed files with 96 additions and 1,423 deletions.
4 changes: 2 additions & 2 deletions distribution/ecs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ Metastore database backups are disabled as restoring one would lead to
inconsistencies with the index store on S3. To ensure high availability, you
should enable `rds_config.multi_az` instead. To use your own Postgres database
instead of creating a new RDS instance, configure the
`external_postgres_uri_ssm_parameter_arn` variable (e.g
`postgres://user:password@domain:port/db`).
`external_postgres_uri_secret_arn` variable (e.g ARN of an SSM parameter with
the value `postgres://user:password@domain:port/db`).

Using NAT Gateways for the image registry is quite costly (approx. $0.05/hour/AZ). If
you are not already using NAT Gateways in the AZs where Quickwit will be
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/example/terraform.tf
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ module "quickwit" {
# multi_az = false
# }

# external_postgres_uri_ssm_parameter_arn = aws_ssm_parameter.postgres_uri.arn
# external_postgres_uri_secret_arn = aws_ssm_parameter.postgres_uri.arn

## Example logging configuration
# sidecar_container_definitions = {
Expand Down
4 changes: 2 additions & 2 deletions distribution/ecs/quickwit/configs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ locals {

quickwit_index_s3_prefix = var.quickwit_index_s3_prefix == "" ? aws_s3_bucket.index[0].id : var.quickwit_index_s3_prefix

use_external_rds = var.external_postgres_uri_ssm_parameter_arn != ""
postgres_uri_parameter_arn = var.external_postgres_uri_ssm_parameter_arn != "" ? var.external_postgres_uri_ssm_parameter_arn : aws_ssm_parameter.postgres_credential[0].arn
use_external_rds = var.external_postgres_uri_secret_arn != ""
postgres_uri_secret_arn = var.external_postgres_uri_secret_arn != "" ? var.external_postgres_uri_secret_arn : aws_ssm_parameter.postgres_credential[0].arn
}

resource "random_id" "module" {
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data "aws_iam_policy_document" "quickwit_task_execution_permission" {
statement {
actions = ["ssm:GetParameters"]

resources = [local.postgres_uri_parameter_arn]
resources = [local.postgres_uri_secret_arn]
}

statement {
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/quickwit-control-plane.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module "quickwit_control_plane" {
service_name = "control_plane"
service_discovery_registry_arn = aws_service_discovery_service.control_plane.arn
cluster_arn = module.ecs_cluster.arn
postgres_credential_arn = local.postgres_uri_parameter_arn
postgres_uri_secret_arn = local.postgres_uri_secret_arn
quickwit_peer_list = local.quickwit_peer_list
s3_access_policy_arn = aws_iam_policy.quickwit_task_permission.arn
task_execution_policy_arn = aws_iam_policy.quickwit_task_execution_permission.arn
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/quickwit-indexer.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module "quickwit_indexer" {
service_name = "indexer"
service_discovery_registry_arn = aws_service_discovery_service.indexer.arn
cluster_arn = module.ecs_cluster.arn
postgres_credential_arn = local.postgres_uri_parameter_arn
postgres_uri_secret_arn = local.postgres_uri_secret_arn
quickwit_peer_list = local.quickwit_peer_list
s3_access_policy_arn = aws_iam_policy.quickwit_task_permission.arn
task_execution_policy_arn = aws_iam_policy.quickwit_task_execution_permission.arn
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/quickwit-janitor.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module "quickwit_janitor" {
service_name = "janitor"
service_discovery_registry_arn = aws_service_discovery_service.janitor.arn
cluster_arn = module.ecs_cluster.arn
postgres_credential_arn = local.postgres_uri_parameter_arn
postgres_uri_secret_arn = local.postgres_uri_secret_arn
quickwit_peer_list = local.quickwit_peer_list
s3_access_policy_arn = aws_iam_policy.quickwit_task_permission.arn
task_execution_policy_arn = aws_iam_policy.quickwit_task_execution_permission.arn
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/quickwit-metastore.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module "quickwit_metastore" {
service_name = "metastore"
service_discovery_registry_arn = aws_service_discovery_service.metastore.arn
cluster_arn = module.ecs_cluster.arn
postgres_credential_arn = local.postgres_uri_parameter_arn
postgres_uri_secret_arn = local.postgres_uri_secret_arn
quickwit_peer_list = local.quickwit_peer_list
s3_access_policy_arn = aws_iam_policy.quickwit_task_permission.arn
task_execution_policy_arn = aws_iam_policy.quickwit_task_execution_permission.arn
Expand Down
2 changes: 1 addition & 1 deletion distribution/ecs/quickwit/quickwit-searcher.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module "quickwit_searcher" {
service_name = "searcher"
service_discovery_registry_arn = aws_service_discovery_service.searcher.arn
cluster_arn = module.ecs_cluster.arn
postgres_credential_arn = local.postgres_uri_parameter_arn
postgres_uri_secret_arn = local.postgres_uri_secret_arn
quickwit_peer_list = local.quickwit_peer_list
s3_access_policy_arn = aws_iam_policy.quickwit_task_permission.arn
task_execution_policy_arn = aws_iam_policy.quickwit_task_execution_permission.arn
Expand Down
6 changes: 1 addition & 5 deletions distribution/ecs/quickwit/service/ecs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module "quickwit_service" {
secrets = [
{
name = "QW_METASTORE_URI"
valueFrom = var.postgres_credential_arn
valueFrom = var.postgres_uri_secret_arn
}
]

Expand Down Expand Up @@ -119,10 +119,6 @@ module "quickwit_service" {
}
]

task_exec_ssm_param_arns = [
var.postgres_credential_arn
]

tasks_iam_role_policies = local.tasks_iam_role_policies

task_exec_iam_role_policies = {
Expand Down
4 changes: 3 additions & 1 deletion distribution/ecs/quickwit/service/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ variable "subnet_ids" {
type = list(string)
}

variable "postgres_credential_arn" {}
variable "postgres_uri_secret_arn" {
description = "ARN of the SSM parameter or Secret Manager secret containing the URI of a Postgres instance"
}

variable "quickwit_image" {}

Expand Down
4 changes: 2 additions & 2 deletions distribution/ecs/quickwit/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ variable "rds_config" {
default = {}
}

variable "external_postgres_uri_ssm_parameter_arn" {
description = "ARN of the SSM parameter containing the URI of a Postgres instance (postgres://{user}:{password}@{address}:{port}/{db_instance_name}). The Postgres instance should allow indbound connections from the subnets specified in `variable.subnet_ids`. If provided, the internal RDS will not be created and `var.rds_config` is ignored."
variable "external_postgres_uri_secret_arn" {
description = "ARN of the SSM parameter or Secret Manager secret containing the URI of a Postgres instance (postgres://{user}:{password}@{address}:{port}/{db_instance_name}). The Postgres instance should allow indbound connections from the subnets specified in `variable.subnet_ids`. If provided, the internal RDS will not be created and `var.rds_config` is ignored."
default = ""
}
3 changes: 0 additions & 3 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ type IndexConfigForSerialization = IndexConfigV0_8;
pub(crate) enum VersionedIndexConfig {
#[serde(rename = "0.8")]
// Retro compatibility
#[serde(alias = "0.4")]
#[serde(alias = "0.5")]
#[serde(alias = "0.6")]
#[serde(alias = "0.7")]
V0_8(IndexConfigV0_8),
}
Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ enum VersionedNodeConfig {
#[serde(rename = "0.8")]
// Retro compatibility.
#[serde(alias = "0.7")]
#[serde(alias = "0.6")]
#[serde(alias = "0.5")]
#[serde(alias = "0.4")]
V0_8(NodeConfigBuilder),
}

Expand Down
9 changes: 3 additions & 6 deletions quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ type SourceConfigForSerialization = SourceConfigV0_8;
#[serde(deny_unknown_fields)]
#[serde(tag = "version")]
pub enum VersionedSourceConfig {
#[serde(rename = "0.7")]
// Retro compatibility.
#[serde(alias = "0.6")]
#[serde(alias = "0.5")]
#[serde(alias = "0.4")]
V0_7(SourceConfigV0_7),
#[serde(rename = "0.8")]
V0_8(SourceConfigV0_8),
// Retro compatibility.
#[serde(rename = "0.7")]
V0_7(SourceConfigV0_7),
}

impl From<VersionedSourceConfig> for SourceConfigForSerialization {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2114,7 +2114,7 @@ mod tests {
assert_eq!(source_configs[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(source_configs[1].source_id, CLI_SOURCE_ID);

let index_uid = IndexUid::from_parts("test-index-foo", 0);
let index_uid = IndexUid::for_test("test-index-foo", 0);
let mut index_metadata = IndexMetadata::new_with_index_uid(index_uid, index_config);

for source_config in source_configs {
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,11 +806,11 @@ mod tests {
#[test]
fn test_build_physical_indexing_plan_simple() {
let source_1 = SourceUid {
index_uid: IndexUid::from_parts("index-1", 0),
index_uid: IndexUid::for_test("index-1", 0),
source_id: "source1".to_string(),
};
let source_2 = SourceUid {
index_uid: IndexUid::from_parts("index-2", 0),
index_uid: IndexUid::for_test("index-2", 0),
source_id: "source2".to_string(),
};
let sources = vec![
Expand Down Expand Up @@ -887,7 +887,7 @@ mod tests {
prop_compose! {
fn gen_kafka_source()
(index_idx in 0usize..100usize, num_pipelines in 1usize..51usize) -> (IndexUid, SourceConfig) {
let index_uid = IndexUid::from_parts(&format!("index-id-{index_idx}"), 0 /* this is the index uid */);
let index_uid = IndexUid::for_test(&format!("index-id-{index_idx}"), 0 /* this is the index uid */);
let source_id = quickwit_common::rand::append_random_suffix("kafka-source");
(index_uid, SourceConfig {
source_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ mod tests {

fn source_id() -> SourceUid {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let index = IndexUid::from_parts("test_index", 0);
let index = IndexUid::for_test("test_index", 0);
let source_id = COUNTER.fetch_add(1, Ordering::SeqCst);
SourceUid {
index_uid: index,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub struct MockSplitBuilder {
impl MockSplitBuilder {
pub fn new(split_id: &str) -> Self {
Self {
split_metadata: mock_split_meta(split_id, &IndexUid::from_parts("test-index", 0)),
split_metadata: mock_split_meta(split_id, &IndexUid::for_test("test-index", 0)),
}
}

Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ mod tests {
#[test]
fn test_routing_table_entry_new() {
let self_node_id: NodeId = "test-node-0".into();
let index_uid: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();
let table_entry = RoutingTableEntry::new(
&self_node_id,
Expand Down Expand Up @@ -557,7 +557,7 @@ mod tests {

#[test]
fn test_routing_table_entry_has_open_shards() {
let index_uid: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();
let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());

Expand Down Expand Up @@ -653,7 +653,7 @@ mod tests {

#[test]
fn test_routing_table_entry_next_open_shard_round_robin() {
let index_uid: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();
let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());
let ingester_pool = IngesterPool::default();
Expand Down Expand Up @@ -770,7 +770,7 @@ mod tests {

#[test]
fn test_routing_table_entry_insert_open_shards() {
let index_uid_0: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid_0 = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();
let mut table_entry = RoutingTableEntry::empty(index_uid_0.clone(), source_id.clone());

Expand Down Expand Up @@ -847,7 +847,7 @@ mod tests {
assert_eq!(table_entry.remote_shards[1].shard_state, ShardState::Closed);

// Update index incarnation.
let index_uid_1: IndexUid = IndexUid::from_parts("test-index", 1);
let index_uid_1 = IndexUid::for_test("test-index", 1);
table_entry.insert_open_shards(
&local_node_id,
&local_node_id,
Expand Down Expand Up @@ -879,7 +879,7 @@ mod tests {

#[test]
fn test_routing_table_entry_close_shards() {
let index_uid: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();

let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());
Expand Down Expand Up @@ -960,7 +960,7 @@ mod tests {

#[test]
fn test_routing_table_entry_delete_shards() {
let index_uid: IndexUid = IndexUid::from_parts("test-index", 0);
let index_uid = IndexUid::for_test("test-index", 0);
let source_id: SourceId = "test-source".into();

let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,6 @@ mod tests {
use std::collections::BTreeSet;

use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_doc_mapper::{BinaryFormat, FieldMappingType};
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::ListShardsSubrequest;
use quickwit_proto::types::{IndexUid, SourceId};
Expand Down Expand Up @@ -897,106 +896,4 @@ mod tests {
assert!(!split_query_predicate(&&split_2, &query));
assert!(!split_query_predicate(&&split_3, &query));
}

#[test]
fn test_index_otel_bytes_fields_format_conversion() {
// TODO: remove after 0.8 release.
let index_json_str = r#"
{
"version": "0.6",
"splits": [],
"index": {
"version": "0.6",
"sources": [],
"index_uid": "otel-traces-v0_6:00000000000000000000000000",
"checkpoint": {
"kafka-source": {
"00000000000000000000": "00000000000000000042"
}
},
"create_timestamp": 1789,
"index_config": {
"version": "0.6",
"index_id": "otel-traces-v0_6",
"index_uri": "s3://otel-traces-v0_6",
"doc_mapping": {
"field_mappings": [
{
"name": "timestamp",
"type": "datetime",
"fast": true
},
{
"name": "tenant_id",
"type": "bytes",
"fast": true,
"input_format": "base64",
"output_format": "base64"
},
{
"name": "trace_id",
"type": "bytes",
"fast": true,
"input_format": "base64",
"output_format": "base64"
},
{
"name": "span_id",
"type": "bytes",
"fast": true,
"input_format": "base64",
"output_format": "base64"
}
],
"tag_fields": [],
"timestamp_field": "timestamp",
"store_source": false
}
}
}
}
"#;

let file_backed_index: FileBackedIndex = serde_json::from_str(index_json_str).unwrap();
let field_mapping = file_backed_index
.metadata
.index_config
.doc_mapping
.field_mappings;
assert_eq!(
field_mapping
.iter()
.filter(|field_mapping| field_mapping.name == "tenant_id")
.count(),
1
);
assert_eq!(
field_mapping
.iter()
.filter(|field_mapping| field_mapping.name == "trace_id")
.count(),
1
);
assert_eq!(
field_mapping
.iter()
.filter(|field_mapping| field_mapping.name == "span_id")
.count(),
1
);
for field_mapping in &field_mapping {
if field_mapping.name == "tenant_id" {
if let FieldMappingType::Bytes(bytes_options, _) = &field_mapping.mapping_type {
assert_eq!(bytes_options.input_format, BinaryFormat::Base64);
assert_eq!(bytes_options.output_format, BinaryFormat::Base64);
}
}
if field_mapping.name == "trace_id" || field_mapping.name == "span_id" {
if let FieldMappingType::Bytes(bytes_options, _) = &field_mapping.mapping_type {
assert_eq!(bytes_options.input_format, BinaryFormat::Hex);
assert_eq!(bytes_options.output_format, BinaryFormat::Hex);
}
}
}
}
}
Loading

0 comments on commit 82b9928

Please sign in to comment.