Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use default interval from pollfile to stagger new jobs #339

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/337.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use default interval from pollfile to stagger new jobs
12 changes: 9 additions & 3 deletions src/zino/config/polldevs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
from zino.config.models import PollDevice


def read_polldevs(filename: str) -> Iterator[PollDevice]:
"""Reads and parses the legacy `polldevs.cf` format, yielding a sequence of PollDevice objects.
def read_polldevs(filename: str) -> Tuple[dict[str, PollDevice], dict[str, str]]:
"""
Reads and parses the legacy `polldevs.cf` format, returning a dictionary of device names and the associated
PollDevice object and a dictionary of default settings

This parser is slightly more lax than the original Tcl-based parser, in that it allows multiple empty lines or
multiple spaces in value assignments.
"""
defaults = {}
devices = {}
try:
with open(filename, "r") as devs:
for lineno, section in _read_conf_sections(devs):
Expand All @@ -22,7 +25,8 @@ def read_polldevs(filename: str) -> Iterator[PollDevice]:
continue

try:
yield PollDevice(**(defaults | section))
device = PollDevice(**(defaults | section))
devices[device.name] = device
except ValidationError as error:
first_error = error.errors()[0]
device_name = section.get("name", "N/A")
Expand All @@ -37,6 +41,8 @@ def read_polldevs(filename: str) -> Iterator[PollDevice]:
error.filename = filename
raise

return devices, defaults


def _read_conf_sections(filehandle: TextIO) -> Iterator[Tuple[int, dict]]:
"""Reads and yields individual configuration sections from `polldevs.cf`.
Expand Down
6 changes: 3 additions & 3 deletions src/zino/getuptime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


async def run(args: argparse.Namespace):
devices = {d.name: d for d in read_polldevs(config.polling.file)}
devices, _ = read_polldevs(config.polling.file)

Check warning on line 23 in src/zino/getuptime.py

View check run for this annotation

Codecov / codecov/patch

src/zino/getuptime.py#L23

Added line #L23 was not covered by tests
device = devices[args.router]

snmp = SNMP(device)
Expand All @@ -29,9 +29,9 @@


def parse_args():
devicenames = [d.name for d in read_polldevs(config.polling.file)]
devices, _ = read_polldevs(config.polling.file)
parser = argparse.ArgumentParser(description="Fetch sysUptime from a device in the pollfile")
parser.add_argument("router", type=str, help="Zino router name", choices=devicenames)
parser.add_argument("router", type=str, help="Zino router name", choices=devices.keys())
return parser.parse_args()


Expand Down
17 changes: 9 additions & 8 deletions src/zino/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ def get_scheduler() -> AsyncIOScheduler:


@log_time_spent()
def load_polldevs(polldevs_conf: str) -> Tuple[Set, Set, Set]:
def load_polldevs(polldevs_conf: str) -> Tuple[Set, Set, Set, dict[str, str]]:
"""Loads pollfile into process state.

:returns: A tuple of (new_devices, deleted_devices, changed_devices)
"""
try:
devices = {d.name: d for d in read_polldevs(polldevs_conf)}
devices, defaults = read_polldevs(polldevs_conf)
except InvalidConfiguration as error:
_log.error(error)
return set(), set(), set()
return set(), set(), set(), dict()

new_devices = set(devices) - set(state.polldevs)
deleted_devices = set(state.polldevs) - set(devices)
Expand All @@ -68,7 +68,7 @@ def load_polldevs(polldevs_conf: str) -> Tuple[Set, Set, Set]:
for device in deleted_devices:
del state.polldevs[device]

return new_devices, deleted_devices, changed_devices
return new_devices, deleted_devices, changed_devices, defaults


def init_state_for_devices(devices: Sequence[PollDevice]):
Expand All @@ -79,12 +79,13 @@ def init_state_for_devices(devices: Sequence[PollDevice]):


