-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphtools.py
332 lines (248 loc) · 9.31 KB
/
graphtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#!/usr/bin/env python3
"""
Tools for graphviz parametrization
"""
import time,sys, warnings
import pygraphviz as pgv
from collections import OrderedDict
import yaml
from yaml import load,dump
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
tnow = time.perf_counter
def print_meta_graph_dict():
"""
Dict of properties of the graph that cannot be tuned with standard graphviz tools
"""
meta_graph_dict = {
'node_size_depth' : False,
'size_depth_init' : 20,
'size_depth_delta': 2,
}
return meta_graph_dict
def print_default_graph_param():
"""
Default graph parameters
Order of parameters matters
"""
graph_param_dict = OrderedDict({
('graph','outputorder') : 'edgesfirst',
('graph','splines') : 'spline', # 'compound', 'line', 'curved', 'polyline', 'none', 'ortho', 'spline'
('node','shape') : 'rectangle',
('graph','overlap') : 'false',
('graph','rankdir') : 'LR',
('graph','layout') : 'dot', # 'dot', 'fdp', 'neato', 'twopi'
('graph','pad') : 0.1,
('node','fontsize') : 14,
('node','style') : 'filled',
('node','fillcolor') : 'white',
})
return graph_param_dict
def get_node_depth(graph,node,depth=0):
"""
Get the depth of the node (an integer starting from 0) using the predecessors method.
"""
predecessors = graph.predecessors(node)
if len(predecessors) > 0:
depth = get_node_depth(graph,predecessors[0],depth=depth+1)
return depth
def get_all_graph_successors(graph,node,glob_list=None):
"""
Get all the successors of a node recursively.
"""
if glob_list is None:
glob_list = []
successors = graph.successors(node)
glob_list += successors
for successor in successors:
get_all_graph_successors(graph,successor,glob_list=glob_list)
return glob_list
def create_call_graph(graph_dict,callable_dict,root_node_name,hide_from_files=None,hide_nodes=None,allowed_connections=None,forbidden_connections=None):
"""
Create a pygraphviz graph based on callable_dict
"""
t1 = tnow()
print('\nCreating graph')
call_graph = pgv.AGraph(graph_dict,strict=False,directed=True)#.reverse()
root_node = call_graph.get_node(root_node_name)
#
# Remove the nodes that are from the files in hide_from_files list
# or if a node is in hide_nodes list
#
if hide_from_files is not None or hide_nodes is not None:
remove_nodes_from_file_or_list(call_graph,callable_dict,hide_from_files,hide_nodes)
#
# Allowed connections for specific nodes
#
if allowed_connections is not None:
modify_node_connections(call_graph,allowed_connections,action='keep')
if forbidden_connections is not None:
modify_node_connections(call_graph,forbidden_connections,action='exclude')
#
# Remove all the nodes that are not the successors of the root node
#
all_successors = get_all_graph_successors(call_graph,root_node)
#for node in call_graph.nodes():
# if node not in all_successors and node != root_node:
# safely_delete_node(call_graph,node)
print('Done: {:.2f} s'.format(tnow() - t1))
return call_graph
def remove_nodes_from_file_or_list(graph,callable_dict,hide_from_files,hide_nodes):
"""
Remove the nodes from the graph. The nodes that belong to specific files or
that are listed will be removed.
"""
for node in graph.nodes():
# Nodes specified explicitly by name
node_in_list = node in hide_nodes
# Nodes that belong to a file from the file list
node_from_hid_file = \
node in callable_dict.keys() and \
callable_dict[node].filename in hide_from_files
if node_in_list or node_from_hid_file:
safely_delete_node(graph,node)
def safely_delete_node(graph,node):
"""
Safely delete a node from a graph
"""
try:
if graph.has_node(node):
graph.delete_node(node)
else:
warn_message = f'Attempt to delete a node {node}, which is not in the graph.'
warnings.warn(warn_message)
except Exception:
raise RuntimeError(f'node error: node {node} !')
def modify_node_connections(graph,path_to_dict,action='keep'):
"""
Remove or to keep only some connections of specific nodes.
"""
with open(path_to_dict ,'r') as stream:
action_dict = load(stream,Loader=yaml.FullLoader)
for node_name, connections in action_dict.items():
# TODO try expect blocks
# Segmentation fault sometimes occures
if graph.has_node(node_name):
node = graph.get_node(node_name)
for successor in graph.itersucc(node):
if action == 'keep':
if successor not in connections:
safely_delete_node(graph,successor)
elif action == 'exclude':
if successor in connections:
safely_delete_node(graph,successor)
else:
sys.exit(f'Wrong action: {action}')
else:
warn_message = f'Attempt to access a node {node}, which is not in the graph.'
warnings.warn(warn_message)
def apply_node_size_depth(graph,size_init,size_delta,root_node):
"""
Make the size of the node change as a function of depth.
"""
node_list = graph.nodes()
depth_list = [get_node_depth(graph,x) for x in node_list]
for node_name, depth in zip(node_list,depth_list):
fontsize = size_init - size_delta * depth
node = graph.get_node(node_name)
node.attr['fontsize'] = fontsize
node.attr['width'] = 0.1
node.attr['height'] = 0.1
def get_meta_graph_dict(graph,param_dict,root_node):
"""
Read the entire meta dict first, then apply together with other parameters.
This is done to provide flexibility in the order.
"""
# Dictionary of graph properties
meta_graph_dict = print_meta_graph_dict()
manual_meta_graph_dict = {}
for key,value in param_dict.items():
# Meta parameters
if key[0] == 'meta':
param = key[1]
manual_meta_graph_dict[param] = value
meta_graph_dict = print_meta_graph_dict()
meta_graph_dict = meta_graph_dict | manual_meta_graph_dict
return meta_graph_dict
def apply_meta_properties(graph,meta_graph_dict,root_node):
"""
Properties of the graph that cannot be tuned with standard graphviz tools.
"""
if meta_graph_dict['node_size_depth']:
apply_node_size_depth(graph,meta_graph_dict['size_depth_init'],meta_graph_dict['size_depth_delta'],root_node)
def apply_graph_param(graph,param_dict,root_node):
"""
Apply graph parameters to graph.
Order in which parameters apply is important.
"""
# Dictionary of graph properties
meta_graph_dict = get_meta_graph_dict(graph,param_dict,root_node)
for key,value in param_dict.items():
# General graph parameters
if key[0] == 'graph':
param = key[1]
if param == 'layout':
graph.layout(prog=value)
else:
graph.graph_attr[param]=value
# Node parameters
elif key[0] == 'node':
# General node parameters
if len(key) == 2:
param = key[1]
graph.node_attr[param]=value
# Parameters for a specific node
elif len(key) == 3:
node_name = key[1]
param = key[2]
node = graph.get_node(node_name)
node.attr[param] = value
else:
sys.exit('Error in reading graph parameters.')
# Edge parameters
elif key[0] == 'edge':
# General node parameters
if len(key) == 2:
param = key[1]
graph.edge_attr[param]=value
else:
sys.exit('Error in reading graph parameters.')
# Meta parameters
elif key[0] == 'meta':
param = key[1]
if param == 'node_size_depth':
if meta_graph_dict['node_size_depth']:
size_init = meta_graph_dict['size_depth_init']
size_delta = meta_graph_dict['size_depth_delta']
apply_node_size_depth(graph,size_init,size_delta,root_node)
else:
print(type(key))
sys.exit('Error in reading graph parameters.')
def load_manual_param_dict(manual_param_path):
"""
Load the dict of parameters from YAML for graph and nodes
In this dict, keys are tuples, but YAML does not natively load tuples,
they will be read as string, therefore, a conversion needed.
"""
with open(manual_param_path ,'r') as stream:
key_string_dict = load(stream,Loader=yaml.FullLoader)
key_tuple_dict = OrderedDict()
# Convert string key to tuple key
for key_str, value in key_string_dict.items():
key_list = key_str.replace('(','').replace(')','').replace("'","").split(',')
key_tuple = tuple(key_list)
key_tuple_dict[key_tuple] = value
return key_tuple_dict
def set_graph_param(graph, root_node, manual_param_path = None):
"""
Wrapper to apply different sets of parameters to a graph
"""
# First, read the default parameters
graph_param_dict = print_default_graph_param()
# Second, read the custom graph parameters if specified
if manual_param_path is not None:
manual_graph_param_dict = load_manual_param_dict(manual_param_path)
graph_param_dict = graph_param_dict | manual_graph_param_dict
apply_graph_param(graph,graph_param_dict,root_node)