Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize and batch merge pipelines list splits queries #4968

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ docker-compose-up:
COMPOSE_PROFILES=$(DOCKER_SERVICES) docker compose -f docker-compose.yml up -d --remove-orphans --wait

docker-compose-down:
docker compose -f docker-compose.yml down --remove-orphans
docker compose -p quickwit down --remove-orphans

docker-compose-logs:
docker compose logs -f -t
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# first if they are already tagged latest and volumes if their content is
# incompatible with the latest version, as in case of postgres.

name: quickwit

networks:
default:
name: quickwit-network
Expand Down
15 changes: 8 additions & 7 deletions quickwit/quickwit-cli/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
Expand Down Expand Up @@ -200,7 +201,7 @@ pub fn build_index_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct ClearIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub assume_yes: bool,
}

Expand All @@ -215,13 +216,13 @@ pub struct CreateIndexArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct DescribeIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct IngestDocsArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub batch_size_limit_opt: Option<ByteSize>,
pub commit_type: CommitType,
Expand All @@ -230,7 +231,7 @@ pub struct IngestDocsArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct SearchIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub query: String,
pub aggregation: Option<String>,
pub max_hits: usize,
Expand All @@ -245,7 +246,7 @@ pub struct SearchIndexArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct DeleteIndexArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub dry_run: bool,
pub assume_yes: bool,
}
Expand Down Expand Up @@ -528,7 +529,7 @@ where I: IntoIterator<Item = IndexConfig> {
#[derive(Tabled)]
struct IndexRow {
#[tabled(rename = "Index ID")]
index_id: String,
index_id: IndexId,
#[tabled(rename = "Index URI")]
index_uri: Uri,
}
Expand All @@ -548,7 +549,7 @@ pub async fn describe_index_cli(args: DescribeIndexArgs) -> anyhow::Result<()> {
}

pub struct IndexStats {
pub index_id: String,
pub index_id: IndexId,
pub index_uri: Uri,
pub num_published_splits: usize,
pub size_published_splits: ByteSize,
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/index/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use colored::Colorize;
use quickwit_config::{RetentionPolicy, SearchSettings};
use quickwit_proto::types::IndexId;
use quickwit_serve::IndexUpdates;
use tracing::debug;

Expand Down Expand Up @@ -65,7 +66,7 @@ pub fn build_index_update_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct RetentionPolicyArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub disable: bool,
pub period: Option<String>,
pub schedule: Option<String>,
Expand All @@ -74,7 +75,7 @@ pub struct RetentionPolicyArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct SearchSettingsArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub default_search_fields: Vec<String>,
}

Expand Down
23 changes: 12 additions & 11 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_config::{validate_identifier, ConfigFormat, SourceConfig};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_proto::types::{IndexId, SourceId};
use quickwit_storage::{load_file, StorageResolver};
use serde_json::Value as JsonValue;
use tabled::{Table, Tabled};
Expand Down Expand Up @@ -142,44 +143,44 @@ pub fn build_source_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct CreateSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub source_config_uri: Uri,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ToggleSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub enable: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DeleteSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub assume_yes: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DescribeSourceArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ListSourcesArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ResetCheckpointArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
pub assume_yes: bool,
}

