Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: implement result typing #24

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cfspeedtest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""cfspeedtest, a Cloudflare speedtest suite in Python."""

from cfspeedtest.cloudflare import CloudflareSpeedtest, TestSpec, TestType
from cfspeedtest.cloudflare import (
CloudflareSpeedtest,
SuiteResults,
TestSpec,
TestType,
)

__all__ = ("CloudflareSpeedtest", "TestSpec", "TestType")
__all__ = ("CloudflareSpeedtest", "SuiteResults", "TestSpec", "TestType")
8 changes: 4 additions & 4 deletions cfspeedtest/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from argparse import ArgumentParser

from cfspeedtest.cloudflare import CloudflareSpeedtest
from cfspeedtest.cloudflare import CloudflareSpeedtest, SuiteResults
from cfspeedtest.logger import log, set_verbosity, setup_log
from cfspeedtest.version import __version__

Expand All @@ -28,19 +28,19 @@ def cfspeedtest() -> None:
)
args = parser.parse_args()

setup_log(silent=args.json)
setup_log(silent=args.json and not args.version)
set_verbosity(debug=args.debug)

if args.version:
log.info("cfspeedtest %s", __version__)
log.debug("Python %s", sys.version)
sys.exit(0)

results = CloudflareSpeedtest().run_all(megabits=not args.bps)
results = CloudflareSpeedtest(SuiteResults(megabits=not args.bps)).run_all()

if args.json:
setup_log()
log.info(json.dumps(CloudflareSpeedtest.results_to_dict(results)))
log.info(json.dumps(results.to_full_dict()))


if __name__ == "__main__":
Expand Down
170 changes: 99 additions & 71 deletions cfspeedtest/cloudflare.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import logging
import statistics
import time
from collections import UserDict
from enum import Enum
from itertools import pairwise
from typing import Any, NamedTuple

import requests
Expand Down Expand Up @@ -38,6 +40,11 @@ def bits(self) -> int:
"""The size of the test in bits."""
return self.size * 8

@property
def label(self) -> str:
"""The output label for the test."""
return f"{self.name}_{self.type.name.lower()}"


TestSpecs = tuple[TestSpec, ...]

Expand Down Expand Up @@ -97,12 +104,7 @@ def jitter_from(latencies: list[float]) -> float | None:
"""Compute jitter as average deviation between consecutive latencies."""
if len(latencies) < 2:
return None
return statistics.mean(
[
abs(latencies[i] - latencies[i - 1])
for i in range(1, len(latencies))
]
)
return statistics.mean([abs(b - a) for a, b in pairwise(latencies)])


class TestMetadata(NamedTuple):
Expand All @@ -128,7 +130,67 @@ def _calculate_percentile(data: list[float], percentile: float) -> float:
return edges[0] + (edges[1] - edges[0]) * rem


SuiteResults = dict[str, dict[str, TestResult]]
def bits_to_megabits(bits: int) -> float:
"""Convert bits to megabits, rounded to 2 decimal places."""
return round(bits / 1e6, 2)


class SuiteResults(UserDict):
"""The results of a test suite."""

def __init__(self, *, megabits: bool = False):
super().__init__()
self.setdefault("tests", {})
self._megabits = megabits
log.info("megabits: %s", self._megabits)

@property
def meta(self) -> TestMetadata:
return self["meta"]

@meta.setter
def meta(self, value: TestMetadata) -> None:
self["meta"] = value
for meta_field, meta_value in value._asdict().items():
log.info("meta.%s: %s", meta_field, meta_value)

@property
def tests(self) -> dict[str, TestResult]:
return self["tests"]

def add_test(self, label: str, result: TestResult) -> None:
if (
self._megabits
and result.value is not None
and label not in {"latency", "jitter"}
):
result = TestResult(bits_to_megabits(result.value), result.time)
self.tests[label] = result
log.info("tests.%s: %s", label, result.value)

def add_90th(self, test_type: TestType, result: TestResult) -> None:
if self._megabits and result.value is not None:
result = TestResult(bits_to_megabits(result.value), result.time)
label = f"90th_percentile_{test_type.name.lower()}"
self[label] = result
log.info("%s: %s", label, result.value)

@property
def percentile_90th_down(self) -> TestResult:
return self["90th_percentile_down"]

@property
def percentile_90th_up(self) -> TestResult:
return self["90th_percentile_up"]

def to_full_dict(self) -> dict:
return {
"megabits": self._megabits,
"meta": self.meta._asdict(),
"tests": {k: v._asdict() for k, v in self.tests.items()},
"90th_percentile_down": self.percentile_90th_down._asdict(),
"90th_percentile_up": self.percentile_90th_up._asdict(),
}


class CloudflareSpeedtest:
Expand All @@ -155,10 +217,7 @@ def __init__( # noqa: D417
no logging will occur.

"""
self.results = results or {}
self.results.setdefault("tests", {})
self.results.setdefault("meta", {})

