From 3eb4e68147d3863955dde3c7daced623774897ab Mon Sep 17 00:00:00 2001 From: hustfisher Date: Fri, 11 Oct 2024 11:31:49 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=AF=B9=E5=BC=82=E5=B8=B8=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E8=BF=9B=E8=A1=8C=E6=8C=87=E6=A0=87=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=EF=BC=8C=E5=88=86=E8=AF=B7=E6=B1=82=E8=BF=87=E7=A8=8B=E5=92=8C?= =?UTF-8?q?=E6=9C=80=E7=BB=88=E5=93=8D=E5=BA=94=E4=B8=A4=E7=A7=8D=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=EF=BC=8C=E6=8E=92=E9=99=A4mc=EF=BC=8C=E5=9B=A0?= =?UTF-8?q?=E4=B8=BA=E5=BC=82=E5=B8=B8=E5=9F=BA=E6=9C=AC=E6=98=AF=E8=AF=B7?= =?UTF-8?q?=E6=B1=82miss=E6=88=96del=20notfound?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/src/memcache/binary/mod.rs | 6 ++++++ protocol/src/parser.rs | 4 ++++ stream/src/handler.rs | 9 ++++++++- stream/src/pipeline.rs | 8 +++++++- 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/protocol/src/memcache/binary/mod.rs b/protocol/src/memcache/binary/mod.rs index 2e84e1813..d838db5e7 100644 --- a/protocol/src/memcache/binary/mod.rs +++ b/protocol/src/memcache/binary/mod.rs @@ -190,6 +190,12 @@ impl Protocol for MemcacheBinary { None } } + + // mc目前不需要统计error,因为mc的error基本都是get miss,del not-found这种,这种错误不需要统计 + #[inline] + fn metric_err(&self) -> bool { + false + } } impl MemcacheBinary { // 根据req构建response,status为mc协议status,共11种 diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 97b1e7ddf..681f60c1c 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -128,6 +128,10 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn max_tries(&self, _req_op: Operation) -> u8 { 1_u8 } + + fn metric_err(&self) -> bool { + true + } } pub trait RequestProcessor { diff --git a/stream/src/handler.rs b/stream/src/handler.rs index c809d8d9a..a8e72abff 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -19,6 +19,7 @@ pub struct Handler<'r, Req, P, S> { s: S, parser: P, rtt: Metric, + err: Metric, host_metric: HostMetric, num: Number, @@ -63,12 +64,14 @@ where data.enable(); let name = path.clone(); let rtt = path.rtt("req"); + let err = path.qps("err"); Self { data, pending: VecDeque::with_capacity(31), s, parser, rtt, + err, host_metric: HostMetric::from(path), num: Number::default(), req_buf: Vec::with_capacity(4), @@ -142,8 +145,12 @@ where } let (req, start) = self.pending.pop_front().expect("take response"); self.num.rx(); - // 统计请求耗时。 + // 统计请求耗时、异常响应 self.rtt += start.elapsed(); + if self.parser.metric_err() && !cmd.ok() { + self.err += 1; + } + self.parser.check(&*req, &cmd); req.on_complete(cmd); continue; diff --git a/stream/src/pipeline.rs b/stream/src/pipeline.rs index f0e8490c2..ee56ae19e 100644 --- a/stream/src/pipeline.rs +++ b/stream/src/pipeline.rs @@ -172,10 +172,16 @@ where let op = ctx.request().operation(); if let Some(rsp) = response { - if ctx.is_write_back() && rsp.ok() { + let rsp_ok = rsp.ok(); + if ctx.is_write_back() && rsp_ok { ctx.async_write_back(&self.parser, rsp, self.top.exp_sec(), &mut self.metrics); self.async_pending.push_back(ctx); } + + // 不区分是否last,这样更精确的感知总的异常响应数量 + if self.parser.metric_err() && !rsp_ok { + *self.metrics.err() += 1; + } } // 数据写完,统计耗时。当前数据只写入到buffer中, From f412973d43f21a287337fe990dde2504ddb82c55 Mon Sep 17 00:00:00 2001 From: hustfisher Date: Tue, 15 Oct 2024 18:04:29 +0800 Subject: [PATCH 2/2] update metrics name --- stream/src/handler.rs | 2 +- tests/src/bkdrsub.rs | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/stream/src/handler.rs b/stream/src/handler.rs index a8e72abff..270c2d1b9 100644 --- a/stream/src/handler.rs +++ b/stream/src/handler.rs @@ -64,7 +64,7 @@ where data.enable(); let name = path.clone(); let rtt = path.rtt("req"); - let err = path.qps("err"); + let err = path.qps("be_err"); Self { data, pending: VecDeque::with_capacity(31), diff --git a/tests/src/bkdrsub.rs b/tests/src/bkdrsub.rs index b312f0b49..732aa4aec 100644 --- a/tests/src/bkdrsub.rs +++ b/tests/src/bkdrsub.rs @@ -12,9 +12,15 @@ use sharding::{ fn bkdrsub_one() { let hasher = Hasher::from("bkdrsub"); - let key1 = "mfh15d#3940964349989430"; + let key1 = "otdn#1042015:carSubBrand_e4ab74c125e9e95edad691ffe9820118"; let hash1 = hasher.hash(&key1.as_bytes()); - println!("key:{}, hash:{}, idx:{}", key1, hash1, hash1 % 180); + + let shards = 1080; + let servers = vec!["padding".to_string(); shards]; + let dist = Distribute::from("modrange-8640", &servers); + let dist_idx = dist.index(hash1); + + println!("key:{}, hash:{}, idx:{}", key1, hash1, dist_idx); } // TODO 临时批量文件的hash、dist校验测试,按需打开