Skip to content

Commit

Permalink
tests: add end.b6.encaps topotest
Browse files Browse the repository at this point in the history
The topotest for End.B6.Encaps binding sid is provided.

At rt4 out the effect of End.B6.Encaps:
1. decrement the counter of the current segment list
2. add the SRH [0]fc00:0:5::, [1]fc00:0:4::

Signed-off-by: Dmytro Shytyi <[email protected]>
  • Loading branch information
Dmytro Shytyi authored and dmytroshytyi-6WIND committed Mar 28, 2024
1 parent 9a4280b commit fe9afd7
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 4 deletions.
Empty file.
25 changes: 25 additions & 0 deletions tests/topotests/isis_srv6_te_topo1/rt2/step6/show_ipv6_route.ref
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"fc00:0:2::/128":[
{
"prefix":"fc00:0:2::/128",
"prefixLen":128,
"protocol":"isis",
"vrfId":0,
"vrfName":"default",
"selected":true,
"installed":true,
"table":254,
"nexthops":[
{
"flags":3,
"fib":true,
"directlyConnected":true,
"weight":1,
"seg6local":{
"action":"End"
}
}
]
}
]
}
208 changes: 204 additions & 4 deletions tests/topotests/isis_srv6_te_topo1/test_isis_srv6_te_topo1.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import json
import functools
import pytest
import subprocess

CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, "../"))
Expand All @@ -89,6 +90,7 @@
required_linux_kernel_version,
create_interface_in_kernel,
)
from multiprocessing import Process

pytestmark = [pytest.mark.isisd, pytest.mark.bgpd]

Expand Down Expand Up @@ -431,7 +433,7 @@ def test_srv6_te_policy_additional_route():


def test_srv6_te_policy_removed():
logger.info("Test (step 3): verify SRv6 TE policy removed")
logger.info("Test (step 4): verify SRv6 TE policy removed")
tgen = get_topogen()

# Skip if previous fatal error condition is raised
Expand All @@ -440,6 +442,7 @@ def test_srv6_te_policy_removed():

tgen.gears["rt1"].vtysh_cmd(
"configure \n \
no ipv6 route fc00:0:6b::/48 fc00:0:6:: color 1 \n \
segment-routing \n \
traffic-eng \n \
no policy color 1 endpoint fc00:0:6:: \n \
Expand All @@ -449,11 +452,208 @@ def test_srv6_te_policy_removed():
!"
)

for rname in ["rt1"]:
# for rname in ["rt1"]:
# router_compare_json_output(
# rname,
# "show ipv6 route static json",
# "step4/show_ipv6_route.ref",
# )


#
# Step 5
#
# Test SRv6 End.B6.Encaps
#


def test_srv6_end_b6_encaps():
logger.info("Test (step 5): verify SRv6 END.B6.Encaps")
tgen = get_topogen()

# Skip if previous fatal error condition is raised
if tgen.routers_have_failure():
pytest.skip(tgen.errors)

tgen.gears["rt1"].vtysh_cmd(
"configure \n \
ipv6 route fc00:0:6b::/48 fc00:0:6:: color 1 \n \
segment-routing \n \
traffic-eng \n \
segment-list srv6 \n \
index 1 ipv6-address fc00:0:2:: \n \
index 2 ipv6-address fc00:0:6:: \n \
exit \n \
policy color 1 endpoint fc00:0:6:: \n \
candidate-path preference 1 name srv6 explicit segment-list srv6 \n \
exit \n \
exit \n \
!"
)

tgen.gears["rt2"].vtysh_cmd(
"configure \n \
segment-routing \n \
traffic-eng \n \
segment-list srv6-header \n \
index 1 ipv6-address fc00:0:4:: \n \
index 2 ipv6-address fc00:0:5:: \n \
exit \n \
policy color 1 endpoint fc00:0:6:: \n \
srv6-binding-sid fc00:0:2:: \n \
candidate-path preference 1 name srv6 explicit segment-list srv6-header \n \
exit \n \
exit \n \
!"
)

tgen.gears["rt5"].vtysh_cmd(
"configure \n \
interface sr0 \n \
ipv6 address fc00:0:5::/48 \n \
exit"
)

tgen.gears["rt5"].run("sysctl -w net.ipv6.conf.all.seg6_enabled=1")
tgen.gears["rt5"].run("sysctl -w net.ipv6.conf.default.seg6_enabled=1")
tgen.gears["rt5"].run("sysctl -w net.ipv6.conf.eth-rt4.seg6_enabled=1")

subprocess.check_call(["apt", "install", "-y", "python3-venv"])
if not os.path.isdir("/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/scapy-latest"):
subprocess.check_call(
["git", "clone", "https://github.com/secdev/scapy.git", "/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/scapy-latest"]
)
if not os.path.isdir("/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/python_packages"):
subprocess.check_call(
["pip", "download", "--dest", "/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/python_packages/", "setuptools==62.0.0"]
)
subprocess.check_call(
["pip", "download", "--dest", "/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/python_packages/", "wheel"]
)
tgen.gears["rt2"].run(
"python3 -m venv /tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/; source /tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/venv/bin/activate; pip3 install --no-index --force-reinstall --find-links=/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/python_packages/ setuptools==62.0.0; pip3 install --no-index --find-links=/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/python_packages /tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/scapy-latest/"
)

