Skip to content

Commit

Permalink
Fix: Eth hotplug use the right way to check cable before pinging
Browse files Browse the repository at this point in the history
  • Loading branch information
seankingyang committed Jan 23, 2025
1 parent a680960 commit bc55608
Show file tree
Hide file tree
Showing 2 changed files with 546 additions and 22 deletions.
167 changes: 145 additions & 22 deletions providers/base/bin/eth_hotplugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,103 @@
"""Check if hotplugging works on an ethernet port."""

import sys
import os
import time
import glob
import yaml
import subprocess as sp

from gateway_ping_test import perform_ping_test

NETPLAN_CFG_PATHS = ("/etc/netplan", "/lib/netplan", "/run/netplan")


def netplan_renderer():
"""
Check the renderer used by netplan on the system if it is networkd or
NetworkManager.
This function looks for the renderer used in the yaml files located in the
NETPLAN_CFG_PATHS directories, and returns the first renderer found.
If the renderer is not found, it defaults to "networkd".
If the netplan file is not found, it defaults to "NetworkManager".
"""
netplan_file_exist = False
for basedir in NETPLAN_CFG_PATHS:
if os.path.exists(basedir):
files = glob.glob(os.path.join(basedir, "*.yaml"))
for f in files:
netplan_file_exist = True
with open(f, "r") as file:
data = yaml.safe_load(file)
if "renderer" in data["network"]:
return data["network"]["renderer"]
if netplan_file_exist:
return "networkd"
return "NetworkManager"


def get_interface_info(interface, renderer):
"""
Get the interface information (state and gateway) from the renderer.
"""
if renderer == "networkd":
cmd = "networkctl status --no-pager --no-legend {}".format(interface)
key_map = {"State": "state", "Gateway": "gateway"}
elif renderer == "NetworkManager":
cmd = "nmcli device show {}".format(interface)
key_map = {"GENERAL.STATE": "state", "IP4.GATEWAY": "gateway"}
else:
raise ValueError("Unknown renderer: {}".format(renderer))

return _get_cmd_info(cmd, key_map, renderer)


def _get_cmd_info(cmd, key_map, renderer):
info = {}
try:
output = sp.check_output(cmd, shell=True)
for line in output.decode(sys.stdout.encoding).splitlines():
# Skip lines that don't have a "key: value" format
if ":" not in line:
continue
key, val = line.strip().split(":", maxsplit=1)
key = key.strip()
val = val.strip()
if key in key_map:
info[key_map[key]] = val
except sp.CalledProcessError as e:
print("Error running {} command: {}".format(renderer, e))
return info


def _check_routable_state(interface, renderer):
"""
Check if the interface is in a routable state depending on the renderer
"""
routable = False
state = ""
info = get_interface_info(interface, renderer)
state = info.get("state", "")
if renderer == "networkd":
routable = "routable" in state
elif renderer == "NetworkManager":
routable = "connected" in state and "disconnected" not in state
return (routable, state)


def wait_for_routable_state(
interface, renderer, do_routable=True, max_wait=30
):
attempts = 0
routable_msg = "routable" if do_routable else "NOT routable"
while attempts <= max_wait:
attempts += 1
(routable, _) = _check_routable_state(interface, renderer)
if routable == do_routable:
print("Reached {} state".format(routable_msg))
return
time.sleep(1)
raise SystemExit("Failed to reach {} state!".format(routable_msg))


def has_cable(iface):
Expand All @@ -17,6 +113,46 @@ def has_cable(iface):
return carrier.read()[0] == "1"


def wait_for_cable_state(iface, do_cable=True, max_wait=30):
"""Wait for the cable state to be True or False."""
attempts = 0
cable_msg = "plugged" if do_cable else "unplugged"
while attempts <= max_wait:
attempts += 1
if has_cable(iface) == do_cable:
print("Detected cable state: {}".format(cable_msg))
return
time.sleep(1)
raise SystemExit("Failed to detect {}!".format(cable_msg))


def help_wait_cable_and_routable_state(iface, do_check=True):
if do_check:
do_cable = True
do_routable = True
else:
do_cable = False
do_routable = False

renderer = netplan_renderer()
print(
"Waiting for cable to get {}.".format(
"connected" if do_cable else "disconnected"
)
)
wait_for_cable_state(iface, do_cable, 60)

print(
"Waiting for networkd/NetworkManager {}.".format(
"routable" if do_routable else "NOT routable"
)
)
wait_for_routable_state(iface, renderer, do_routable, 60)

print("Cable {}!".format("connected" if do_cable else "disconnected"))
print("Network {}!".format("routable" if do_routable else "NOT routable"))


def main():
"""Entry point to the program."""
if len(sys.argv) != 2:
Expand All @@ -37,28 +173,15 @@ def main():
print("After 15 seconds plug it back in.")
print("Checkbox session may be interrupted but it should come back up.")
input()
print("Waiting for cable to get disconnected.")
elapsed = 0
while elapsed < 60:
if not has_cable(sys.argv[1]):
break
time.sleep(1)
print(".", flush=True, end="")
elapsed += 1
else:
raise SystemExit("Failed to detect unplugging!")
print("Cable unplugged!")
print("Waiting for the cable to get connected.")
elapsed = 0
while elapsed < 60:
if has_cable(sys.argv[1]):
break
time.sleep(1)
print(".", flush=True, end="")
elapsed += 1
else:
raise SystemExit("Failed to detect plugging it back!")
print("Cable detected!")

help_wait_cable_and_routable_state(iface, False)

print("Please plug the cable back in.")

help_wait_cable_and_routable_state(iface, True)

print("Pinging gateway...")
perform_ping_test(iface)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit bc55608

Please sign in to comment.