diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 0bfd01dd4fd9..05561b6080ff 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -23,7 +23,7 @@ use smallvec::SmallVec;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use strum::IntoStaticStr;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
 
 use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
 use crate::cache::CacheManagerRef;
@@ -88,6 +88,9 @@ pub struct WriteBufferManagerImpl {
     memory_used: AtomicUsize,
     /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
     memory_active: AtomicUsize,
+    /// Optional notifier.
+    /// The manager can wake up the worker once we free the write buffer.
+    notifier: Option<watch::Sender<()>>,
 }
 
 impl WriteBufferManagerImpl {
@@ -98,9 +101,16 @@ impl WriteBufferManagerImpl {
             mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
             memory_used: AtomicUsize::new(0),
             memory_active: AtomicUsize::new(0),
+            notifier: None,
         }
     }
 
+    /// Attaches a notifier to the manager.
+    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
+        self.notifier = Some(notifier);
+        self
+    }
+
     /// Returns memory usage of mutable memtables.
     pub fn mutable_usage(&self) -> usize {
         self.memory_active.load(Ordering::Relaxed)
@@ -159,6 +169,12 @@ impl WriteBufferManager for WriteBufferManagerImpl {
 
     fn free_mem(&self, mem: usize) {
         self.memory_used.fetch_sub(mem, Ordering::Relaxed);
+        if let Some(notifier) = &self.notifier {
+            // Notifies the worker after the memory usage is decreased. When we drop the memtable
+            // outside of the worker, the worker may still stall requests because the memory usage
+            // is not updated. So we need to notify the worker to handle stalled requests again.
+            let _ = notifier.send(());
+        }
     }
 
     fn memory_usage(&self) -> usize {
@@ -786,6 +802,18 @@ mod tests {
         assert!(manager.should_flush_engine());
     }
 
+    #[test]
+    fn test_manager_notify() {
+        let (sender, receiver) = watch::channel(());
+        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
+        manager.reserve_mem(500);
+        assert!(!receiver.has_changed().unwrap());
+        manager.schedule_free_mem(500);
+        assert!(!receiver.has_changed().unwrap());
+        manager.free_mem(500);
+        assert!(receiver.has_changed().unwrap());
+    }
+
     #[tokio::test]
     async fn test_schedule_empty() {
         let env = SchedulerEnv::new().await;
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index b9ef03f1c1d7..963330948955 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -34,9 +34,13 @@ lazy_static! {
     /// Global memtable dictionary size in bytes.
     pub static ref MEMTABLE_DICT_BYTES: IntGauge =
         register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap();
-    /// Gauge for open regions
-    pub static ref REGION_COUNT: IntGauge =
-        register_int_gauge!("greptime_mito_region_count", "mito region count").unwrap();
+    /// Gauge for open regions in each worker.
+    pub static ref REGION_COUNT: IntGaugeVec =
+        register_int_gauge_vec!(
+            "greptime_mito_region_count",
+            "mito region count in each worker",
+            &[WORKER_LABEL],
+        ).unwrap();
     /// Elapsed time to handle requests.
     pub static ref HANDLE_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
         "greptime_mito_handle_request_elapsed",
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index c8aa1bb340fb..82b48bcebb29 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -51,7 +51,7 @@ use crate::config::MitoConfig;
 use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::memtable::MemtableBuilderProvider;
-use crate::metrics::WRITE_STALL_TOTAL;
+use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
 use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
 use crate::request::{
     BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -130,9 +130,11 @@ impl WorkerGroup {
         object_store_manager: ObjectStoreManagerRef,
         plugins: Plugins,
     ) -> Result<WorkerGroup> {
-        let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
-            config.global_write_buffer_size.as_bytes() as usize,
-        ));
+        let (flush_sender, flush_receiver) = watch::channel(());
+        let write_buffer_manager = Arc::new(
+            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
+                .with_notifier(flush_sender.clone()),
+        );
         let puffin_manager_factory = PuffinManagerFactory::new(
             &config.index.aux_path,
             config.index.staging_size.as_bytes(),
@@ -165,7 +167,6 @@ impl WorkerGroup {
                 .build(),
         );
         let time_provider = Arc::new(StdTimeProvider);
-        let (flush_sender, flush_receiver) = watch::channel(());
 
         let workers = (0..config.num_workers)
             .map(|id| {
@@ -265,10 +266,12 @@ impl WorkerGroup {
         listener: Option<crate::engine::listener::EventListenerRef>,
         time_provider: TimeProviderRef,
     ) -> Result<WorkerGroup> {
+        let (flush_sender, flush_receiver) = watch::channel(());
         let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
-            Arc::new(WriteBufferManagerImpl::new(
-                config.global_write_buffer_size.as_bytes() as usize,
-            ))
+            Arc::new(
+                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
+                    .with_notifier(flush_sender.clone()),
+            )
         });
         let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
         let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
@@ -297,7 +300,6 @@ impl WorkerGroup {
                 .write_cache(write_cache)
                 .build(),
         );
-        let (flush_sender, flush_receiver) = watch::channel(());
         let workers = (0..config.num_workers)
             .map(|id| {
                 WorkerStarter {
@@ -401,6 +403,7 @@ impl WorkerStarter {
         let running = Arc::new(AtomicBool::new(true));
 
         let now = self.time_provider.current_time_millis();
+        let id_string = self.id.to_string();
         let mut worker_thread = RegionWorkerLoop {
             id: self.id,
             config: self.config.clone(),
@@ -436,7 +439,8 @@ impl WorkerStarter {
             last_periodical_check_millis: now,
             flush_sender: self.flush_sender,
             flush_receiver: self.flush_receiver,
-            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
+            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
+            region_count: REGION_COUNT.with_label_values(&[&id_string]),
         };
         let handle = common_runtime::spawn_global(async move {
             worker_thread.run().await;
@@ -623,6 +627,8 @@ struct RegionWorkerLoop {
     flush_receiver: watch::Receiver<()>,
     /// Gauge of stalled request count.
     stalled_count: IntGauge,
+    /// Gauge of regions in the worker.
+    region_count: IntGauge,
 }
 
 impl RegionWorkerLoop {
diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs
index 26a6f9a34dde..8e33fcb1eb9d 100644
--- a/src/mito2/src/worker/handle_close.rs
+++ b/src/mito2/src/worker/handle_close.rs
@@ -19,7 +19,6 @@ use store_api::region_request::AffectedRows;
 use store_api::storage::RegionId;
 
 use crate::error::Result;
-use crate::metrics::REGION_COUNT;
 use crate::worker::RegionWorkerLoop;
 
 impl RegionWorkerLoop {
@@ -31,7 +30,7 @@ impl RegionWorkerLoop {
             return Ok(0);
         };
 
-        info!("Try to close region {}", region_id);
+        info!("Try to close region {}, worker: {}", region_id, self.id);
 
         region.stop().await;
         self.regions.remove_region(region_id);
@@ -40,9 +39,9 @@ impl RegionWorkerLoop {
         // Clean compaction status.
         self.compaction_scheduler.on_region_closed(region_id);
 
-        info!("Region {} closed", region_id);
+        info!("Region {} closed, worker: {}", region_id, self.id);
 
-        REGION_COUNT.dec();
+        self.region_count.dec();
 
         Ok(0)
     }
diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs
index e99c0a810237..863435c7eabe 100644
--- a/src/mito2/src/worker/handle_create.rs
+++ b/src/mito2/src/worker/handle_create.rs
@@ -24,7 +24,6 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest};
 use store_api::storage::RegionId;
 
 use crate::error::{InvalidMetadataSnafu, Result};
-use crate::metrics::REGION_COUNT;
 use crate::region::opener::{check_recovered_region, RegionOpener};
 use crate::worker::RegionWorkerLoop;
 
@@ -70,9 +69,13 @@ impl RegionWorkerLoop {
             .create_or_open(&self.config, &self.wal)
             .await?;
 
-        info!("A new region created, region: {:?}", region.metadata());
+        info!(
+            "A new region created, worker: {}, region: {:?}",
+            self.id,
+            region.metadata()
+        );
 
-        REGION_COUNT.inc();
+        self.region_count.inc();
 
         // Insert the MitoRegion into the RegionMap.
         self.regions.insert_region(Arc::new(region));
diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs
index ca1466249759..51b42acb406f 100644
--- a/src/mito2/src/worker/handle_drop.rs
+++ b/src/mito2/src/worker/handle_drop.rs
@@ -28,7 +28,6 @@ use store_api::storage::RegionId;
 use tokio::time::sleep;
 
 use crate::error::{OpenDalSnafu, Result};
-use crate::metrics::REGION_COUNT;
 use crate::region::{RegionMapRef, RegionState};
 use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
 
@@ -45,7 +44,7 @@ where
     ) -> Result<AffectedRows> {
         let region = self.regions.writable_region(region_id)?;
 
-        info!("Try to drop region: {}", region_id);
+        info!("Try to drop region: {}, worker: {}", region_id, self.id);
 
         // Marks the region as dropping.
         region.set_dropping()?;
@@ -93,7 +92,7 @@ where
             region_id
         );
 
-        REGION_COUNT.dec();
+        self.region_count.dec();
 
         // Detaches a background task to delete the region dir
         let region_dir = region.access_layer.region_dir().to_owned();
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index 7fe1d3c322a1..fa4f48704009 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -26,7 +26,6 @@ use store_api::storage::RegionId;
 use crate::error::{
     ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
 };
-use crate::metrics::REGION_COUNT;
 use crate::region::opener::RegionOpener;
 use crate::request::OptionOutputTx;
 use crate::wal::entry_distributor::WalEntryReceiver;
@@ -56,7 +55,10 @@ impl RegionWorkerLoop {
             .context(OpenDalSnafu)?
         {
             let result = remove_region_dir_once(&request.region_dir, object_store).await;
-            info!("Region {} is dropped, result: {:?}", region_id, result);
+            info!(
+                "Region {} is dropped, worker: {}, result: {:?}",
+                region_id, self.id, result
+            );
             return RegionNotFoundSnafu { region_id }.fail();
         }
 
@@ -84,7 +86,7 @@ impl RegionWorkerLoop {
             sender.send(Err(err));
             return;
         }
-        info!("Try to open region {}", region_id);
+        info!("Try to open region {}, worker: {}", region_id, self.id);
 
         // Open region from specific region dir.
         let opener = match RegionOpener::new(
@@ -112,12 +114,14 @@ impl RegionWorkerLoop {
         let wal = self.wal.clone();
         let config = self.config.clone();
         let opening_regions = self.opening_regions.clone();
+        let region_count = self.region_count.clone();
+        let worker_id = self.id;
         opening_regions.insert_sender(region_id, sender);
         common_runtime::spawn_global(async move {
             match opener.open(&config, &wal).await {
                 Ok(region) => {
-                    info!("Region {} is opened", region_id);
-                    REGION_COUNT.inc();
+                    info!("Region {} is opened, worker: {}", region_id, worker_id);
+                    region_count.inc();
 
                     // Insert the Region into the RegionMap.
                     regions.insert_region(Arc::new(region));
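
The core behavioral change above is the wake-up path: `WriteBufferManagerImpl::free_mem` now sends on a `tokio::sync::watch` channel, so a region worker that stalled writes while the write buffer was full can re-check memory usage even when the memtable is dropped outside that worker. Below is a minimal, self-contained sketch of that pattern, assuming only the `tokio` crate; `Buffer`, `worker_loop`, and the byte counts are illustrative names and values, not GreptimeDB code.

```rust
// Sketch only: a standalone model of the free-then-notify path, not GreptimeDB code.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;

struct Buffer {
    memory_used: AtomicUsize,
    notifier: Option<watch::Sender<()>>,
}

impl Buffer {
    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        // Notify after the usage is decreased; the send error only means
        // no worker is currently listening, so it is safe to ignore.
        if let Some(notifier) = &self.notifier {
            let _ = notifier.send(());
        }
    }
}

async fn worker_loop(mut flush_receiver: watch::Receiver<()>, buffer: Arc<Buffer>) {
    // A real worker would also poll its request channel; this shows only the
    // wake-up path used to retry stalled write requests.
    while flush_receiver.changed().await.is_ok() {
        let used = buffer.memory_used.load(Ordering::Relaxed);
        println!("write buffer freed, {used} bytes in use; retry stalled requests");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(());
    let buffer = Arc::new(Buffer {
        memory_used: AtomicUsize::new(500),
        notifier: Some(tx),
    });

    let worker = tokio::spawn(worker_loop(rx, buffer.clone()));

    // Freeing memory outside the worker wakes it up through the watch channel.
    buffer.free_mem(500);

    tokio::time::sleep(Duration::from_millis(50)).await;
    worker.abort();
}
```

A `watch` channel coalesces notifications: many `free_mem` calls while the worker is busy still wake it at most once per check, which suits a "retry stalled requests" loop that only cares about the latest memory state.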
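The metrics change follows the same shape as the existing `WRITE_STALL_TOTAL` counter: the global `REGION_COUNT` gauge becomes an `IntGaugeVec` keyed by a worker label, and each worker resolves its labeled child once at startup. Here is a small sketch of that idiom using the `prometheus` crate directly; the metric name, label, and values are illustrative, and it skips the `lazy_static!` registration used in the real code.

```rust
// Sketch only: turning a global gauge into a per-worker gauge via a label.
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

fn main() {
    let region_count: IntGaugeVec = register_int_gauge_vec!(
        "example_region_count",
        "regions tracked by each worker",
        &["worker"]
    )
    .unwrap();

    // Each worker resolves its labeled child once and stores the resulting
    // IntGauge, so the hot path never performs a label lookup.
    let worker_0: IntGauge = region_count.with_label_values(&["0"]);
    worker_0.inc();
    assert_eq!(worker_0.get(), 1);
}
```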