-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathopergroup.py
executable file
·85 lines (71 loc) · 2.16 KB
/
opergroup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import sys
import json
# count_up_uplinks returns the number of monitored uplinks that have oper-state=up
def count_up_uplinks(paths):
    """Count the monitored paths whose reported value is "up".

    Args:
        paths: iterable of dict-like entries. Only each entry's "value"
            key is read; an entry without that key is treated as "down".

    Returns:
        The number of entries whose "value" is exactly the string "up".
    """
    return sum(1 for entry in paths if entry.get("value", "down") == "up")
# required_up_uplinks returns the value of the `required-up-uplinks` option
def required_up_uplinks(options):
    """Return the `required-up-uplinks` option as an integer.

    Args:
        options: mapping of handler options.

    Returns:
        The option converted with int(); 1 when the option is absent.
    """
    threshold = options.get("required-up-uplinks", 1)
    return int(threshold)
# main entry function for event handler
def event_handler_main(in_json_str):
    """Build the event-handler response for the current uplink state.

    The input JSON must contain a "paths" list and an "options" object.
    Every interface named in the "down-links" option is set to "down"
    when fewer uplinks are up than `required-up-uplinks` demands, and to
    "up" otherwise.

    Args:
        in_json_str: JSON document (as a string) passed by the event handler.

    Returns:
        A JSON string of the form {"actions": [...]} holding one
        "set-ephemeral-path" action per configured downlink.
    """
    request = json.loads(in_json_str)
    monitored_paths = request["paths"]
    handler_options = request["options"]

    up_total = count_up_uplinks(monitored_paths)
    needed = required_up_uplinks(handler_options)

    # Downlinks follow the uplinks: too few live uplinks forces them down.
    if up_total < needed:
        target_state = "down"
    else:
        target_state = "up"

    # The debug option is compared against the string "true", not a boolean.
    if handler_options.get("debug") == "true":
        print(
            f"num of required up uplinks = {needed}\n"
            f"detected num of up uplinks = {up_total}\n"
            f"downlinks new state = {target_state}"
        )

    actions = [
        {
            "set-ephemeral-path": {
                "path": f"interface {link_name} oper-state",
                "value": target_state,
            }
        }
        for link_name in handler_options.get("down-links", [])
    ]
    return json.dumps({"actions": actions})
#
# The code below is only for testing this script from a shell (e.g. bash); it is not used when the script is invoked from SRL
#
def main():
    """Run the handler once against a canned request and print the result.

    The sample request reports both monitored uplinks as "down", requires
    one uplink to be up, lists two downlinks and enables debug output.
    """
    sample_request = """
{
"paths": [
{
"path":"interface ethernet-1/49 oper-status",
"value":"down"
},
{
"path":"interface ethernet-1/50 oper-status",
"value":"down"
}
],
"options": {
"required-up-uplinks":1,
"down-links": [
"Ethernet-1/1",
"Ethernet-1/2"
],
"debug": "true"
},
"persistent-data": {"last-state":"up"}
}
"""
    handler_output = event_handler_main(sample_request)
    print(f"Response JSON:\n{handler_output}")
# Script entry point: run the self-test only when executed directly.
if __name__ == "__main__":
    # main() has no return statement, so sys.exit receives None here.
    exit_code = main()
    sys.exit(exit_code)