Expand Down Expand Up @@ -469,7 +470,7 @@ where I: IntoIterator<Item = SourceConfig> {
#[derive(Tabled)]
struct SourceRow {
#[tabled(rename = "ID")]
source_id: String,
source_id: SourceId,
#[tabled(rename = "Type")]
source_type: String,
#[tabled(rename = "Enabled")]
Expand Down
11 changes: 6 additions & 5 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use clap::{arg, ArgMatches, Command};
use colored::Colorize;
use itertools::Itertools;
use quickwit_metastore::{Split, SplitState};
use quickwit_proto::types::{IndexId, SplitId};
use quickwit_serve::ListSplitsQueryParams;
use tabled::{Table, Tabled};
use time::{format_description, Date, OffsetDateTime, PrimitiveDateTime};
Expand Down Expand Up @@ -133,7 +134,7 @@ impl FromStr for OutputFormat {
#[derive(Debug, PartialEq)]
pub struct ListSplitArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub offset: Option<usize>,
pub limit: Option<usize>,
pub split_states: Option<Vec<SplitState>>,
Expand All @@ -147,16 +148,16 @@ pub struct ListSplitArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct MarkForDeletionArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub index_id: IndexId,
pub split_ids: Vec<String>,
pub assume_yes: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DescribeSplitArgs {
pub client_args: ClientArgs,
pub index_id: String,
pub split_id: String,
pub index_id: IndexId,
pub split_id: SplitId,
pub verbose: bool,
}

Expand Down Expand Up @@ -470,7 +471,7 @@ fn parse_split_state(split_state_arg: &str) -> anyhow::Result<SplitState> {
#[derive(Tabled)]
struct SplitRow {
#[tabled(rename = "ID")]
split_id: String,
split_id: SplitId,
#[tabled(rename = "State")]
split_state: SplitState,
#[tabled(rename = "Num docs")]
Expand Down
27 changes: 12 additions & 15 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use quickwit_config::{
VecSourceParams, CLI_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{
IndexingService, MergePipeline, MergePipelineId, MergeSchedulerService,
};
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
Expand All @@ -52,7 +50,7 @@ use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::{NodeId, PipelineUid};
use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId};
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
Expand Down Expand Up @@ -174,7 +172,7 @@ pub fn build_tool_command() -> Command {
#[derive(Debug, Eq, PartialEq)]
pub struct LocalIngestDocsArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub input_format: SourceInputFormat,
pub overwrite: bool,
Expand All @@ -185,7 +183,7 @@ pub struct LocalIngestDocsArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct LocalSearchArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub query: String,
pub aggregation: Option<String>,
pub max_hits: usize,
Expand All @@ -200,23 +198,23 @@ pub struct LocalSearchArgs {
#[derive(Debug, Eq, PartialEq)]
pub struct GarbageCollectIndexArgs {
pub config_uri: Uri,
pub index_id: String,
pub index_id: IndexId,
pub grace_period: Duration,
pub dry_run: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct MergeArgs {
pub config_uri: Uri,
pub index_id: String,
pub source_id: String,
pub index_id: IndexId,
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ExtractSplitArgs {
pub config_uri: Uri,
pub index_id: String,
pub split_id: String,
pub index_id: IndexId,
pub split_id: SplitId,
pub target_dir: PathBuf,
}

Expand Down Expand Up @@ -479,7 +477,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.await?;
let merge_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: MergePipelineId::from(&pipeline_id),
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;
let indexing_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -618,7 +616,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: MergePipelineId::from(&pipeline_id),
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;

Expand Down Expand Up @@ -931,9 +929,8 @@ impl ThroughputCalculator {
}

async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
let node_id: NodeId = config.node_id.clone().into();
let self_node = ClusterMember {
node_id,
node_id: config.node_id.clone(),
generation_id: quickwit_cluster::GenerationId::now(),
is_ready: false,
enabled_services: HashSet::new(),
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, MetastoreResolver};
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::IndexId;
use quickwit_storage::{Storage, StorageResolver};
use reqwest::Url;
use tempfile::{tempdir, TempDir};
Expand Down Expand Up @@ -120,7 +121,7 @@ pub struct TestEnv {
pub cluster_endpoint: Url,
pub index_config_uri: Uri,
/// The index ID.
pub index_id: String,
pub index_id: IndexId,
pub index_uri: Uri,
pub rest_listen_port: u16,
pub storage_resolver: StorageResolver,
Expand Down Expand Up @@ -177,7 +178,7 @@ pub enum TestStorageType {

/// Creates all necessary artifacts in a test environment.
pub async fn create_test_env(
index_id: String,
index_id: IndexId,
storage_type: TestStorageType,
) -> anyhow::Result<TestEnv> {
let temp_dir = tempdir()?;
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use quickwit_common::metrics::IntCounter;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

#[cfg(any(test, feature = "testsuite"))]
Expand Down Expand Up @@ -129,7 +128,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
let peer_seed_addrs = node_config.peer_seed_addrs().await?;
let indexing_tasks = Vec::new();

let node_id: NodeId = node_config.node_id.clone().into();
let node_id = node_config.node_id.clone();
let generation_id = GenerationId::now();
let is_ready = false;
let indexing_cpu_capacity = if node_config.is_service_enabled(QuickwitService::Indexer) {
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use anyhow::Context;
use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
use tracing::info;

Expand Down Expand Up @@ -127,7 +128,8 @@ impl TryFrom<VersionedIndexConfig> for IndexConfig {
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct IndexConfigV0_8 {
pub index_id: String,
#[schema(value_type = String)]
pub index_id: IndexId,
#[schema(value_type = String)]
#[serde(default)]
pub index_uri: Option<Uri>,
Expand Down
Loading
Loading