diff --git a/cfspeedtest/__init__.py b/cfspeedtest/__init__.py index 9b1e5db..10d2873 100644 --- a/cfspeedtest/__init__.py +++ b/cfspeedtest/__init__.py @@ -1,5 +1,10 @@ """cfspeedtest, a Cloudflare speedtest suite in Python.""" -from cfspeedtest.cloudflare import CloudflareSpeedtest, TestSpec, TestType +from cfspeedtest.cloudflare import ( + CloudflareSpeedtest, + SuiteResults, + TestSpec, + TestType, +) -__all__ = ("CloudflareSpeedtest", "TestSpec", "TestType") +__all__ = ("CloudflareSpeedtest", "SuiteResults", "TestSpec", "TestType") diff --git a/cfspeedtest/__main__.py b/cfspeedtest/__main__.py index 6de3f87..8985c13 100644 --- a/cfspeedtest/__main__.py +++ b/cfspeedtest/__main__.py @@ -4,7 +4,7 @@ import sys from argparse import ArgumentParser -from cfspeedtest.cloudflare import CloudflareSpeedtest +from cfspeedtest.cloudflare import CloudflareSpeedtest, SuiteResults from cfspeedtest.logger import log, set_verbosity, setup_log from cfspeedtest.version import __version__ @@ -28,7 +28,7 @@ def cfspeedtest() -> None: ) args = parser.parse_args() - setup_log(silent=args.json) + setup_log(silent=args.json and not args.version) set_verbosity(debug=args.debug) if args.version: @@ -36,11 +36,11 @@ def cfspeedtest() -> None: log.debug("Python %s", sys.version) sys.exit(0) - results = CloudflareSpeedtest().run_all(megabits=not args.bps) + results = CloudflareSpeedtest(SuiteResults(megabits=not args.bps)).run_all() if args.json: setup_log() - log.info(json.dumps(CloudflareSpeedtest.results_to_dict(results))) + log.info(json.dumps(results.to_full_dict())) if __name__ == "__main__": diff --git a/cfspeedtest/cloudflare.py b/cfspeedtest/cloudflare.py index cc88211..d243882 100644 --- a/cfspeedtest/cloudflare.py +++ b/cfspeedtest/cloudflare.py @@ -9,7 +9,9 @@ import logging import statistics import time +from collections import UserDict from enum import Enum +from itertools import pairwise from typing import Any, NamedTuple import requests @@ -38,6 +40,11 @@ def bits(self) -> int: """The size of the test in bits.""" return self.size * 8 + @property + def label(self) -> str: + """The output label for the test.""" + return f"{self.name}_{self.type.name.lower()}" + TestSpecs = tuple[TestSpec, ...] @@ -97,12 +104,7 @@ def jitter_from(latencies: list[float]) -> float | None: """Compute jitter as average deviation between consecutive latencies.""" if len(latencies) < 2: return None - return statistics.mean( - [ - abs(latencies[i] - latencies[i - 1]) - for i in range(1, len(latencies)) - ] - ) + return statistics.mean([abs(b - a) for a, b in pairwise(latencies)]) class TestMetadata(NamedTuple): @@ -128,7 +130,67 @@ def _calculate_percentile(data: list[float], percentile: float) -> float: return edges[0] + (edges[1] - edges[0]) * rem -SuiteResults = dict[str, dict[str, TestResult]] +def bits_to_megabits(bits: int) -> float: + """Convert bits to megabits, rounded to 2 decimal places.""" + return round(bits / 1e6, 2) + + +class SuiteResults(UserDict): + """The results of a test suite.""" + + def __init__(self, *, megabits: bool = False): + super().__init__() + self.setdefault("tests", {}) + self._megabits = megabits + log.info("megabits: %s", self._megabits) + + @property + def meta(self) -> TestMetadata: + return self["meta"] + + @meta.setter + def meta(self, value: TestMetadata) -> None: + self["meta"] = value + for meta_field, meta_value in value._asdict().items(): + log.info("meta.%s: %s", meta_field, meta_value) + + @property + def tests(self) -> dict[str, TestResult]: + return self["tests"] + + def add_test(self, label: str, result: TestResult) -> None: + if ( + self._megabits + and result.value is not None + and label not in {"latency", "jitter"} + ): + result = TestResult(bits_to_megabits(result.value), result.time) + self.tests[label] = result + log.info("tests.%s: %s", label, result.value) + + def add_90th(self, test_type: TestType, result: TestResult) -> None: + if self._megabits and result.value is not None: + result = TestResult(bits_to_megabits(result.value), result.time) + label = f"90th_percentile_{test_type.name.lower()}" + self[label] = result + log.info("%s: %s", label, result.value) + + @property + def percentile_90th_down(self) -> TestResult: + return self["90th_percentile_down"] + + @property + def percentile_90th_up(self) -> TestResult: + return self["90th_percentile_up"] + + def to_full_dict(self) -> dict: + return { + "megabits": self._megabits, + "meta": self.meta._asdict(), + "tests": {k: v._asdict() for k, v in self.tests.items()}, + "90th_percentile_down": self.percentile_90th_down._asdict(), + "90th_percentile_up": self.percentile_90th_up._asdict(), + } class CloudflareSpeedtest: @@ -155,10 +217,7 @@ def __init__( # noqa: D417 no logging will occur. """ - self.results = results or {} - self.results.setdefault("tests", {}) - self.results.setdefault("meta", {}) - + self.results = results or SuiteResults() self.tests = tests self.request_sess = requests.Session() self.timeout = timeout @@ -192,81 +251,50 @@ def run_test(self, test: TestSpec) -> TestTimers: ) coll.full.append(time.time() - start) coll.server.append( - float(r.headers["Server-Timing"].split("=")[1].split(",")[0]) / 1e3 + float(r.headers["Server-Timing"].split("=")[1].split(",")[0]) + / 1e3 ) coll.request.append( r.elapsed.seconds + r.elapsed.microseconds / 1e6 ) return coll - def _sprint( - self, label: str, result: TestResult, *, meta: bool = False - ) -> None: - """Add an entry to the suite results and log it.""" - log.info("%s: %s", label, result.value) - save_to = self.results["meta"] if meta else self.results["tests"] - save_to[label] = result + def run_test_latency(self, test: TestSpec) -> tuple[float, float | None]: + """Run a test specification and collect latency results.""" + timers = self.run_test(test) + latencies = timers.to_latencies() + jitter = timers.jitter_from(latencies) + if jitter: + jitter = round(jitter, 2) + latency = round(statistics.mean(latencies), 2) + self.results.add_test("latency", TestResult(latency)) + self.results.add_test("jitter", TestResult(jitter)) + return (latency, jitter) + + def run_test_speed(self, test: TestSpec) -> list[int]: + """Run a test specification and collect speed results.""" + speeds = self.run_test(test).to_speeds(test) + self.results.add_test( + test.label, + TestResult(int(statistics.mean(speeds))), + ) + return speeds - def run_all(self, *, megabits: bool = False) -> SuiteResults: + def run_all(self) -> SuiteResults: """Run the full test suite.""" - meta = self.metadata() - self._sprint("ip", TestResult(meta.ip), meta=True) - self._sprint("isp", TestResult(meta.isp)) - self._sprint("location_code", TestResult(meta.location_code), meta=True) - self._sprint("location_city", TestResult(meta.city), meta=True) - self._sprint("location_region", TestResult(meta.region), meta=True) - - data = {"down": [], "up": []} - for test in self.tests: - timers = self.run_test(test) + self.results.meta = self.metadata() + data = {TestType.Down: [], TestType.Up: []} + for test in self.tests: if test.name == "latency": - latencies = timers.to_latencies() - jitter = timers.jitter_from(latencies) - if jitter: - jitter = round(jitter, 2) - self._sprint( - "latency", - TestResult(round(statistics.mean(latencies), 2)), - ) - self._sprint("jitter", TestResult(jitter)) + self.run_test_latency(test) continue - - speeds = timers.to_speeds(test) - data[test.type.name.lower()].extend(speeds) - # TODO: reduce code duplication of megabits reporting - mean_speed = int(statistics.mean(speeds)) - label_suffix = "bps" - if megabits: - mean_speed = round(mean_speed / 1e6, 2) - label_suffix = "mbps" - self._sprint( - f"{test.name}_{test.type.name.lower()}_{label_suffix}", - TestResult(mean_speed), - ) + data[test.type].extend(self.run_test_speed(test)) for k, v in data.items(): result = None if len(v) > 0: result = int(_calculate_percentile(v, 0.9)) - label_suffix = "bps" - if megabits: - result = round(result / 1e6, 2) if result else result - label_suffix = "mbps" - self._sprint( - f"90th_percentile_{k}_{label_suffix}", - TestResult(result), - ) + self.results.add_90th(k, TestResult(result)) return self.results - - @staticmethod - def results_to_dict( - results: SuiteResults, - ) -> dict[str, dict[str, dict[str, float]]]: - """Convert the test results to a full dictionary.""" - return { - k: {sk: sv._asdict()} - for k, v in results.items() - for sk, sv in v.items() - }