Skip to content

Commit

Permalink
Merge branch 'main' into break_pipelines3
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Nov 1, 2023
2 parents ac972ed + a6eeba9 commit 6918fc3
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 13 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ http = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true, optional = true }
once_cell = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl ControlPlane {
let indexing_scheduler = IndexingScheduler::new(
cluster_id.clone(),
self_node_id.clone(),
metastore.clone(),
indexer_pool.clone(),
);
let ingest_controller =
Expand Down Expand Up @@ -121,6 +120,7 @@ impl Actor for ControlPlane {
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
crate::metrics::CONTROL_PLANE_METRICS.restart_total.inc();
self.model
.load_from_metastore(&mut self.metastore, ctx.progress())
.await
Expand Down Expand Up @@ -190,11 +190,17 @@ fn convert_metastore_error<T>(
// It will be up to the client to decide what to do there.
error!(err=?metastore_error, transaction_outcome="aborted", "metastore error");
}
crate::metrics::CONTROL_PLANE_METRICS
.metastore_error_aborted
.inc();
Ok(Err(ControlPlaneError::Metastore(metastore_error)))
} else {
// If the metastore transaction may have been executed, we need to restart the control plane
// so that it gets resynced with the metastore state.
error!(err=?metastore_error, transaction_outcome="maybe-executed", "metastore error");
crate::metrics::CONTROL_PLANE_METRICS
.metastore_error_maybe_executed
.inc();
Err(ActorExitStatus::from(anyhow::anyhow!(metastore_error)))
}
}
Expand Down
15 changes: 4 additions & 11 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::{Duration, Instant};
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use quickwit_proto::metastore::{MetastoreServiceClient, SourceType};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
Expand Down Expand Up @@ -99,7 +99,6 @@ pub struct IndexingSchedulerState {
pub struct IndexingScheduler {
cluster_id: String,
self_node_id: NodeId,
metastore: MetastoreServiceClient,
indexer_pool: IndexerPool,
state: IndexingSchedulerState,
}
Expand All @@ -109,7 +108,6 @@ impl fmt::Debug for IndexingScheduler {
f.debug_struct("IndexingScheduler")
.field("cluster_id", &self.cluster_id)
.field("node_id", &self.self_node_id)
.field("metastore", &self.metastore)
.field(
"last_applied_plan_ts",
&self.state.last_applied_plan_timestamp,
Expand Down Expand Up @@ -170,16 +168,10 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
}

impl IndexingScheduler {
pub fn new(
cluster_id: String,
self_node_id: NodeId,
metastore: MetastoreServiceClient,
indexer_pool: IndexerPool,
) -> Self {
Self {
pub fn new(cluster_id: String, self_node_id: NodeId, indexer_pool: IndexerPool) -> Self {
IndexingScheduler {
cluster_id,
self_node_id,
metastore,
indexer_pool,
state: IndexingSchedulerState::default(),
}
Expand All @@ -192,6 +184,7 @@ impl IndexingScheduler {
// Should be called whenever a change in the list of index/shard
// has happened.
pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let mut indexers = self.get_indexers_from_indexer_pool();
if indexers.is_empty() {
warn!("No indexer available, cannot schedule an indexing plan.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl IngestController {
ingester_pool: IngesterPool,
replication_factor: usize,
) -> Self {
Self {
IngestController {
metastore,
ingester_pool,
replication_factor,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) mod control_plane_model;
pub mod indexing_plan;
pub mod indexing_scheduler;
pub mod ingest;
pub(crate) mod metrics;

use quickwit_common::tower::Pool;
use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
Expand Down
60 changes: 60 additions & 0 deletions quickwit/quickwit-control-plane/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, IntCounter};

pub struct ControlPlaneMetrics {
pub restart_total: IntCounter,
pub schedule_total: IntCounter,
pub metastore_error_aborted: IntCounter,
pub metastore_error_maybe_executed: IntCounter,
}

impl Default for ControlPlaneMetrics {
fn default() -> Self {
ControlPlaneMetrics {
restart_total: new_counter(
"restart_total",
"Number of control plane restart.",
"quickwit_control_plane",
),
schedule_total: new_counter(
"schedule_total",
"Number of control plane `schedule` operation.",
"quickwit_control_plane",
),
metastore_error_aborted: new_counter(
"metastore_error_aborted",
"Number of aborted metastore transaction (= do not trigger a control plane \
restart)",
"quickwit_control_plane",
),
metastore_error_maybe_executed: new_counter(
"metastore_error_maybe_executed",
"Number of metastore transaction with an uncertain outcome (= do trigger a \
control plane restart)",
"quickwit_control_plane",
),
}
}
}

pub static CONTROL_PLANE_METRICS: Lazy<ControlPlaneMetrics> =
Lazy::new(ControlPlaneMetrics::default);

0 comments on commit 6918fc3

Please sign in to comment.