diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 651e162013d..338da75a868 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -182,39 +182,15 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ### index update +Update an index using an index config file. `quickwit index update [args]` -#### index update search-settings - -Updates default search settings. -`quickwit index update search-settings [args]` *Synopsis* ```bash -quickwit index update search-settings +quickwit index update --index - --default-search-fields -``` - -*Options* - -| Option | Description | -|-----------------|-------------| -| `--index` | ID of the target index | -| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | -#### index update retention-policy - -Configure or disable the retention policy. -`quickwit index update retention-policy [args]` - -*Synopsis* - -```bash -quickwit index update retention-policy - --index - [--period ] - [--schedule ] - [--disable] + --index-config ``` *Options* @@ -222,9 +198,7 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| | `--index` | ID of the target index | -| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) | -| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | -| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | +| `--index-config` | Location of the index config file. | ### index clear Clears an index: deletes all splits and resets checkpoint. 
@@ -380,8 +354,9 @@ quickwit index ingest | `--input-path` | Location of the input file. | | `--batch-size-limit` | Size limit of each submitted document batch. | | `--wait` | Wait for all documents to be commited and available for search before exiting | +| `--v2` | Ingest v2 (experimental! Do not use me.) | | `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting | -| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation. | +| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before committing splits after their creation. | *Examples* diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index d0606b74372..dbc17c459a4 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -223,7 +223,7 @@ The response is a JSON object, and the content type is `application/json; charse POST api/v1/indexes ``` -Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. +Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML with `content-type: application/yaml`. #### POST payload @@ -309,18 +309,23 @@ The response is the index metadata of the created index, and the content type is | `sources` | List of the index sources configurations. | `Array` | -### Update an index (search settings and retention policy only) +### Update an index ``` PUT api/v1/indexes/ ``` -Updates the search settings and retention policy of an index. 
This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. +Updates the configuration of an index. This endpoint follows PUT semantics, which means that all the fields of the current configuration are replaced by the values specified in this request or the associated defaults. In particular, if a field is optional (e.g. `retention_policy`), omitting it will delete the associated configuration. If the new configuration file contains updates that cannot be applied, the request fails and none of the updates are applied. The API accepts JSON with `content-type: application/json` and YAML with `content-type: application/yaml`. #### PUT payload | Variable | Type | Description | Default value | |---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `version` | `String` | Config format version; use the same as your Quickwit version. (mandatory) | | +| `index_id` | `String` | Index ID, must be the same index as in the request URI. (mandatory) | | +| `index_uri` | `String` | Defines where the index files are stored. (cannot be updated) | `{current_index_uri}` | +| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). (cannot be updated) | | +| `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | | | `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). 
| | | `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | diff --git a/quickwit/quickwit-cli/src/index/mod.rs b/quickwit/quickwit-cli/src/index.rs similarity index 94% rename from quickwit/quickwit-cli/src/index/mod.rs rename to quickwit/quickwit-cli/src/index.rs index efc253b9667..05b3161c39f 100644 --- a/quickwit/quickwit-cli/src/index/mod.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -57,13 +57,10 @@ use tabled::{Table, Tabled}; use thousands::Separable; use tracing::{debug, Level}; -use self::update::{build_index_update_command, IndexUpdateCliCommand}; use crate::checklist::GREEN_COLOR; use crate::stats::{mean, percentile, std_deviation}; use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE}; -pub mod update; - pub fn build_index_command() -> Command { Command::new("index") .about("Manages indexes: creates, updates, deletes, ingests, searches, describes...") @@ -81,7 +78,18 @@ pub fn build_index_command() -> Command { ]) ) .subcommand( - build_index_update_command().display_order(2) + Command::new("update") + .display_order(1) + .about("Update an index using an index config file.") + .long_about("This command follows PUT semantics, which means that all the fields of the current configuration are replaced by the values specified in this request or the associated defaults. In particular if the field is optional (e.g `retention_policy`), omitting it will delete the associated configuration. 
If the new configuration file contains updates that cannot be applied, the request fails and none of the updates are applied.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"index-config" "Location of the index config file.") + .display_order(2) + .required(true), + ]) ) .subcommand( Command::new("clear") @@ -213,6 +221,14 @@ pub struct CreateIndexArgs { pub assume_yes: bool, } +#[derive(Debug, Eq, PartialEq)] +pub struct UpdateIndexArgs { + pub client_args: ClientArgs, + pub index_id: IndexId, + pub index_config_uri: Uri, + pub assume_yes: bool, +} + #[derive(Debug, Eq, PartialEq)] pub struct DescribeIndexArgs { pub client_args: ClientArgs, @@ -260,12 +276,12 @@ pub struct ListIndexesArgs { pub enum IndexCliCommand { Clear(ClearIndexArgs), Create(CreateIndexArgs), + Update(UpdateIndexArgs), Delete(DeleteIndexArgs), Describe(DescribeIndexArgs), Ingest(IngestDocsArgs), List(ListIndexesArgs), Search(SearchIndexArgs), - Update(IndexUpdateCliCommand), } impl IndexCliCommand { @@ -288,7 +304,7 @@ impl IndexCliCommand { "ingest" => Self::parse_ingest_args(submatches), "list" => Self::parse_list_args(submatches), "search" => Self::parse_search_args(submatches), - "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(submatches)?)), + "update" => Self::parse_update_args(submatches), _ => bail!("unknown index subcommand `{subcommand}`"), } } @@ -323,6 +339,25 @@ impl IndexCliCommand { })) } + fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let index_config_uri = matches + .remove_one::("index-config") + .map(|uri| Uri::from_str(&uri)) + .expect("`index-config` should be a required arg.")?; + let assume_yes = matches.get_flag("yes"); + + Ok(Self::Update(UpdateIndexArgs { + index_id, + client_args, + index_config_uri, + assume_yes, + })) + } + 
fn parse_describe_args(mut matches: ArgMatches) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; let index_id = matches @@ -449,7 +484,7 @@ impl IndexCliCommand { Self::Ingest(args) => ingest_docs_cli(args).await, Self::List(args) => list_index_cli(args).await, Self::Search(args) => search_index_cli(args).await, - Self::Update(args) => args.execute().await, + Self::Update(args) => update_index_cli(args).await, } } } @@ -501,6 +536,33 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> { Ok(()) } +pub async fn update_index_cli(args: UpdateIndexArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index"); + println!("❯ Updating index..."); + let storage_resolver = StorageResolver::unconfigured(); + let file_content = load_file(&storage_resolver, &args.index_config_uri).await?; + let index_config_str: String = std::str::from_utf8(&file_content) + .with_context(|| format!("Invalid utf8: `{}`", args.index_config_uri))? + .to_string(); + let config_format = ConfigFormat::sniff_from_uri(&args.index_config_uri)?; + let qw_client = args.client_args.client(); + if !args.assume_yes { + // Stop if user answers no. + let prompt = "This operation will update the index configuration. Do you want to \ + proceed?" + .to_string(); + if !prompt_confirmation(&prompt, false) { + return Ok(()); + } + } + qw_client + .indexes() + .update(&args.index_id, &index_config_str, config_format) + .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} + pub async fn list_index_cli(args: ListIndexesArgs) -> anyhow::Result<()> { debug!(args=?args, "list-index"); let qw_client = args.client_args.client(); diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs deleted file mode 100644 index f417dbb0216..00000000000 --- a/quickwit/quickwit-cli/src/index/update.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. 
-// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use anyhow::{bail, Context}; -use clap::{arg, ArgMatches, Command}; -use colored::Colorize; -use quickwit_config::{RetentionPolicy, SearchSettings}; -use quickwit_proto::types::IndexId; -use quickwit_serve::IndexUpdates; -use tracing::debug; - -use crate::checklist::GREEN_COLOR; -use crate::ClientArgs; - -pub fn build_index_update_command() -> Command { - Command::new("update") - .subcommand_required(true) - .subcommand( - Command::new("search-settings") - .about("Updates default search settings.") - .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") - .display_order(2) - .num_args(0..) - .required(true), - ])) - .subcommand( - Command::new("retention-policy") - .about("Configures or disables the retention policy.") - .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"period" "Duration after which splits are dropped. 
Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") - .display_order(2) - .required(false), - arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") - .display_order(3) - .required(false), - arg!(--"disable" "Disables the retention policy. Old indexed data will not be cleaned up anymore.") - .display_order(4) - .required(false), - ]) - ) -} - -#[derive(Debug, Eq, PartialEq)] -pub struct RetentionPolicyArgs { - pub client_args: ClientArgs, - pub index_id: IndexId, - pub disable: bool, - pub period: Option, - pub schedule: Option, -} - -#[derive(Debug, Eq, PartialEq)] -pub struct SearchSettingsArgs { - pub client_args: ClientArgs, - pub index_id: IndexId, - pub default_search_fields: Vec, -} - -#[derive(Debug, Eq, PartialEq)] -pub enum IndexUpdateCliCommand { - RetentionPolicy(RetentionPolicyArgs), - SearchSettings(SearchSettingsArgs), -} - -impl IndexUpdateCliCommand { - pub fn parse_args(mut matches: ArgMatches) -> anyhow::Result { - let (subcommand, submatches) = matches - .remove_subcommand() - .context("failed to parse index update subcommand")?; - match subcommand.as_str() { - "retention-policy" => Self::parse_update_retention_policy_args(submatches), - "search-settings" => Self::parse_update_search_settings_args(submatches), - _ => bail!("unknown index update subcommand `{subcommand}`"), - } - } - - fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { - let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let disable = matches.get_flag("disable"); - let period = matches.remove_one::("period"); - let schedule = matches.remove_one::("schedule"); - Ok(Self::RetentionPolicy(RetentionPolicyArgs { - client_args, - index_id, - disable, - period, - schedule, - })) - } - - fn 
parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { - let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let default_search_fields = matches - .remove_many::("default-search-fields") - .map(|values| values.collect()) - // --default-search-fields should be made optional if other fields - // are added to SearchSettings - .expect("`default-search-fields` should be a required arg."); - Ok(Self::SearchSettings(SearchSettingsArgs { - client_args, - index_id, - default_search_fields, - })) - } - - pub async fn execute(self) -> anyhow::Result<()> { - match self { - Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, - Self::SearchSettings(args) => update_search_settings_cli(args).await, - } - } -} - -pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-retention-policy"); - println!("❯ Updating index retention policy..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let new_retention_policy_opt = match ( - args.disable, - args.period, - args.schedule, - metadata.index_config.retention_policy_opt, - ) { - (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { - bail!("`--period` and `--schedule` cannot be used together with `--disable`") - } - (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), - (false, None, Some(_), None) => { - bail!("`--period` is required when creating a retention policy") - } - (true, None, None, _) => None, - (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { - retention_period: policy.retention_period, - evaluation_schedule: schedule, - }), - (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: 
schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), - }), - (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), - }), - }; - if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { - println!( - "New retention policy: {}", - serde_json::to_string(&new_retention_policy)? - ); - } else { - println!("Disable retention policy."); - } - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: new_retention_policy_opt, - search_settings: metadata.index_config.search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); - Ok(()) -} - -pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-search-settings"); - println!("❯ Updating index search settings..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let search_settings = SearchSettings { - default_search_fields: args.default_search_fields, - }; - println!( - "New search settings: {}", - serde_json::to_string(&search_settings)? 
- ); - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: metadata.index_config.retention_policy_opt, - search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::cli::{build_cli, CliCommand}; - use crate::index::IndexCliCommand; - - #[test] - fn test_cmd_update_subsubcommand() { - let app = build_cli().no_binary_name(true); - let matches = app - .try_get_matches_from([ - "index", - "update", - "retention-policy", - "--index", - "my-index", - "--period", - "1 day", - ]) - .unwrap(); - let command = CliCommand::parse_cli_args(matches).unwrap(); - assert!(matches!( - command, - CliCommand::Index(IndexCliCommand::Update( - IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { - client_args: _, - index_id, - disable: false, - period: Some(period), - schedule: None, - }) - )) if &index_id == "my-index" && &period == "1 day" - )); - } -} diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 848934c6a28..25f840fd1e7 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -22,25 +22,21 @@ mod helpers; use std::path::Path; -use std::str::FromStr; use anyhow::Result; use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; -use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; use quickwit_cli::index::{ - create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, - SearchIndexArgs, + create_index_cli, delete_index_cli, search_index, update_index_cli, CreateIndexArgs, + DeleteIndexArgs, SearchIndexArgs, UpdateIndexArgs, }; use quickwit_cli::tool::{ garbage_collect_index_cli, local_ingest_docs_cli, GarbageCollectIndexArgs, LocalIngestDocsArgs, }; -use quickwit_cli::ClientArgs; use 
quickwit_common::fs::get_cache_directory_path; use quickwit_common::rand::append_random_suffix; -use quickwit_common::uri::Uri; use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID}; use quickwit_metastore::{ ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, @@ -57,11 +53,8 @@ use crate::helpers::{create_test_env, upload_test_file, PACKAGE_BIN_NAME}; async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> { let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - index_config_uri: test_env.index_config_uri.clone(), + client_args: test_env.default_client_args(), + index_config_uri: test_env.resource_files.index_config.clone(), overwrite: false, assume_yes: true, }; @@ -70,7 +63,7 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> { async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> { let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: test_env.index_id.clone(), input_path_opt: Some(input_path.to_path_buf()), input_format: SourceInputFormat::Json, @@ -118,12 +111,9 @@ async fn test_cmd_create_no_index_uri() { .unwrap(); test_env.start_server().await.unwrap(); - let index_config_without_uri = Uri::from_str(&test_env.index_config_without_uri()).unwrap(); + let index_config_without_uri = test_env.resource_files.index_config_without_uri.clone(); let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_config_uri: index_config_without_uri, overwrite: false, assume_yes: true, @@ -146,12 +136,9 @@ async fn test_cmd_create_overwrite() { .unwrap(); test_env.start_server().await.unwrap(); - let index_config_without_uri = 
Uri::from_str(&test_env.index_config_without_uri()).unwrap(); + let index_config_without_uri = test_env.resource_files.index_config_without_uri.clone(); let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_config_uri: index_config_without_uri, overwrite: true, assume_yes: true, @@ -182,9 +169,9 @@ async fn test_cmd_ingest_on_non_existing_index() { .unwrap(); let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id: "index-does-not-exist".to_string(), - input_path_opt: Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -212,9 +199,9 @@ async fn test_ingest_docs_cli_keep_cache() { create_logs_index(&test_env).await.unwrap(); let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id, - input_path_opt: Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: false, @@ -239,9 +226,9 @@ async fn test_ingest_docs_cli() { let index_uid = test_env.index_metadata().await.unwrap().index_uid; let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), - input_path_opt: Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -270,7 +257,7 @@ async fn test_ingest_docs_cli() { // Ingest a non-existing file should fail. 
let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id: test_env.index_id, input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")), input_format: SourceInputFormat::Json, @@ -345,7 +332,7 @@ async fn test_cmd_search_aggregation() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -371,7 +358,7 @@ async fn test_cmd_search_aggregation() { // search with aggregation let args = SearchIndexArgs { - index_id: test_env.index_id, + index_id: test_env.index_id.clone(), query: "paris OR tokio OR london".to_string(), aggregation: Some(serde_json::to_string(&aggregation).unwrap()), max_hits: 10, @@ -380,10 +367,7 @@ async fn test_cmd_search_aggregation() { snippet_fields: None, start_timestamp: None, end_timestamp: None, - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint, - ..Default::default() - }, + client_args: test_env.default_client_args(), sort_by_score: false, }; let search_response = search_index(args).await.unwrap(); @@ -448,13 +432,13 @@ async fn test_cmd_search_with_snippets() -> Result<()> { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); // search with snippets let args = SearchIndexArgs { - index_id: test_env.index_id, + index_id: test_env.index_id.clone(), query: "event:baz".to_string(), aggregation: None, max_hits: 10, @@ -463,10 +447,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> { snippet_fields: Some(vec!["event".to_string()]), start_timestamp: None, end_timestamp: None, - client_args: ClientArgs { - cluster_endpoint: 
test_env.cluster_endpoint, - ..Default::default() - }, + client_args: test_env.default_client_args(), sort_by_score: false, }; let search_response = search_index(args).await.unwrap(); @@ -493,10 +474,7 @@ async fn test_search_index_cli() { create_logs_index(&test_env).await.unwrap(); let create_search_args = |query: &str| SearchIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), query: query.to_string(), aggregation: None, @@ -509,7 +487,7 @@ async fn test_search_index_cli() { sort_by_score: false, }; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -544,20 +522,16 @@ async fn test_cmd_update_index() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - // add a policy - update_retention_policy_cli(RetentionPolicyArgs { + // add retention policy + let args = UpdateIndexArgs { + client_args: test_env.default_client_args(), index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: false, - period: Some(String::from("1 week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap(); + index_config_uri: test_env.resource_files.index_config_with_retention.clone(), + assume_yes: true, + }; + update_index_cli(args).await.unwrap(); let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_id(), test_env.index_id); assert_eq!( index_metadata.index_config.retention_policy_opt, Some(RetentionPolicy { @@ -566,34 +540,16 @@ async fn test_cmd_update_index() { }) ); - // invalid args - update_retention_policy_cli(RetentionPolicyArgs { - index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: 
test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: Some(String::from("a week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap_err(); - - // remove the policy - update_retention_policy_cli(RetentionPolicyArgs { + // remove retention policy + let args = UpdateIndexArgs { + client_args: test_env.default_client_args(), index_id, - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: None, - schedule: None, - }) - .await - .unwrap(); + index_config_uri: test_env.resource_files.index_config.clone(), + assume_yes: true, + }; + update_index_cli(args).await.unwrap(); let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_id(), test_env.index_id); assert_eq!(index_metadata.index_config.retention_policy_opt, None); } @@ -620,10 +576,7 @@ async fn test_delete_index_cli_dry_run() { }; let create_delete_args = |dry_run| DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), dry_run, assume_yes: true, @@ -647,7 +600,7 @@ async fn test_delete_index_cli_dry_run() { .unwrap(); assert!(metastore.index_exists(&index_id).await.unwrap()); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -673,15 +626,12 @@ async fn test_delete_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: 
test_env.default_client_args(), index_id: index_id.clone(), assume_yes: true, dry_run: false, @@ -702,7 +652,7 @@ async fn test_garbage_collect_cli_no_grace() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -724,7 +674,7 @@ async fn test_garbage_collect_cli_no_grace() { }; let create_gc_args = |dry_run| GarbageCollectIndexArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), grace_period: Duration::from_secs(3600), dry_run, @@ -792,10 +742,7 @@ async fn test_garbage_collect_cli_no_grace() { ); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id, dry_run: false, assume_yes: true, @@ -815,7 +762,7 @@ async fn test_garbage_collect_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -832,7 +779,7 @@ async fn test_garbage_collect_index_cli() { }; let create_gc_args = |grace_period_secs| GarbageCollectIndexArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), grace_period: Duration::from_secs(grace_period_secs), dry_run: false, @@ -967,7 +914,7 @@ async fn test_all_local_index() { .unwrap(); assert!(metadata_file_exists); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + 
local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -997,10 +944,7 @@ async fn test_all_local_index() { assert_eq!(search_stream_response, "72057597000000\n72057608000000\n"); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id, dry_run: false, assume_yes: true, @@ -1028,7 +972,7 @@ async fn test_all_with_s3_localstack_cli() { let s3_path = upload_test_file( test_env.storage_resolver.clone(), - test_env.resource_files["logs"].clone(), + test_env.resource_files.log_docs.clone(), "quickwit-integration-tests", "sources/", &append_random_suffix("test-all--cli-s3-localstack"), @@ -1039,10 +983,7 @@ async fn test_all_with_s3_localstack_cli() { // Cli search let args = SearchIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), query: "level:info".to_string(), aggregation: None, @@ -1072,10 +1013,7 @@ async fn test_all_with_s3_localstack_cli() { assert_eq!(result["num_hits"], Value::Number(Number::from(2i64))); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), dry_run: false, assume_yes: true, diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 1ff7fb8234e..034b0388fc2 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . 
use std::borrow::Borrow; -use std::collections::HashMap; use std::fs; use std::path::PathBuf; use std::str::FromStr; @@ -27,6 +26,7 @@ use std::sync::Arc; use anyhow::Context; use predicates::str; use quickwit_cli::service::RunCliCommand; +use quickwit_cli::ClientArgs; use quickwit_common::net::find_available_tcp_port; use quickwit_common::test_utils::wait_for_server_ready; use quickwit_common::uri::Uri; @@ -81,6 +81,14 @@ const DEFAULT_INDEX_CONFIG: &str = r#" default_search_fields: [event] "#; +const RETENTION_CONFIG: &str = r#" + + retention: + period: 1 week + schedule: daily + +"#; + const DEFAULT_QUICKWIT_CONFIG: &str = r#" version: 0.8 metastore_uri: #metastore_uri @@ -103,6 +111,15 @@ const WIKI_JSON_DOCS: &str = r#"{"body": "foo", "title": "shimroy", "url": "http {"body": "biz", "title": "modern", "url": "https://wiki.com?id=13"} "#; +pub struct TestResourceFiles { + pub config: Uri, + pub index_config: Uri, + pub index_config_without_uri: Uri, + pub index_config_with_retention: Uri, + pub log_docs: PathBuf, + pub wikipedia_docs: PathBuf, +} + /// A struct to hold few info about the test environment. pub struct TestEnv { /// The temporary directory of the test. @@ -112,14 +129,14 @@ pub struct TestEnv { /// Path of the directory where indexes are stored. pub indexes_dir_path: PathBuf, /// Resource files needed for the test. - pub resource_files: HashMap<&'static str, PathBuf>, + pub resource_files: TestResourceFiles, /// The metastore URI. pub metastore_uri: Uri, pub metastore_resolver: MetastoreResolver, pub metastore: MetastoreServiceClient, - pub config_uri: Uri, + pub cluster_endpoint: Url, - pub index_config_uri: Uri, + /// The index ID. 
pub index_id: IndexId, pub index_uri: Uri, @@ -137,12 +154,6 @@ impl TestEnv { .unwrap() } - pub fn index_config_without_uri(&self) -> String { - self.resource_files["index_config_without_uri"] - .display() - .to_string() - } - pub async fn index_metadata(&self) -> anyhow::Result { let index_metadata = self .metastore() @@ -155,7 +166,7 @@ impl TestEnv { pub async fn start_server(&self) -> anyhow::Result<()> { let run_command = RunCliCommand { - config_uri: self.config_uri.clone(), + config_uri: self.resource_files.config.clone(), services: Some(QuickwitService::supported_services()), }; tokio::spawn(async move { @@ -169,6 +180,13 @@ impl TestEnv { wait_for_server_ready(([127, 0, 0, 1], self.rest_listen_port).into()).await?; Ok(()) } + + pub fn default_client_args(&self) -> ClientArgs { + ClientArgs { + cluster_endpoint: self.cluster_endpoint.clone(), + ..Default::default() + } + } } pub enum TestStorageType { @@ -176,6 +194,10 @@ pub enum TestStorageType { LocalFileSystem, } +fn uri_from_path(path: PathBuf) -> Uri { + Uri::from_str(&format!("file://{}", path.display())).unwrap() +} + /// Creates all necessary artifacts in a test environment. 
pub async fn create_test_env( index_id: IndexId, @@ -216,6 +238,14 @@ pub async fn create_test_env( .replace("#index_id", &index_id) .replace("index_uri: #index_uri\n", ""), )?; + let index_config_with_retention_path = + resources_dir_path.join("index_config_with_retention.yaml"); + fs::write( + &index_config_with_retention_path, + format!("{DEFAULT_INDEX_CONFIG}{RETENTION_CONFIG}") + .replace("#index_id", &index_id) + .replace("#index_uri", index_uri.as_str()), + )?; let node_config_path = resources_dir_path.join("config.yaml"); let rest_listen_port = find_available_tcp_port()?; let grpc_listen_port = find_available_tcp_port()?; @@ -233,23 +263,18 @@ pub async fn create_test_env( let wikipedia_docs_path = resources_dir_path.join("wikis.json"); fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?; - let mut resource_files = HashMap::new(); - resource_files.insert("config", node_config_path); - resource_files.insert("index_config", index_config_path); - resource_files.insert("index_config_without_uri", index_config_without_uri_path); - resource_files.insert("logs", log_docs_path); - resource_files.insert("wiki", wikipedia_docs_path); - - let config_uri = - Uri::from_str(&format!("file://{}", resource_files["config"].display())).unwrap(); - let index_config_uri = Uri::from_str(&format!( - "file://{}", - resource_files["index_config"].display() - )) - .unwrap(); let cluster_endpoint = Url::parse(&format!("http://localhost:{rest_listen_port}")) .context("failed to parse cluster endpoint")?; + let resource_files = TestResourceFiles { + config: uri_from_path(node_config_path), + index_config: uri_from_path(index_config_path), + index_config_without_uri: uri_from_path(index_config_without_uri_path), + index_config_with_retention: uri_from_path(index_config_with_retention_path), + log_docs: log_docs_path, + wikipedia_docs: wikipedia_docs_path, + }; + Ok(TestEnv { _temp_dir: temp_dir, data_dir_path, @@ -258,9 +283,7 @@ pub async fn create_test_env( metastore_uri, 
metastore_resolver, metastore, - config_uri, cluster_endpoint, - index_config_uri, index_id, index_uri, rest_listen_port, diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 6e8b5332f1b..9fa0eee0ec9 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -39,7 +39,7 @@ use quickwit_common::Progress; use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig}; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; -use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt}; +use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadataResponseExt}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, @@ -582,6 +582,15 @@ impl Handler for ControlPlane { return convert_metastore_error(metastore_error); } }; + let index_metadata = match response.deserialize_index_metadata() { + Ok(index_metadata) => index_metadata, + Err(serde_error) => { + error!(error=?serde_error, "failed to deserialize index metadata"); + return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error))); + } + }; + self.model + .update_index_config(&index_uid, index_metadata.index_config)?; // TODO: Handle doc mapping and/or indexing settings update here. 
info!(%index_uid, "updated index"); Ok(Ok(response)) diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 229f9275558..2a4fc5c9616 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -29,7 +29,7 @@ use anyhow::bail; use fnv::{FnvHashMap, FnvHashSet}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::Progress; -use quickwit_config::SourceConfig; +use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_ingest::ShardInfos; use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt}; use quickwit_proto::control_plane::ControlPlaneResult; @@ -194,6 +194,21 @@ impl ControlPlaneModel { self.update_metrics(); } + /// Update the configuration of the specified index, returning an error if + /// the index didn't exist. + pub(crate) fn update_index_config( + &mut self, + index_uid: &IndexUid, + index_config: IndexConfig, + ) -> anyhow::Result<()> { + let Some(index_model) = self.index_table.get_mut(index_uid) else { + bail!("index `{}` not found", index_uid.index_id); + }; + index_model.index_config = index_config; + self.update_metrics(); + Ok(()) + } + pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { self.index_table.remove(index_uid); self.index_uid_table.remove(&index_uid.index_id); @@ -529,6 +544,27 @@ mod tests { assert_eq!(model.shard_table.get_shards(&source_uid).unwrap().len(), 0); } + #[test] + fn test_control_plane_model_update_index_config() { + let mut model = ControlPlaneModel::default(); + let index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes"); + let index_uid = index_metadata.index_uid.clone(); + model.add_index(index_metadata.clone()); + + // Update the index config + let mut index_config = index_metadata.index_config.clone(); + index_config.search_settings.default_search_fields = vec!["myfield".to_string()]; + model + .update_index_config(&index_uid, 
index_config.clone()) + .unwrap(); + + assert_eq!(model.index_table.len(), 1); + assert_eq!( + model.index_table.get(&index_uid).unwrap().index_config, + index_config + ); + } + #[test] fn test_control_plane_model_delete_index() { let mut model = ControlPlaneModel::default(); diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs index 5ddc8a034ec..d9bfdbac5eb 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -21,9 +21,8 @@ use std::collections::HashSet; use std::time::Duration; use quickwit_config::service::QuickwitService; -use quickwit_config::SearchSettings; use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use quickwit_serve::SearchRequestQueryString; use serde_json::json; use crate::ingest_json; @@ -121,12 +120,21 @@ async fn test_update_on_multi_nodes_cluster() { .indexes() .update( "my-updatable-index", - IndexUpdates { - search_settings: SearchSettings { - default_search_fields: vec!["title".to_string(), "body".to_string()], - }, - retention_policy_opt: None, - }, + r#" + version: 0.8 + index_id: my-updatable-index + doc_mapping: + field_mappings: + - name: title + type: text + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + search_settings: + default_search_fields: [title, body] + "#, + quickwit_config::ConfigFormat::Yaml, ) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 567ae67918f..638c450eb4b 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,9 @@ use std::ops::Bound; use 
itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID, +}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, @@ -223,6 +225,11 @@ impl FileBackedIndex { self.metadata.set_search_settings(search_settings) } + /// Replaces the indexing settings in the index config, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + self.metadata.set_indexing_settings(indexing_settings) + } + /// Stages a single split. /// /// If a split already exists and is in the [SplitState::Staged] state, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 244033f8177..57f2091f7af 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -504,12 +504,14 @@ impl MetastoreService for FileBackedMetastore { ) -> MetastoreResult { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; + let indexing_settings = request.deserialize_indexing_settings()?; let index_uid = request.index_uid(); let index_metadata = self .mutate(index_uid, |index| { let mut mutation_occurred = index.set_retention_policy(retention_policy_opt); mutation_occurred |= index.set_search_settings(search_settings); + mutation_occurred |= index.set_indexing_settings(indexing_settings); let index_metadata = index.metadata().clone(); diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs
b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index 36db32f24e7..0ec8e750aa8 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -24,7 +24,8 @@ use std::collections::{BTreeMap, HashMap}; use quickwit_common::uri::Uri; use quickwit_config::{ - IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, TestableForRegression, + IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, + TestableForRegression, }; use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult}; use quickwit_proto::types::{IndexUid, Position, SourceId}; @@ -111,7 +112,7 @@ impl IndexMetadata { } } - /// Replaces or removes the current search settings, returning whether a mutation occurred. + /// Replaces the current search settings, returning whether a mutation occurred. pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { if self.index_config.search_settings != search_settings { self.index_config.search_settings = search_settings; @@ -121,6 +122,16 @@ impl IndexMetadata { } } + /// Replaces the current indexing settings, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + if self.index_config.indexing_settings != indexing_settings { + self.index_config.indexing_settings = indexing_settings; + true + } else { + false + } + } + /// Adds a source to the index. Returns an error if the source already exists. 
pub fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> { match self.sources.entry(source_config.source_id.clone()) { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3ca762f6068..7378faa94fc 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -32,7 +32,9 @@ use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; use quickwit_common::thread_pool::run_cpu_intensive; -use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; +use quickwit_config::{ + IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, +}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, @@ -188,6 +190,7 @@ pub trait UpdateIndexRequestExt { index_uid: impl Into, search_settings: &SearchSettings, retention_policy_opt: &Option, + indexing_settings: &IndexingSettings, ) -> MetastoreResult; /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a @@ -197,6 +200,10 @@ pub trait UpdateIndexRequestExt { /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a /// [`RetentionPolicy`] object. fn deserialize_retention_policy(&self) -> MetastoreResult>; + + /// Deserializes the `indexing_settings_json` field of an [`UpdateIndexRequest`] into a + /// [`IndexingSettings`] object. 
+ fn deserialize_indexing_settings(&self) -> MetastoreResult; } impl UpdateIndexRequestExt for UpdateIndexRequest { @@ -204,17 +211,20 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { index_uid: impl Into, search_settings: &SearchSettings, retention_policy_opt: &Option, + indexing_settings: &IndexingSettings, ) -> MetastoreResult { - let search_settings_json = serde_utils::to_json_str(&search_settings)?; + let search_settings_json = serde_utils::to_json_str(search_settings)?; let retention_policy_json = retention_policy_opt .as_ref() .map(serde_utils::to_json_str) .transpose()?; + let indexing_settings_json = serde_utils::to_json_str(indexing_settings)?; let update_request = UpdateIndexRequest { index_uid: Some(index_uid.into()), search_settings_json, retention_policy_json, + indexing_settings_json, }; Ok(update_request) } @@ -229,6 +239,10 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { .map(|policy| serde_utils::from_json_str(policy)) .transpose() } + + fn deserialize_indexing_settings(&self) -> MetastoreResult { + serde_utils::from_json_str(&self.indexing_settings_json) + } } /// Helper trait to build a [`IndexMetadataResponse`] and deserialize its payload. 
diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c6fcfb45ada..6efeeec02be 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -398,12 +398,14 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; + let indexing_settings = request.deserialize_indexing_settings()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_index_metadata = run_with_tx!(self.connection_pool, tx, { mutate_index_metadata::(tx, index_uid, |index_metadata| { let mut mutation_occurred = index_metadata.set_retention_policy(retention_policy_opt); mutation_occurred |= index_metadata.set_search_settings(search_settings); + mutation_occurred |= index_metadata.set_indexing_settings(indexing_settings); Ok(MutationOccurred::from(mutation_occurred)) }) .await diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 48dbcd14655..84aef4e1846 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -148,6 +148,7 @@ pub async fn test_metastore_update_index< index_uid.clone(), &new_search_setting, &loop_retention_policy_opt, + &index_metadata.index_config.indexing_settings, ) .unwrap(); let response_metadata = metastore diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index ac4e35b0817..084dccad417 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -215,6 +215,7 @@ message UpdateIndexRequest { quickwit.common.IndexUid index_uid = 1; string search_settings_json = 2; optional string 
retention_policy_json = 3; + string indexing_settings_json = 4; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index d3537e02799..76abf454b70 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -30,6 +30,8 @@ pub struct UpdateIndexRequest { pub search_settings_json: ::prost::alloc::string::String, #[prost(string, optional, tag = "3")] pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, tag = "4")] + pub indexing_settings_json: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index fe2f796a38c..ccbfc5f3166 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,9 +26,7 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ - IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, -}; +use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -357,13 +355,22 @@ impl<'a> IndexClient<'a> { pub async fn update( &self, index_id: &str, - index_updates: IndexUpdates, + index_config: impl ToString, + config_format: ConfigFormat, ) -> Result { - let body = Bytes::from(serde_json::to_string(&index_updates)?); + let header_map = 
header_from_config_format(config_format); + let body = Bytes::from(index_config.to_string()); let path = format!("indexes/{index_id}"); let response = self .transport - .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .send::<()>( + Method::PUT, + &path, + Some(header_map), + None, + Some(body), + self.timeout, + ) .await?; let index_metadata = response.deserialize().await?; Ok(index_metadata) diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index 64d58cddb94..a11878ac264 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,6 +20,6 @@ mod rest_handler; pub use self::rest_handler::{ - get_index_metadata_handler, index_management_handlers, IndexApi, IndexUpdates, - ListSplitsQueryParams, ListSplitsResponse, + get_index_metadata_handler, index_management_handlers, IndexApi, ListSplitsQueryParams, + ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index f5a783901d4..f5029fd31f9 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -19,12 +19,12 @@ use std::sync::Arc; +use anyhow::Context; use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, - INGEST_API_SOURCE_ID, + SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; @@ -67,7 +67,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) )] pub 
struct IndexApi; @@ -527,22 +527,14 @@ async fn create_index( .await } -/// The body of the index update request. All fields will be replaced in the -/// existing configuration. -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility -pub struct IndexUpdates { - pub search_settings: SearchSettings, - #[serde(rename = "retention_policy")] - pub retention_policy_opt: Option, -} - fn update_index_handler( metastore: MetastoreServiceClient, ) -> impl Filter + Clone { warp::path!("indexes" / String) .and(warp::put()) - .and(json_body()) + .and(extract_config_format()) + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::filters::body::bytes()) .and(with_arg(metastore)) .then(update_index) .map(log_failure("failed to update index")) @@ -554,9 +546,9 @@ fn update_index_handler( put, tag = "Indexes", path = "/indexes/{index_id}", - request_body = IndexUpdates, + request_body = VersionedIndexConfig, responses( - (status = 200, description = "Successfully updated the index configuration.") + (status = 200, description = "Successfully updated the index configuration.", body = VersionedIndexMetadata) ), params( ("index_id" = String, Path, description = "The index ID to update."), @@ -564,26 +556,64 @@ fn update_index_handler( )] /// Updates an existing index. /// -/// This endpoint has PUT semantics, which means that all the updatable fields of the index -/// configuration are replaced by the values specified in the request. In particular, omitting an -/// optional field like `retention_policy` will delete the associated configuration. +/// This endpoint follows PUT semantics, which means that all the fields of the +/// current configuration are replaced by the values specified in this request +/// or the associated defaults. 
In particular, if the field is optional (e.g. +/// `retention_policy`), omitting it will delete the associated configuration. +/// If the new configuration file contains updates that cannot be applied, the +/// request fails and none of the updates are applied. async fn update_index( index_id: IndexId, - request: IndexUpdates, + config_format: ConfigFormat, + index_config_bytes: Bytes, mut metastore: MetastoreServiceClient, ) -> Result { info!(index_id = %index_id, "update-index"); + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); - let index_uid: IndexUid = metastore + let current_index_metadata = metastore .index_metadata(index_metadata_request) .await? - .deserialize_index_metadata()? - .index_uid; + .deserialize_index_metadata()?; + let index_uid = current_index_metadata.index_uid.clone(); + let current_index_config = current_index_metadata.into_index_config(); + + let new_index_config = quickwit_config::load_index_config_from_user_config( + config_format, + &index_config_bytes, + &current_index_config + .index_uri + .parent() + .context("Unexpected index_uri format on current configuration") + .map_err(IndexServiceError::InvalidConfig)?, + ) + .map_err(IndexServiceError::InvalidConfig)?; + + if new_index_config.index_id != index_id { + return Err(IndexServiceError::OperationNotAllowed(format!( + "index_id in config file {} does not match updated index_id {}", + new_index_config.index_id, index_id + ))); + } + + if current_index_config.index_uri != new_index_config.index_uri { + return Err(IndexServiceError::OperationNotAllowed(format!( + "index_uri cannot be updated, current value {}, new expected value {}", + current_index_config.index_uri, new_index_config.index_uri + ))); + } + + if current_index_config.doc_mapping != new_index_config.doc_mapping { + return Err(IndexServiceError::OperationNotAllowed( + "doc_mapping cannot be updated".to_string(), + )); + } let update_request =
UpdateIndexRequest::try_from_updates( index_uid, - &request.search_settings, - &request.retention_policy_opt, + &new_index_config.search_settings, + &new_index_config.retention_policy_opt, + &new_index_config.indexing_settings, )?; let update_resp = metastore.update_index(update_request).await?; Ok(update_resp.deserialize_index_metadata()?) @@ -1811,7 +1841,7 @@ mod tests { .path("/indexes/hdfs-logs") .method("PUT") .json(&true) - .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["severity_text", "body"]}}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 22c6730fc43..0eed328d286 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -120,7 +120,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)]