self.results = results or SuiteResults()
self.tests = tests
self.request_sess = requests.Session()
self.timeout = timeout
Expand Down Expand Up @@ -192,81 +251,50 @@ def run_test(self, test: TestSpec) -> TestTimers:
)
coll.full.append(time.time() - start)
coll.server.append(
float(r.headers["Server-Timing"].split("=")[1].split(",")[0]) / 1e3
float(r.headers["Server-Timing"].split("=")[1].split(",")[0])
/ 1e3
)
coll.request.append(
r.elapsed.seconds + r.elapsed.microseconds / 1e6
)
return coll

def _sprint(
self, label: str, result: TestResult, *, meta: bool = False
) -> None:
"""Add an entry to the suite results and log it."""
log.info("%s: %s", label, result.value)
save_to = self.results["meta"] if meta else self.results["tests"]
save_to[label] = result
def run_test_latency(self, test: TestSpec) -> tuple[float, float | None]:
"""Run a test specification and collect latency results."""
timers = self.run_test(test)
latencies = timers.to_latencies()
jitter = timers.jitter_from(latencies)
if jitter:
jitter = round(jitter, 2)
latency = round(statistics.mean(latencies), 2)
self.results.add_test("latency", TestResult(latency))
self.results.add_test("jitter", TestResult(jitter))
return (latency, jitter)

def run_test_speed(self, test: TestSpec) -> list[int]:
"""Run a test specification and collect speed results."""
speeds = self.run_test(test).to_speeds(test)
self.results.add_test(
test.label,
TestResult(int(statistics.mean(speeds))),
)
return speeds

def run_all(self, *, megabits: bool = False) -> SuiteResults:
def run_all(self) -> SuiteResults:
"""Run the full test suite."""
meta = self.metadata()
self._sprint("ip", TestResult(meta.ip), meta=True)
self._sprint("isp", TestResult(meta.isp))
self._sprint("location_code", TestResult(meta.location_code), meta=True)
self._sprint("location_city", TestResult(meta.city), meta=True)
self._sprint("location_region", TestResult(meta.region), meta=True)

data = {"down": [], "up": []}
for test in self.tests:
timers = self.run_test(test)
self.results.meta = self.metadata()

data = {TestType.Down: [], TestType.Up: []}
for test in self.tests:
if test.name == "latency":
latencies = timers.to_latencies()
jitter = timers.jitter_from(latencies)
if jitter:
jitter = round(jitter, 2)
self._sprint(
"latency",
TestResult(round(statistics.mean(latencies), 2)),
)
self._sprint("jitter", TestResult(jitter))
self.run_test_latency(test)
continue

speeds = timers.to_speeds(test)
data[test.type.name.lower()].extend(speeds)
# TODO: reduce code duplication of megabits reporting
mean_speed = int(statistics.mean(speeds))
label_suffix = "bps"
if megabits:
mean_speed = round(mean_speed / 1e6, 2)
label_suffix = "mbps"
self._sprint(
f"{test.name}_{test.type.name.lower()}_{label_suffix}",
TestResult(mean_speed),
)
data[test.type].extend(self.run_test_speed(test))

for k, v in data.items():
result = None
if len(v) > 0:
result = int(_calculate_percentile(v, 0.9))
label_suffix = "bps"
if megabits:
result = round(result / 1e6, 2) if result else result
label_suffix = "mbps"
self._sprint(
f"90th_percentile_{k}_{label_suffix}",
TestResult(result),
)
self.results.add_90th(k, TestResult(result))

return self.results

@staticmethod
def results_to_dict(
results: SuiteResults,
) -> dict[str, dict[str, dict[str, float]]]:
"""Convert the test results to a full dictionary."""
return {
k: {sk: sv._asdict()}
for k, v in results.items()
for sk, sv in v.items()
}