-
Notifications
You must be signed in to change notification settings - Fork 0
/
collect.py
155 lines (108 loc) · 3.7 KB
/
collect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import docker
import utils
import sys
import generate
client = docker.from_env()
def find_container_by_network(network_name):
containers = client.containers.list()
found = []
for container in containers:
attribute = container.attrs
networks = attribute['NetworkSettings']['Networks'].keys()
if network_name in networks:
found.append(container.id)
return found
def collect_Container_data(c):
data = {
'image': '',
'container_name': '',
'restart': '',
'networks': {},
'command': '',
'labels': {},
'volumes': [],
'environment': [],
'ports': []
}
attributes = c.attrs
# set image
utils.find_and_set(attributes, 'Config.Image', data, 'image')
# set container name
data['container_name'] = c.name
# restart policy
utils.find_and_set(
attributes, 'HostConfig.RestartPolicy.Name', data, 'restart')
if data['restart'] == 'no':
data['restart'] = None
# set container labels
c_labels: dict = attributes['Config']['Labels']
if c_labels:
for key in c_labels.keys():
data['labels'][key] = c_labels[key]
# set container CMD
if utils.find(attributes, 'Config.Cmd'):
data['command'] = " ".join(
attributes['Config']['Cmd'])
# set networks
networks = utils.find(attributes, 'NetworkSettings.Networks')
if networks:
data['networks'] = [x for x in networks.keys()]
# ser volumes
utils.find_and_set(attributes, 'HostConfig.Binds', data, 'volumes')
# set environments
c_env = utils.find(attributes, 'Config.Env')
if c_env:
for env in c_env:
for ignore in utils.ignoreEvn:
if ignore not in env:
equal_index = env.index('=')
key = env[:equal_index]
value = env[equal_index+1:]
data['environment'].append({key: value})
# set exposed port
exposed_port = utils.find(attributes, 'HostConfig.PortBindings')
if exposed_port:
for port in exposed_port:
container_port = port
item = exposed_port[port]
host_port = item[0]['HostIp'] + item[0]['HostPort']
data['ports'].append(f'{host_port}:{container_port}')
# filter truthy value
filtered_data = data.copy()
for key in data:
if not data[key]:
del filtered_data[key]
return filtered_data
def collect_by_identifier(container_identifier):
exist = False
data = None
networks = {}
containers = client.containers.list()
for container in containers:
found = container.name == container_identifier or \
container_identifier in container.id
if found:
exist = True
c = client.containers.get(container.id)
data = collect_Container_data(c)
# find network driver
if data['networks']:
networklist = client.networks.list()
for network in networklist:
if network.attrs['Name'] in data['networks']:
networks[network.attrs['Name']] = {
'external': (not network.attrs['Internal'])}
return (exist, data, networks)
def collect(containers):
services = []
networks = {}
for container in containers:
exist, c_service, c_networks = collect_by_identifier(container)
if exist:
services.append(c_service)
if c_networks:
networks.update(c_networks)
if not c_service:
print('container notfound')
sys.exit(1)
return (services, networks)