-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathlabel_connected_comp.py
231 lines (198 loc) · 8.96 KB
/
label_connected_comp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# Urban_PointCloud_Processing by Amsterdam Intelligence, GPL-3.0 license
import numpy as np
import logging
# Two libraries necessary for the CloudCompare Python wrapper
# Installation instructions in notebook [5. Region growing.ipynb]
import pycc
import cccorelib
from ..abstract_processor import AbstractProcessor
from ..labels import Labels
from ..utils.math_utils import get_octree_level
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class LabelConnectedComp(AbstractProcessor):
    """
    Clustering based region growing implementation using the label connected
    components method from CloudCompare.

    Parameters
    ----------
    label : int (default: -1)
        Label to use when labelling the grown region.
    exclude_labels : list-like (default: None)
        Which labels to exclude (optional). `None` or an empty sequence
        means that nothing is excluded.
    set_debug : bool (default: False)
        If True, the start-up message of `get_label_mask` is logged at DEBUG
        level instead of INFO level.
    grid_size : float (default: 0.1)
        Octree grid size for LCC method, in meters. Lower means a more
        fine-grained clustering.
    min_component_size : int (default: 100)
        Minimum size of connected components to consider.
    threshold : float (default: 0.1)
        When labelling a cluster, more than this proportion of its points
        should already be labelled.
    """

    def __init__(self, label=-1, exclude_labels=None, set_debug=False,
                 grid_size=0.1, min_component_size=100, threshold=0.1):
        """ Init variables. """
        super().__init__(label)
        self.grid_size = grid_size
        self.min_component_size = min_component_size
        self.threshold = threshold
        # Always store a fresh list: this avoids sharing a mutable default
        # between instances and keeps the truthiness checks on
        # `self.exclude_labels` valid when a numpy array is passed in.
        self.exclude_labels = (
            [] if exclude_labels is None else list(exclude_labels))
        self.debug = set_debug

    def _set_mask(self):
        """ Configure the points that we want to perform region growing on.

        Removes every point whose label is in `self.exclude_labels` from
        `self.mask`. Expects `self.mask` and `self.point_labels` to be set.
        """
        excluded = np.isin(self.point_labels, self.exclude_labels)
        self.mask = self.mask & ~excluded

    def _convert_input_cloud(self, points):
        """ Function to convert to CloudCompare point cloud.

        Only the points selected by `self.mask` are converted. Stores the
        result in `self.point_cloud`, the index of its 'Labels' scalar field
        in `self.labels_sf_idx` and the octree level in `self.octree_level`.
        """
        # Be aware that CloudCompare stores coordinates on 32 bit floats.
        # To avoid losing too much precision you should 'shift' your
        # coordinates if they are 64 bit floats (which is the default in
        # python land)
        xs = (points[self.mask, 0]).astype(pycc.PointCoordinateType)
        ys = (points[self.mask, 1]).astype(pycc.PointCoordinateType)
        if points.shape[1] == 2:
            # 2D input: place all points in the z=0 plane.
            zs = np.zeros(xs.shape).astype(pycc.PointCoordinateType)
        else:
            zs = (points[self.mask, 2]).astype(pycc.PointCoordinateType)
        point_cloud = pycc.ccPointCloud(xs, ys, zs)

        # (Optional) Create (if it does not exist already)
        # a scalar field where we store the Labels
        labels_sf_idx = point_cloud.getScalarFieldIndexByName('Labels')
        if labels_sf_idx == -1:
            labels_sf_idx = point_cloud.addScalarField('Labels')
        point_cloud.setCurrentScalarField(labels_sf_idx)

        # Compute the octree level based on the grid_size. Note that this is
        # computed from the full input array, not only the masked points.
        self.octree_level = get_octree_level(points, self.grid_size)
        self.labels_sf_idx = labels_sf_idx
        # You can access the x,y,z fields using self.point_cloud.points()
        self.point_cloud = point_cloud

    def _label_connected_comp(self):
        """ Perform the clustering algorithm: Label Connected Components.

        Stores the component id of every masked point in
        `self.point_components`; components smaller than
        `min_component_size` get the id -1.
        """
        (cccorelib
            .AutoSegmentationTools
            .labelConnectedComponents(self.point_cloud,
                                      level=self.octree_level))

        # Get the scalar field with labels and points coords as numpy array
        labels_sf = self.point_cloud.getScalarField(self.labels_sf_idx)
        self.point_components = np.around(labels_sf.asArray())

        # Filter based on min_component_size
        if self.min_component_size > 1:
            cc_labels, counts = np.unique(self.point_components,
                                          return_counts=True)
            cc_labels_filtered = cc_labels[counts < self.min_component_size]
            # np.isin replaces np.in1d, which is deprecated in NumPy 2.0.
            too_small = np.isin(self.point_components, cc_labels_filtered)
            self.point_components[too_small] = -1

    def _fill_components(self):
        """ Clustering based region growing process. When enough initial seed
        points are found inside a component, make the whole component this
        label.

        Returns a boolean array with the same length as `self.mask` that is
        True for every point of an accepted component. As a side effect the
        accepted points are also written into `self.point_labels` in place.
        """
        # Maps positions in the masked (CloudCompare) cloud back to indices
        # in the full input arrays.
        mask_indices = np.where(self.mask)[0]
        label_mask = np.zeros(len(self.mask), dtype=bool)

        cc_labels = np.unique(self.point_components)
        # -1 marks components that were filtered out for being too small.
        cc_labels = set(cc_labels).difference((-1,))

        cc_count = 0
        for cc in cc_labels:
            # select points that belong to the cluster
            cc_mask = (self.point_components == cc)
            # cluster size
            # NOTE(review): this check looks redundant, since
            # `_label_connected_comp` already relabels too-small components
            # as -1; kept as a cheap guard.
            cc_size = np.count_nonzero(cc_mask)
            if cc_size < self.min_component_size:
                continue
            # number of points in the cluster that are labelled as seed point
            seed_count = np.count_nonzero(
                self.point_labels[mask_indices[cc_mask]] == self.label)
            # more than X% of the cluster should be seed points
            if (float(seed_count) / cc_size) > self.threshold:
                label_mask[mask_indices[cc_mask]] = True
                cc_count += 1

        # Add label to the regions. This modifies the array that was passed
        # in by the caller, because `self.point_labels` refers to it.
        self.point_labels[label_mask] = self.label

        logger.debug(f'Found {len(cc_labels)} clusters of ' +
                     f'>{self.min_component_size} points; ' +
                     f'{cc_count} added.')
        return label_mask

    def get_label_mask(self, points, labels, mask=None):
        """
        Returns the label mask for the given pointcloud.

        Parameters
        ----------
        points : array of shape (n_points, 3) or (n_points, 2)
            The point cloud <x, y, z>. If only two dimensions are provided,
            'z' is assumed to be all zeros.
        labels : array of shape (n_points,)
            The labels corresponding to each point. Note that this array is
            also updated in place for the points in the returned mask.
        mask : array of shape (n_points,) with dtype=bool
            Pre-mask used to label only a subset of the points. Can be
            overwritten by setting `exclude_labels` in the constructor.

        Returns
        -------
        An array of shape (n_points,) with dtype=bool indicating which points
        should be labelled according to this processor.
        """
        if not self.debug:
            logger.info('Clustering based Region Growing ' +
                        f'(label={self.label}, ' +
                        f'{Labels.get_str(self.label)}).')
        else:
            logger.debug('Clustering based Region Growing ' +
                         f'(label={self.label}, ' +
                         f'{Labels.get_str(self.label)}).')

        if self.label == -1:
            logger.warning('Label not set, defaulting to -1.')

        self.point_labels = labels

        if self.exclude_labels:
            # `exclude_labels` takes precedence over a user supplied mask.
            self.mask = np.ones((len(points),), dtype=bool)
            self._set_mask()
        elif mask is None:
            self.mask = np.ones((len(points),), dtype=bool)
        else:
            # Work on a copy so the caller's mask is left untouched.
            self.mask = mask.copy()
            # We need to un-mask all points of the desired class label.
            self.mask[labels == self.label] = True

        self._convert_input_cloud(points)
        self._label_connected_comp()
        label_mask = self._fill_components()

        return label_mask

    def get_labels(self, points, labels, mask=None, tilecode=None):
        """
        Returns the labels for the given pointcloud.

        Parameters
        ----------
        points : array of shape (n_points, 3)
            The point cloud <x, y, z>.
        labels : array of shape (n_points,)
            The labels corresponding to each point. Updated in place.
        mask : array of shape (n_points,) with dtype=bool
            Pre-mask that is passed on to `get_label_mask`. It has no effect
            when `exclude_labels` was set in the constructor.
        tilecode : str
            Ignored by this class.

        Returns
        -------
        An array of shape (n_points,) with the updated labels. This is the
        same array object as the `labels` argument.
        """
        label_mask = self.get_label_mask(points, labels, mask)
        labels[label_mask] = self.label
        return labels

    def get_components(self, points, labels=None):
        """
        Simply get the components without caring about labels.

        Parameters
        ----------
        points : array of shape (n_points, 3) or (n_points, 2)
            The point cloud <x, y, z>. If only two dimensions are provided,
            'z' is assumed to be all zeros.
        labels : array of shape (n_points,)
            The labels corresponding to each point. Optional, only used in
            combination with `exclude_labels`.

        Returns
        -------
        An array with a cluster label for each point that was clustered.
        When points are removed through `exclude_labels`, only the remaining
        points are included. Clusters with a size less than
        `min_component_size` are labelled as -1.
        """
        self.mask = np.ones((len(points),), dtype=bool)
        if labels is not None and self.exclude_labels:
            self.point_labels = labels
            self._set_mask()
        self._convert_input_cloud(points)
        self._label_connected_comp()
        return self.point_components