
page_service: add benchmark for batching #9820

Merged on Nov 25, 2024 (10 commits)
40 changes: 38 additions & 2 deletions libs/metrics/src/more_process_metrics.rs
@@ -2,14 +2,28 @@

// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.

use once_cell::sync::Lazy;
use prometheus::Gauge;

use crate::UIntGauge;

pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
cpu_seconds_highres: Gauge,
}

const NMETRICS: usize = 1;
const NMETRICS: usize = 2;

static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");
}
let convertible_to_f64: i32 =
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
convertible_to_f64 as f64
});

impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
mfs.extend(self.vmlck.collect())
}
}
if let Ok(stat) = myself.stat() {
let cpu_seconds = stat.utime + stat.stime;
self.cpu_seconds_highres
.set(cpu_seconds as f64 / *CLK_TCK_F64);
mfs.extend(self.cpu_seconds_highres.collect());
}
mfs
}
}
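
For context on the conversion above: Linux exposes per-process CPU time in `/proc/<pid>/stat` as clock ticks (fields 14 and 15, `utime` and `stime`), and `sysconf(_SC_CLK_TCK)` reports how many ticks make up a second. A minimal Python sketch of the same computation (illustrative only, not part of this PR; the field offsets assume the standard procfs layout):

```python
import os

def process_cpu_seconds(pid: int = os.getpid()) -> float:
    """Sum of user and system CPU time, in seconds, at clock-tick resolution."""
    with open(f"/proc/{pid}/stat") as f:
        stat = f.read()
    # The comm field (2nd) may contain spaces and parentheses;
    # parse from the last ')' to get stable field offsets.
    fields = stat.rsplit(")", 1)[1].split()
    utime_ticks = int(fields[11])  # overall field 14: user-mode ticks
    stime_ticks = int(fields[12])  # overall field 15: kernel-mode ticks
    clk_tck = os.sysconf("SC_CLK_TCK")  # ticks per second, typically 100
    return (utime_ticks + stime_ticks) / clk_tck
```

With the typical `CLK_TCK` of 100 this gives 10 ms resolution, which is what makes the gauge usable for short benchmark runs.
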
@@ -43,7 +63,23 @@ impl Collector {
.cloned(),
);

Self { descs, vmlck }
let cpu_seconds_highres = Gauge::new(
"libmetrics_process_cpu_seconds_highres",
"Total user and system CPU time spent in seconds.\
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
)
.unwrap();
descs.extend(
prometheus::core::Collector::desc(&cpu_seconds_highres)
.into_iter()
.cloned(),
);

Self {
descs,
vmlck,
cpu_seconds_highres,
}
}
}

119 changes: 111 additions & 8 deletions test_runner/performance/pageserver/test_pageserver_getpage_merge.py
@@ -1,12 +1,14 @@
import dataclasses
import json
import time
from dataclasses import dataclass
from typing import Any, Optional
from pathlib import Path
from typing import Any, Optional, Union

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.utils import humantime_to_ms

TARGET_RUNTIME = 60
@@ -43,7 +45,7 @@ def test_getpage_merge_smoke(
#
# record perf-related parameters as metrics to simplify processing of results
#
params: dict[str, tuple[float | int, dict[str, Any]]] = {}
params: dict[str, tuple[Union[float, int], dict[str, Any]]] = {}

params.update(
{
@@ -146,7 +148,7 @@ def get_metrics() -> Metrics:
).value,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"process_cpu_seconds_total"
"libmetrics_process_cpu_seconds_highres"
).value,
)
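
The test reads the new gauge through the pageserver fixture's metrics client. Stripped of fixtures, the query amounts to scraping the Prometheus text endpoint; a rough sketch, where the URL and the plain-`requests` approach are assumptions for illustration:

```python
import requests

def read_highres_cpu_seconds(metrics_url: str) -> float:
    """Scrape a Prometheus text endpoint and return the highres CPU gauge."""
    text = requests.get(metrics_url).text  # e.g. "http://localhost:9898/metrics"
    for line in text.splitlines():
        # HELP/TYPE lines start with '#', so a prefix match only hits samples.
        if line.startswith("libmetrics_process_cpu_seconds_highres"):
            return float(line.split()[-1])
    raise LookupError("libmetrics_process_cpu_seconds_highres not found")
```
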

@@ -176,11 +178,10 @@ def workload() -> Metrics:
#
# Sanity-checks on the collected data
#
def close_enough(a, b):
return (a / b > 0.99 and a / b < 1.01) and (b / a > 0.99 and b / a < 1.01)

# assert that getpage counts roughly match between compute and ps
assert close_enough(metrics.pageserver_getpage_count, metrics.compute_getpage_count)
assert metrics.pageserver_getpage_count == pytest.approx(
metrics.compute_getpage_count, rel=0.01
)

#
# Record the results
@@ -195,3 +196,105 @@ def close_enough(a, b):
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
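
The switch from the hand-rolled `close_enough` to `pytest.approx` keeps the same intent (agreement within 1%) while producing a readable failure message. A quick illustration of the relative-tolerance semantics:

```python
import pytest

# Within tolerance: 1000 vs. 1005 differ by 0.5%, under the 1% bound.
assert 1000 == pytest.approx(1005, rel=0.01)

# Out of tolerance: 1000 vs. 1020 differ by ~2%; this assert raises.
# assert 1000 == pytest.approx(1020, rel=0.01)
```

One subtlety: `close_enough` checked the ratio in both directions, while `pytest.approx(expected, rel=r)` compares the difference against `r * abs(expected)` only; for a 1% tolerance the two are equivalent for practical purposes.
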


@pytest.mark.parametrize(
"batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"]
)
def test_timer_precision(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
batch_timeout: Optional[str],
):
"""
Determine the batching timeout precision (mean latency) and tail latency impact.

The baseline is `None`; an ideal batching timeout implementation would increase
the mean latency by exactly `batch_timeout`.

    That is not the case with the current implementation; this will be addressed in future changes.
"""

#
# Setup
#

def patch_ps_config(ps_config):
ps_config["server_side_batch_timeout"] = batch_timeout

neon_env_builder.pageserver_config_override = patch_ps_config

env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()

cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute("SET effective_io_concurrency=1")

cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")

log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
tablesize = 50 * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?

cur.close()
conn.close()

wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)

endpoint.stop()

for sk in env.safekeepers:
sk.stop()

#
# Run single-threaded pagebench (TODO: dedup with other benchmark code)
#

ps_http = env.pageserver.http_client()

cmd = [
str(env.neon_binpath / "pagebench"),
"get-page-latest-lsn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-clients",
"1",
"--runtime",
"10s",
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")

with open(results_path) as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")

total = results["total"]

metric = "latency_mean"
zenbenchmark.record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)

metric = "latency_percentiles"
for k, v in total[metric].items():
zenbenchmark.record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
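
pagebench reports durations as humantime strings (for example `"10us"` or `"1.5ms"`), which `humantime_to_ms` from `fixtures.utils` normalizes to milliseconds before recording. A minimal sketch of such a conversion, assuming simple number-plus-unit values (the real helper may accept more formats):

```python
def humantime_to_ms(value: str) -> float:
    """Convert a humantime-style duration ("10us", "1.5ms", "2s") to milliseconds."""
    units = {"ns": 1e-6, "us": 1e-3, "µs": 1e-3, "ms": 1.0, "s": 1e3, "m": 60e3}
    # Try longer suffixes first so "10us" matches "us" rather than "s".
    for suffix in sorted(units, key=len, reverse=True):
        if value.endswith(suffix):
            return float(value[: -len(suffix)]) * units[suffix]
    raise ValueError(f"unrecognized duration: {value!r}")
```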