async def load_and_schedule_polldevs(polldevs_conf: str):
new_devices, deleted_devices, changed_devices = load_polldevs(polldevs_conf)
new_devices, deleted_devices, changed_devices, defaults = load_polldevs(polldevs_conf)
deschedule_devices(deleted_devices | changed_devices)
schedule_devices(new_devices | changed_devices)
stagger_interval = defaults.get("interval", DEFAULT_INTERVAL_MINUTES)
schedule_devices(new_devices | changed_devices, int(stagger_interval))


def schedule_devices(devices: Sequence[str]):
def schedule_devices(devices: Sequence[str], stagger_interval: int = DEFAULT_INTERVAL_MINUTES):
devices = sorted((state.polldevs[name] for name in devices), key=operator.attrgetter("priority"), reverse=True)
if not devices:
return
Expand All @@ -94,7 +95,7 @@ def schedule_devices(devices: Sequence[str]):
scheduler = get_scheduler()

# Spread poll jobs evenly across the entire default interval
stagger_factor = (DEFAULT_INTERVAL_MINUTES * 60) / len(devices)
stagger_factor = (stagger_interval * 60) / len(devices)
for index, device in enumerate(devices):
first_run_time = datetime.now() + timedelta(seconds=index * stagger_factor)

Expand Down
27 changes: 17 additions & 10 deletions tests/config/polldevs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,46 @@

class TestReadPolldevs:
def test_should_generate_two_polldevices_from_test_config(self, polldevs_conf):
result = list(read_polldevs(polldevs_conf))
result, _ = read_polldevs(polldevs_conf)
assert len(result) == 2
assert all(isinstance(device, PollDevice) for device in result)
assert all(isinstance(device, PollDevice) for device in result.values())

def test_should_return_default_values_from_test_config(self, polldevs_conf):
_, defaults = read_polldevs(polldevs_conf)
assert "community" in defaults
assert defaults["community"] == "foobar"
assert "domain" in defaults
assert defaults["domain"] == "uninett.no"

def test_should_use_default_values_in_polldevices_generated_from_test_config(self, polldevs_conf):
result = list(read_polldevs(polldevs_conf))
assert all(device.community == "foobar" for device in result)
assert all(device.domain == "uninett.no" for device in result)
result, _ = read_polldevs(polldevs_conf)
assert all(device.community == "foobar" for device in result.values())
assert all(device.domain == "uninett.no" for device in result.values())


class TestReadInvalidPolldevs:
def test_should_raise_exception(self, invalid_polldevs_conf):
with pytest.raises(InvalidConfiguration):
list(read_polldevs(invalid_polldevs_conf))
read_polldevs(invalid_polldevs_conf)

def test_should_have_filename_in_exception(self, invalid_polldevs_conf):
with pytest.raises(InvalidConfiguration) as e:
list(read_polldevs(invalid_polldevs_conf))
read_polldevs(invalid_polldevs_conf)
assert "polldevs.cf" in str(e.value)

def test_should_have_line_number_in_exception(self, invalid_polldevs_conf):
with pytest.raises(InvalidConfiguration) as e:
list(read_polldevs(invalid_polldevs_conf))
read_polldevs(invalid_polldevs_conf)
assert "2" in str(e.value)

def test_exception_should_include_device_name_on_missing_address(self, missing_device_address_polldevs_conf):
with pytest.raises(InvalidConfiguration) as e:
list(read_polldevs(missing_device_address_polldevs_conf))
read_polldevs(missing_device_address_polldevs_conf)
assert "example-gw" in str(e.value)

def test_exception_should_include_missing_attribute_on_missing_address(self, missing_device_address_polldevs_conf):
with pytest.raises(InvalidConfiguration) as e:
list(read_polldevs(missing_device_address_polldevs_conf))
read_polldevs(missing_device_address_polldevs_conf)
assert "Field required ('address')" in str(e.value)


