Skip to content

Commit

Permalink
perf(storage): allow dynamic enable/disable minitrace for tiered cache (
Browse files Browse the repository at this point in the history
#17570)

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 4, 2024
1 parent 936362e commit 7159f9a
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 79 deletions.
98 changes: 35 additions & 63 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.10.0", features = ["nightly"] }
foyer = { version = "0.10.0", features = ["nightly", "mtrace"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
Expand Down Expand Up @@ -178,10 +178,10 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
"profiling",
"stats",
], rev = "64a2d9" }
opentelemetry = "0.21"
opentelemetry-otlp = "0.14"
opentelemetry_sdk = { version = "0.21", default-features = false }
opentelemetry-semantic-conventions = "0.13"
opentelemetry = "0.23"
opentelemetry-otlp = "0.16"
opentelemetry_sdk = { version = "0.23", default-features = false }
opentelemetry-semantic-conventions = "0.15"
parking_lot = { version = "0.12", features = [
"arc_lock",
"deadlock_detection",
Expand All @@ -194,7 +194,7 @@ sea-orm = { version = "0.12.14", features = [
] }
sqlx = "0.7"
tokio-util = "0.7"
tracing-opentelemetry = "0.22"
tracing-opentelemetry = "0.24"
rand = { version = "0.8", features = ["small_rng"] }
risingwave_backup = { path = "./src/storage/backup" }
risingwave_batch = { path = "./src/batch" }
Expand Down
12 changes: 12 additions & 0 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,23 @@ message GetBackPressureResponse {
repeated BackPressureInfo back_pressure_infos = 1;
}

message TieredCacheTracingRequest {
bool enable = 1;
optional uint32 record_hybrid_insert_threshold_ms = 2;
optional uint32 record_hybrid_get_threshold_ms = 3;
optional uint32 record_hybrid_obtain_threshold_ms = 4;
optional uint32 record_hybrid_remove_threshold_ms = 5;
optional uint32 record_hybrid_fetch_threshold_ms = 6;
}

message TieredCacheTracingResponse {}

service MonitorService {
rpc StackTrace(StackTraceRequest) returns (StackTraceResponse);
rpc Profiling(ProfilingRequest) returns (ProfilingResponse);
rpc HeapProfiling(HeapProfilingRequest) returns (HeapProfilingResponse);
rpc ListHeapProfiling(ListHeapProfilingRequest) returns (ListHeapProfilingResponse);
rpc AnalyzeHeap(AnalyzeHeapRequest) returns (AnalyzeHeapResponse);
rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse);
rpc TieredCacheTracing(TieredCacheTracingRequest) returns (TieredCacheTracingResponse);
}
79 changes: 77 additions & 2 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,52 @@ use std::fs;
use std::path::Path;
use std::time::Duration;

use foyer::HybridCache;
use itertools::Itertools;
use prometheus::core::Collector;
use risingwave_common::config::{MetricLevel, ServerConfig};
use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest,
GetBackPressureResponse, HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest,
ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest,
StackTraceResponse,
StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse,
};
use risingwave_rpc_client::error::ToTonicStatus;
use risingwave_storage::hummock::compactor::await_tree_key::Compaction;
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::await_tree_key::{Actor, BarrierAwait};
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

type MetaCache = HybridCache<HummockSstableObjectId, Box<Sstable>>;
type BlockCache = HybridCache<SstableBlockIndex, Box<Block>>;

#[derive(Clone)]
pub struct MonitorServiceImpl {
stream_mgr: LocalStreamManager,
server_config: ServerConfig,
meta_cache: Option<MetaCache>,
block_cache: Option<BlockCache>,
}

impl MonitorServiceImpl {
pub fn new(stream_mgr: LocalStreamManager, server_config: ServerConfig) -> Self {
pub fn new(
stream_mgr: LocalStreamManager,
server_config: ServerConfig,
meta_cache: Option<MetaCache>,
block_cache: Option<BlockCache>,
) -> Self {
Self {
stream_mgr,
server_config,
meta_cache,
block_cache,
}
}
}
Expand Down Expand Up @@ -303,6 +318,66 @@ impl MonitorService for MonitorServiceImpl {
back_pressure_infos,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn tiered_cache_tracing(
&self,
request: Request<TieredCacheTracingRequest>,
) -> Result<Response<TieredCacheTracingResponse>, Status> {
let req = request.into_inner();

tracing::info!("Update tiered cache tracing config: {req:?}");

if let Some(cache) = &self.meta_cache {
if req.enable {
cache.enable_tracing();
} else {
cache.disable_tracing();
}
let config = cache.tracing_config();
if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
config.set_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_get_threshold_ms {
config.set_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
config.set_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
config.set_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
config.set_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
}
}

if let Some(cache) = &self.block_cache {
if req.enable {
cache.enable_tracing();
} else {
cache.disable_tracing();
}
let config = cache.tracing_config();
if let Some(threshold) = req.record_hybrid_insert_threshold_ms {
config.set_record_hybrid_insert_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_get_threshold_ms {
config.set_record_hybrid_get_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_obtain_threshold_ms {
config.set_record_hybrid_obtain_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_remove_threshold_ms {
config.set_record_hybrid_remove_threshold(Duration::from_millis(threshold as _));
}
if let Some(threshold) = req.record_hybrid_fetch_threshold_ms {
config.set_record_hybrid_fetch_threshold(Duration::from_millis(threshold as _));
}
}

Ok(Response::new(TieredCacheTracingResponse::default()))
}
}

pub use grpc_middleware::*;
Expand Down
Loading

0 comments on commit 7159f9a

Please sign in to comment.