-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eth hotplug check cable and routable before pinging (Bugfix) #1694
base: main
Are you sure you want to change the base?
Changes from all commits
bc55608
db931af
1c3c6df
d0bfc3c
ef6091d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,122 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2021 Canonical Ltd. | ||
# All rights reserved. | ||
# | ||
# encoding: utf-8 | ||
# Copyright 2025 Canonical Ltd. | ||
# Written by: | ||
# Maciej Kisielewski <[email protected]> | ||
# Isaac Yang <[email protected]> | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License version 3, | ||
# as published by the Free Software Foundation. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
"""Check if hotplugging works on an ethernet port.""" | ||
|
||
import sys | ||
import os | ||
import time | ||
import glob | ||
import yaml | ||
import subprocess as sp | ||
|
||
from gateway_ping_test import perform_ping_test | ||
|
||
NETPLAN_CFG_PATHS = ("/etc/netplan", "/lib/netplan", "/run/netplan") | ||
|
||
|
||
def netplan_renderer(): | ||
""" | ||
Check the renderer used by netplan on the system if it is networkd or | ||
NetworkManager. | ||
This function looks for the renderer used in the yaml files located in the | ||
NETPLAN_CFG_PATHS directories, and returns the first renderer found. | ||
If the renderer is not found, it defaults to "networkd". | ||
If the netplan file is not found, it defaults to "NetworkManager". | ||
""" | ||
netplan_file_exist = False | ||
for basedir in NETPLAN_CFG_PATHS: | ||
if os.path.exists(basedir): | ||
files = glob.glob(os.path.join(basedir, "*.yaml")) | ||
for f in files: | ||
netplan_file_exist = True | ||
with open(f, "r") as file: | ||
data = yaml.safe_load(file) | ||
if "renderer" in data["network"]: | ||
return data["network"]["renderer"] | ||
if netplan_file_exist: | ||
return "networkd" | ||
return "NetworkManager" | ||
|
||
|
||
def get_interface_info(interface, renderer): | ||
""" | ||
Get the interface information (state and gateway) from the renderer. | ||
""" | ||
if renderer == "networkd": | ||
cmd = "networkctl status --no-pager --no-legend {}".format(interface) | ||
key_map = {"State": "state", "Gateway": "gateway"} | ||
elif renderer == "NetworkManager": | ||
cmd = "nmcli device show {}".format(interface) | ||
key_map = {"GENERAL.STATE": "state", "IP4.GATEWAY": "gateway"} | ||
else: | ||
raise ValueError("Unknown renderer: {}".format(renderer)) | ||
|
||
return _get_cmd_info(cmd, key_map, renderer) | ||
|
||
|
||
def _get_cmd_info(cmd, key_map, renderer): | ||
info = {} | ||
try: | ||
output = sp.check_output(cmd, shell=True) | ||
for line in output.decode(sys.stdout.encoding).splitlines(): | ||
# Skip lines that don't have a "key: value" format | ||
if ":" not in line: | ||
continue | ||
key, val = line.strip().split(":", maxsplit=1) | ||
key = key.strip() | ||
val = val.strip() | ||
if key in key_map: | ||
info[key_map[key]] = val | ||
except sp.CalledProcessError as e: | ||
print("Error running {} command: {}".format(renderer, e)) | ||
return info | ||
|
||
|
||
def _check_routable_state(interface, renderer): | ||
""" | ||
Check if the interface is in a routable state depending on the renderer | ||
""" | ||
routable = False | ||
state = "" | ||
info = get_interface_info(interface, renderer) | ||
state = info.get("state", "") | ||
if renderer == "networkd": | ||
routable = "routable" in state | ||
elif renderer == "NetworkManager": | ||
routable = "connected" in state and "disconnected" not in state | ||
return (routable, state) | ||
|
||
|
||
def wait_for_routable_state( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should check the (and we provide a |
||
interface, renderer, do_routable=True, max_wait=30 | ||
): | ||
attempts = 0 | ||
routable_msg = "routable" if do_routable else "NOT routable" | ||
while attempts <= max_wait: | ||
attempts += 1 | ||
(routable, _) = _check_routable_state(interface, renderer) | ||
if routable == do_routable: | ||
print("Reached {} state".format(routable_msg)) | ||
return | ||
time.sleep(1) | ||
raise SystemExit("Failed to reach {} state!".format(routable_msg)) | ||
|
||
|
||
def has_cable(iface): | ||
|
@@ -17,6 +126,47 @@ def has_cable(iface): | |
return carrier.read()[0] == "1" | ||
|
||
|
||
def wait_for_cable_state(iface, do_cable=True, max_wait=30): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, using the |
||
"""Wait for the cable state to be True or False.""" | ||
attempts = 0 | ||
cable_msg = "plugged" if do_cable else "unplugged" | ||
while attempts <= max_wait: | ||
attempts += 1 | ||
if has_cable(iface) == do_cable: | ||
print("Detected cable state: {}".format(cable_msg)) | ||
return | ||
time.sleep(1) | ||
raise SystemExit("Failed to detect {}!".format(cable_msg)) | ||
|
||
|
||
def help_wait_cable_and_routable_state(iface, do_check=True): | ||
if do_check: | ||
do_cable = True | ||
do_routable = True | ||
else: | ||
do_cable = False | ||
do_routable = False | ||
|
||
renderer = netplan_renderer() | ||
print( | ||
"Waiting for cable to get {}.".format( | ||
"connected" if do_cable else "disconnected" | ||
), | ||
flush=True, | ||
) | ||
wait_for_cable_state(iface, do_cable, 60) | ||
|
||
print( | ||
"Waiting for networkd/NetworkManager {}.".format( | ||
"routable" if do_routable else "NOT routable" | ||
) | ||
) | ||
wait_for_routable_state(iface, renderer, do_routable, 60) | ||
|
||
print("Cable {}!".format("connected" if do_cable else "disconnected")) | ||
print("Network {}!".format("routable" if do_routable else "NOT routable")) | ||
|
||
|
||
def main(): | ||
"""Entry point to the program.""" | ||
if len(sys.argv) != 2: | ||
|
@@ -37,28 +187,19 @@ def main(): | |
print("After 15 seconds plug it back in.") | ||
print("Checkbox session may be interrupted but it should come back up.") | ||
input() | ||
print("Waiting for cable to get disconnected.") | ||
elapsed = 0 | ||
while elapsed < 60: | ||
if not has_cable(sys.argv[1]): | ||
break | ||
time.sleep(1) | ||
print(".", flush=True, end="") | ||
elapsed += 1 | ||
else: | ||
raise SystemExit("Failed to detect unplugging!") | ||
print("Cable unplugged!") | ||
print("Waiting for the cable to get connected.") | ||
elapsed = 0 | ||
while elapsed < 60: | ||
if has_cable(sys.argv[1]): | ||
break | ||
time.sleep(1) | ||
print(".", flush=True, end="") | ||
elapsed += 1 | ||
|
||
help_wait_cable_and_routable_state(iface, False) | ||
|
||
print("\n\nPlease plug the cable back in.\n\n") | ||
|
||
help_wait_cable_and_routable_state(iface, True) | ||
|
||
print("Pinging gateway...") | ||
ping_state = perform_ping_test([iface]) | ||
if ping_state == 0: | ||
print("PASS: Ping to gateway successful") | ||
else: | ||
raise SystemExit("Failed to detect plugging it back!") | ||
print("Cable detected!") | ||
raise SystemExit("FAIL: Ping to gateway failed") | ||
|
||
|
||
if __name__ == "__main__": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a case where you would want to return an empty dictionary and keep executing the script, even though the command returned something wrong?
Wouldn't it be better to just let the
CalledProcessError
exception bubble up?