Expand Down
43 changes: 37 additions & 6 deletions tests/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,31 @@ class TestLoadPolldevs:
@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_all_new_devices_on_first_run(self, polldevs_conf):
new_devices, deleted_devices, changed_devices = scheduler.load_polldevs(polldevs_conf)
new_devices, deleted_devices, changed_devices, _ = scheduler.load_polldevs(polldevs_conf)
assert len(new_devices) > 0
assert not deleted_devices
assert not changed_devices

@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_defaults_on_first_run(self, polldevs_conf):
_, _, _, defaults = scheduler.load_polldevs(polldevs_conf)
assert len(defaults) > 0
assert "interval" in defaults

@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_deleted_devices_on_second_run(self, polldevs_conf, polldevs_conf_with_single_router):
scheduler.load_polldevs(polldevs_conf)
new_devices, deleted_devices, changed_devices = scheduler.load_polldevs(polldevs_conf_with_single_router)
new_devices, deleted_devices, changed_devices, _ = scheduler.load_polldevs(polldevs_conf_with_single_router)
assert not new_devices
assert len(deleted_devices) > 0
assert not changed_devices

@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_no_new_or_deleted_devices_on_invalid_configuration(self, invalid_polldevs_conf):
new_devices, deleted_devices, changed_devices = scheduler.load_polldevs(invalid_polldevs_conf)
new_devices, deleted_devices, changed_devices, _ = scheduler.load_polldevs(invalid_polldevs_conf)
assert not new_devices
assert not deleted_devices
assert not changed_devices
Expand All @@ -41,6 +48,30 @@ def test_should_log_error_on_invalid_configuration(self, caplog, invalid_polldev
scheduler.load_polldevs(invalid_polldevs_conf)
assert "'lalala' is not a valid configuration line" in caplog.text

@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_changed_defaults(self, polldevs_conf, tmp_path):
polldevs_with_changed_defaults = tmp_path.joinpath("changed-defaults-polldevs.cf")
with open(polldevs_with_changed_defaults, "w") as conf:
conf.write(
"""# polldevs test config
default interval: 10
default community: barfoo
default domain: uninett.no
default statistics: yes
default hcounters: yes

name: example-gw
address: 10.0.42.1

name: example-gw2
address: 10.0.43.1""" # Lack of a new-line here is intentional to test the parser
)

_, _, _, defaults = scheduler.load_polldevs(polldevs_conf)
_, _, _, changed_defaults = scheduler.load_polldevs(polldevs_with_changed_defaults)
assert defaults != changed_defaults

@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_return_changed_devices_on_changed_defaults(self, polldevs_conf, tmp_path):
Expand All @@ -62,7 +93,7 @@ def test_should_return_changed_devices_on_changed_defaults(self, polldevs_conf,
)

scheduler.load_polldevs(polldevs_conf)
new_devices, deleted_devices, changed_devices = scheduler.load_polldevs(polldevs_with_changed_defaults)
new_devices, deleted_devices, changed_devices, _ = scheduler.load_polldevs(polldevs_with_changed_defaults)
assert not new_devices
assert not deleted_devices
assert changed_devices
Expand All @@ -89,7 +120,7 @@ def test_should_return_changed_devices_on_changed_interval(self, polldevs_conf,
)

scheduler.load_polldevs(polldevs_conf)
new_devices, deleted_devices, changed_devices = scheduler.load_polldevs(polldevs_with_changed_defaults)
new_devices, deleted_devices, changed_devices, _ = scheduler.load_polldevs(polldevs_with_changed_defaults)
assert not new_devices
assert not deleted_devices
assert changed_devices
Expand All @@ -99,7 +130,7 @@ class TestScheduleNewDevices:
@patch("zino.state.polldevs", dict())
@patch("zino.state.state", ZinoState())
def test_should_schedule_jobs_for_new_devices(self, polldevs_conf, mocked_scheduler):
new_devices, _, _ = scheduler.load_polldevs(polldevs_conf)
new_devices, _, _, _ = scheduler.load_polldevs(polldevs_conf)
assert len(new_devices) > 0

scheduler.schedule_devices(new_devices)
Expand Down
Loading