def ping_func():
tgen = get_topogen()
tgen.gears["src"].run("ping 10.0.10.2 -c 128 -i 0.1")

p1 = Process(target=ping_func)
p1.start()
# tgen.gears["rt2"].run("tcpdump -i any -q -w ./rt2-dump.pcap & ping 10.0.10.2 -c 5; killall tcpdump")
tgen.gears["rt2"].run("touch ./rt2-dump.pcap && chmod oug+rwx ./rt2-dump.pcap")
tgen.gears["rt2"].run("tcpdump -i any -w ./rt2-dump.pcap -c 64")

# tgen.gears["rt2"].run("tshark -ni any -w /tmp/rt2-dump.pcap -c 64")
p1.join()

file_content = """\
from scapy.all import rdpcap, IPv6, IPv6ExtHdrSegmentRouting
scapy_cap = rdpcap(
"/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/rt2-dump.pcap"
)
# scapy_cap = rdpcap('/tmp/rt2-dump.pcap')
output_pkt_flag = False
outer_srh = -1
outer_srhl = -1
inner_srh = -1
inner_srhl = -1
inner_most_srh = -1
inner_most_srhl = -1
for packet in scapy_cap:
if "echo-request" in str(packet):
print(packet)
if packet.haslayer(IPv6) and packet.haslayer(IPv6ExtHdrSegmentRouting):
outer_srh = packet[IPv6][IPv6ExtHdrSegmentRouting].addresses
outer_srhl = packet[IPv6][IPv6ExtHdrSegmentRouting].segleft
encap = packet[IPv6][IPv6ExtHdrSegmentRouting]
if encap.haslayer(IPv6) and encap.haslayer(IPv6ExtHdrSegmentRouting):
inner_srh = encap[IPv6][IPv6ExtHdrSegmentRouting].addresses
inner_srhl = encap[IPv6][IPv6ExtHdrSegmentRouting].segleft
bgp_encap = encap[IPv6][IPv6ExtHdrSegmentRouting]
if bgp_encap.haslayer(IPv6) and bgp_encap.haslayer(
IPv6ExtHdrSegmentRouting
):
inner_most_srh = bgp_encap[IPv6][
IPv6ExtHdrSegmentRouting
].addresses
inner_most_srhl = bgp_encap[IPv6][
IPv6ExtHdrSegmentRouting
].segleft
if (
("fc00:0:6::" in inner_srh)
and ("fc00:0:2::" in inner_srh)
and (inner_srhl == 0)
and ("fc00:0:5::" in outer_srh)
and ("fc00:0:4::" in outer_srh)
and (outer_srhl == 1)
and ("fc00:0:6b:1::" in inner_most_srh)
and (inner_most_srhl == 0)
):
output_pkt_flag = True
else:
print(
"Observed: outer srh: {}, segleft={}, inner srh: {}, segleft={}, inner_most_srh: {}, segleft={}".format(
outer_srh,
outer_srhl,
inner_srh,
inner_srhl,
inner_most_srh,
inner_most_srhl,
)
)
if output_pkt_flag == False:
assertmsg = ('Unexpected SRH in the captured packet in rt2'
'Expected outer srh: [fc00:0:5::, fc00:0:4::], segleft=1, inner srh: [fc00:0:6::, fc00:0:2::], segleft=0, inner_most_srh: [fc00:0:6b:1::], segleft=0')
assert False, assertmsg
"""

file_path = (
"/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/scapy_test_srv6.py"
)
with open(file_path, "w") as file:
file.write(file_content)

activate_cmd = f"source {os.path.join('/tmp/topotests/isis_srv6_te_topo1.test_isis_srv6_te_topo1/rt2/venv', 'bin', 'activate')}"
subprocess.run(
[activate_cmd, ";", "python3", file_path], shell=True, executable="/bin/bash"
)



#
# Step 6
#
# Test SRv6 End.B6.Encaps removal
#


def test_srv6_end_b6_encaps_removal():
logger.info("Test (step 6): verify SRv6 END.B6.Encaps removal")
tgen = get_topogen()

# Skip if previous fatal error condition is raised
if tgen.routers_have_failure():
pytest.skip(tgen.errors)

tgen.gears["rt2"].vtysh_cmd(
"configure \n \
segment-routing \n \
traffic-eng \n \
policy color 1 endpoint fc00:0:6:: \n \
no srv6-binding-sid fc00:0:2:: \n \
exit \n \
exit \n \
!"
)

for rname in ["rt2"]:
router_compare_json_output(
rname,
"show ipv6 route static json",
"step4/show_ipv6_route.ref",
"show ipv6 route fc00:0:2:: json",
"step6/show_ipv6_route.ref",
)


Expand Down

0 comments on commit fe9afd7

Please sign in to comment.