From 28b6a65e40450bb3b892759b1d950b7de9763ebd Mon Sep 17 00:00:00 2001 From: Marius Leustean Date: Fri, 3 Feb 2023 15:39:36 +0200 Subject: [PATCH 1/2] Shard affinity for k8s workloads Instances that are part of the same K8S cluster will get scheduled to the same shard (vCenter). It identifies the K8S cluster by looking at the tags or metadata set by the k8s cluster orchestrators when creating the instances. Kubernikus and Gardener are supported for now. It queries the database to determine the dominant shard, by looking which shard contains the most instances of a given K8S cluster. BigVMs are "out of the picture" and should not adhere to shards. They are only scheduled on their allocated hosts. The K8S logic is skipped for offline migrations (and thus for resizes too) since offline migration is a non-usecase for K8S. Change-Id: I73d04ba295d23db1d4728e9db124fc2a27c2d4bc --- nova/db/main/api.py | 75 ++++ nova/scheduler/filters/shard_filter.py | 134 ++++++- .../scheduler/filters/test_shard_filter.py | 354 +++++++++++++++--- 3 files changed, 516 insertions(+), 47 deletions(-) diff --git a/nova/db/main/api.py b/nova/db/main/api.py index ae245351063..ac886f736d6 100644 --- a/nova/db/main/api.py +++ b/nova/db/main/api.py @@ -2087,6 +2087,81 @@ def instance_get_active_by_window_joined(context, begin, end=None, return _instances_fill_metadata(context, query.all(), manual_joins) +@require_context +@pick_context_manager_reader_allow_async +def get_k8s_hosts_by_instances_tag(context, tag, filters=None): + """Get the list of K8S hosts and the number of instances associated to + the K8S cluster running on that host, querying by instances tags. + + Returns a list of tuple + [(host1, 3), (host2, 1)] + """ + count_label = func.count('*').label('count') + query = context.session.query(models.Instance.host, count_label). \ + join(models.Instance.tags) + query = _handle_k8s_hosts_query_filters(query, filters) + query = query.filter(models.Instance.deleted == 0, + models.Tag.tag == tag) + + query = query.group_by(models.Instance.host). \ + order_by(sql.desc(count_label)) + + return query.all() + + +@require_context +@pick_context_manager_reader_allow_async +def get_k8s_hosts_by_instances_metadata(context, meta_key, meta_value, + filters=None): + """Get the list of K8S hosts and the number of instances associated to + the K8S cluster running on that host, querying by instances metadata. + + Returns a list of tuple + [(host1, 3), (host2, 1)] + """ + count_label = func.count('*').label('count') + query = context.session.query(models.Instance.host, count_label). \ + join(models.InstanceMetadata, + models.InstanceMetadata.instance_uuid == models.Instance.uuid) + query = _handle_k8s_hosts_query_filters(query, filters) + query = query.filter(models.Instance.deleted == 0, + models.InstanceMetadata.deleted == 0, + models.InstanceMetadata.key == meta_key, + models.InstanceMetadata.value == meta_value) + query = query.group_by(models.Instance.host). \ + order_by(sql.desc(count_label)) + + return query.all() + + +def _handle_k8s_hosts_query_filters(query, filters=None): + """Applies filters to the K8S related queries. + + Supported filters: + filters = { + 'hv_type': 'The hypervisor_type', + 'availability_zone': 'The availability zone' + } + """ + if not filters: + return query + hv_type = filters.get('hv_type') + if hv_type: + query = query.join( + models.ComputeNode, + sql.and_( + models.ComputeNode.deleted == 0, + models.ComputeNode.hypervisor_hostname == models.Instance.node, + models.ComputeNode.hypervisor_type == hv_type)) + + availability_zone = filters.get('availability_zone') + if availability_zone: + query = query.filter( + models.Instance.availability_zone == availability_zone) + + return query + + def _instance_get_all_query(context, project_only=False, joins=None): if joins is None: joins = ['info_cache', 'security_groups'] diff --git a/nova/scheduler/filters/shard_filter.py b/nova/scheduler/filters/shard_filter.py index bc9b5ff7f40..bf024cc4917 100644 --- a/nova/scheduler/filters/shard_filter.py +++ b/nova/scheduler/filters/shard_filter.py @@ -12,9 +12,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from collections import defaultdict + from oslo_log import log as logging import nova.conf +from nova import context as nova_context +from nova.db.main import api as main_db_api +from nova import exception +from nova.objects.aggregate import AggregateList +from nova.objects.build_request import BuildRequest +from nova.objects.instance import Instance from nova.scheduler import filters from nova.scheduler.mixins import ProjectTagMixin from nova.scheduler import utils @@ -24,6 +32,11 @@ CONF = nova.conf.CONF +GARDENER_PREFIX = "kubernetes.io-cluster-" +KKS_PREFIX = "kubernikus:kluster" +HANA_PREFIX = "hana_" +VMWARE_HV_TYPE = 'VMware vCenter Server' + class ShardFilter(filters.BaseHostFilter, ProjectTagMixin): """Filter hosts based on the vcenter-shard configured in their aggregate @@ -32,6 +45,8 @@ class ShardFilter(filters.BaseHostFilter, ProjectTagMixin): Alternatively the project may have the "sharding_enabled" tag set, which enables the project for hosts in all shards. + + Implements `filter_all` directly instead of `host_passes` """ _ALL_SHARDS = "sharding_enabled" @@ -45,11 +60,114 @@ def _get_shards(self, project_id): # _get_shards() so it's clear what we return return self._get_tags(project_id) - def host_passes(self, host_state, spec_obj): + def _get_k8s_shard(self, spec_obj): + """Returns the dominant shard of a K8S cluster. + + Returns None in any of the following scenarios: + - the request is not for an instance that's part of a K8S cluster + - this is the first instance of a new cluster + - the request is for a HANA flavor + - the request is for a resize/migration + """ + if (spec_obj.flavor.name.startswith(HANA_PREFIX) or + utils.request_is_resize(spec_obj)): + return None + elevated = nova_context.get_admin_context() + build_request = None + instance = None + + def _get_tags(): + return build_request.tags if build_request \ + else instance.tags + + def _get_metadata(): + return build_request.instance.metadata if build_request \ + else instance.metadata + + check_type = spec_obj.get_scheduler_hint('_nova_check_type') + if not check_type: + build_request = BuildRequest.get_by_instance_uuid( + elevated, spec_obj.instance_uuid) + if not build_request: + instance = Instance.get_by_uuid( + elevated, spec_obj.instance_uuid, + expected_attrs=['tags', 'metadata']) + if not instance and not build_request: + LOG.warning("There were no build_request and no instance " + "for the uuid %s", spec_obj.instance_uuid) + return + + kks_tag = next((t.tag for t in _get_tags() + if t.tag.startswith(KKS_PREFIX)), None) + gardener_meta = None + if not kks_tag: + gardener_meta = \ + {k: v for k, v in _get_metadata().items() + if k.startswith(GARDENER_PREFIX)} + + if not kks_tag and not gardener_meta: + return None + + q_filters = {'hv_type': VMWARE_HV_TYPE} + if spec_obj.availability_zone: + q_filters['availability_zone'] = spec_obj.availability_zone + + results = None + if kks_tag: + results = nova_context.scatter_gather_skip_cell0( + elevated, main_db_api.get_k8s_hosts_by_instances_tag, + kks_tag, filters=q_filters) + elif gardener_meta: + (meta_key, meta_value) = next(iter(gardener_meta.items())) + results = nova_context.scatter_gather_skip_cell0( + elevated, main_db_api.get_k8s_hosts_by_instances_metadata, + meta_key, meta_value, filters=q_filters) + + if not results: + return None + + # hosts with count of instances from this K8S cluster + # {host: } + k8s_hosts = defaultdict(lambda: 0) + + for cell_uuid, cell_result in results.items(): + if nova_context.is_cell_failure_sentinel(cell_result): + raise exception.NovaException( + "Unable to schedule the K8S instance because " + "cell %s is not responding." % cell_uuid) + cell_hosts = dict(cell_result) + for h, c in cell_hosts.items(): + k8s_hosts[h] += c + + if not k8s_hosts: + return None + + all_shard_aggrs = [agg for agg in AggregateList.get_all(elevated) + if agg.name.startswith(self._SHARD_PREFIX)] + if not all_shard_aggrs: + return None + + shard_aggr = sorted( + all_shard_aggrs, + reverse=True, + key=lambda aggr: sum(i for h, i in k8s_hosts.items() + if h in aggr.hosts))[0] + + return shard_aggr.name + + def filter_all(self, filter_obj_list, spec_obj): # Only VMware if utils.is_non_vmware_spec(spec_obj): - return True + LOG.debug("ShardFilter is not applicable for this non-vmware " + "request") + return filter_obj_list + + k8s_shard = self._get_k8s_shard(spec_obj) + + return [host_state for host_state in filter_obj_list + if self._host_passes(host_state, spec_obj, k8s_shard)] + def _host_passes(self, host_state, spec_obj, k8s_shard): host_shard_aggrs = [aggr for aggr in host_state.aggregates if aggr.name.startswith(self._SHARD_PREFIX)] @@ -79,14 +197,12 @@ def host_passes(self, host_state, spec_obj): if self._ALL_SHARDS in shards: LOG.debug('project enabled for all shards %(project_shards)s.', {'project_shards': shards}) - return True elif host_shard_names & set(shards): LOG.debug('%(host_state)s shard %(host_shard)s found in project ' 'shards %(project_shards)s.', {'host_state': host_state, 'host_shard': host_shard_names, 'project_shards': shards}) - return True else: LOG.debug('%(host_state)s shard %(host_shard)s not found in ' 'project shards %(project_shards)s.', @@ -94,3 +210,13 @@ def host_passes(self, host_state, spec_obj): 'host_shard': host_shard_names, 'project_shards': shards}) return False + + if k8s_shard: + if k8s_shard not in host_shard_names: + LOG.debug("%(host_state)s is not part of the requested " + "K8S cluster shard '%(k8s_shard)s'", + {'host_state': host_state, + 'k8s_shard': k8s_shard}) + return False + + return True diff --git a/nova/tests/unit/scheduler/filters/test_shard_filter.py b/nova/tests/unit/scheduler/filters/test_shard_filter.py index 93570192b4c..4bb4a4de8c9 100644 --- a/nova/tests/unit/scheduler/filters/test_shard_filter.py +++ b/nova/tests/unit/scheduler/filters/test_shard_filter.py @@ -16,9 +16,12 @@ import mock +from nova.db.main import api as main_db_api from nova import objects from nova.scheduler.filters import shard_filter from nova import test +from nova.tests.unit import fake_flavor +from nova.tests.unit import fake_instance from nova.tests.unit.scheduler import fakes @@ -31,6 +34,13 @@ def setUp(self): 'foo': ['vc-a-0', 'vc-b-0'], 'last_modified': time.time() } + self.fake_instance = fake_instance.fake_instance_obj( + mock.sentinel.ctx, expected_attrs=['metadata', 'tags']) + build_req = objects.BuildRequest() + build_req.instance_uuid = self.fake_instance.uuid + build_req.tags = objects.TagList(objects=[]) + build_req.instance = self.fake_instance + self.fake_build_req = build_req @mock.patch('nova.scheduler.filters.shard_filter.' 'ShardFilter._update_cache') @@ -63,93 +73,130 @@ def set_cache(): ['vc-a-1', 'vc-b-0']) mock_update_cache.assert_called_once() + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host') - def test_shard_baremetal_passes(self, agg_mock): + def test_shard_baremetal_passes(self, agg_mock, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) extra_specs = {'capabilities:cpu_arch': 'x86_64'} spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs=extra_specs)) - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'], + extra_specs=extra_specs)) + self._assert_passes(host, spec_obj, True) + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') @mock.patch('nova.scheduler.filters.shard_filter.' 'ShardFilter._update_cache') @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host') - def test_shard_project_not_found(self, agg_mock, mock_update_cache): + def test_shard_project_not_found(self, agg_mock, mock_update_cache, + get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='bar', - flavor=objects.Flavor(extra_specs={})) - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) + self._assert_passes(host, spec_obj, False) + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host') - def test_shard_project_no_shards(self, agg_mock): + def test_shard_project_no_shards(self, agg_mock, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) self.filt_cls._PROJECT_TAG_CACHE['foo'] = [] - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host') - def test_shard_host_no_shard_aggregate(self, agg_mock): + def test_shard_host_no_shard_aggregate(self, agg_mock, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req host = fakes.FakeHostState('host1', 'compute', {}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) agg_mock.return_value = {} - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) - def test_shard_host_no_shards_in_aggregate(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_host_no_shards_in_aggregate(self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) - def test_shard_project_shard_match_host_shard(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_shard_match_host_shard(self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, True) - def test_shard_project_shard_do_not_match_host_shard(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_shard_do_not_match_host_shard(self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) - def test_shard_project_has_multiple_shards_per_az(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_has_multiple_shards_per_az(self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1', 'vc-b-0'] - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, True) - def test_shard_project_has_multiple_shards_per_az_resize_same_shard(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_has_multiple_shards_per_az_resize_same_shard( + self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1', 'host2']), objects.Aggregate(id=1, name='vc-a-1', hosts=['host1', @@ -157,40 +204,55 @@ def test_shard_project_has_multiple_shards_per_az_resize_same_shard(self): host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={}), + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs']), scheduler_hints=dict(_nova_check_type=['resize'], source_host=['host2'])) self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1', 'vc-b-0'] - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, True) - def test_shard_project_has_multiple_shards_per_az_resize_other_shard(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_has_multiple_shards_per_az_resize_other_shard( + self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1', 'host2']), objects.Aggregate(id=1, name='vc-a-1', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={}), + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs']), + instance_uuid=self.fake_build_req.instance_uuid, scheduler_hints=dict(_nova_check_type=['resize'], source_host=['host2'])) self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['vc-a-0', 'vc-a-1', 'vc-b-0'] - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, True) - def test_shard_project_has_sharding_enabled_any_host_passes(self): + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_has_sharding_enabled_any_host_passes( + self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req self.filt_cls._PROJECT_TAG_CACHE['baz'] = ['sharding_enabled'] aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), objects.Aggregate(id=1, name='vc-a-0', hosts=['host1'])] host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='baz', - flavor=objects.Flavor(extra_specs={})) - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) - - def test_shard_project_has_sharding_enabled_and_single_shards(self): + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) + self._assert_passes(host, spec_obj, True) + + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + def test_shard_project_has_sharding_enabled_and_single_shards( + self, get_by_uuid): + get_by_uuid.return_value = self.fake_build_req self.filt_cls._PROJECT_TAG_CACHE['baz'] = ['sharding_enabled', 'vc-a-1'] aggs = [objects.Aggregate(id=1, name='some-az-a', hosts=['host1']), @@ -198,16 +260,215 @@ def test_shard_project_has_sharding_enabled_and_single_shards(self): host = fakes.FakeHostState('host1', 'compute', {'aggregates': aggs}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='baz', - flavor=objects.Flavor(extra_specs={})) - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) + self._assert_passes(host, spec_obj, True) + + @mock.patch('nova.objects.AggregateList.get_all') + @mock.patch('nova.context.scatter_gather_skip_cell0') + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + @mock.patch('nova.context.get_admin_context') + def test_same_shard_for_kubernikus_cluster(self, get_context, + get_by_uuid, + gather_host, + get_aggrs): + kks_cluster = 'kubernikus:kluster-example' + build_req = objects.BuildRequest() + build_req.tags = objects.TagList(objects=[ + objects.Tag(tag=kks_cluster) + ]) + build_req.instance = self.fake_instance + get_by_uuid.return_value = build_req + + result = self._filter_k8s_hosts(get_context, + gather_host, + get_aggrs) + + gather_host.assert_called_once_with( + get_context.return_value, + main_db_api.get_k8s_hosts_by_instances_tag, + 'kubernikus:kluster-example', + filters={'hv_type': 'VMware vCenter Server', + 'availability_zone': 'az-2'}) + + self.assertEqual(2, len(result)) + self.assertEqual(result[0].host, 'host4') + self.assertEqual(result[1].host, 'host5') + + @mock.patch('nova.objects.AggregateList.get_all') + @mock.patch('nova.context.scatter_gather_skip_cell0') + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + @mock.patch('nova.context.get_admin_context') + def test_same_shard_for_gardener_cluster(self, get_context, + get_by_uuid, + gather_host, + get_aggrs): + gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster' + new_instance = fake_instance.fake_instance_obj( + get_context.return_value, + expected_attrs=['metadata'], + metadata={gardener_cluster: '1'}, + uuid=self.fake_instance.uuid) + build_req = objects.BuildRequest() + build_req.instance = new_instance + build_req.tags = objects.TagList() + get_by_uuid.return_value = build_req + + result = self._filter_k8s_hosts(get_context, + gather_host, + get_aggrs) + + gather_host.assert_called_once_with( + get_context.return_value, + main_db_api.get_k8s_hosts_by_instances_metadata, + gardener_cluster, '1', + filters={'hv_type': 'VMware vCenter Server', + 'availability_zone': 'az-2'}) + + self.assertEqual(2, len(result)) + self.assertEqual(result[0].host, 'host4') + self.assertEqual(result[1].host, 'host5') + + @mock.patch('nova.objects.AggregateList.get_all') + @mock.patch('nova.context.scatter_gather_skip_cell0') + @mock.patch('nova.objects.Instance.get_by_uuid') + @mock.patch('nova.context.get_admin_context') + def test_same_shard_for_nonbuild_requests(self, get_context, + get_by_uuid, + gather_host, + get_aggrs): + gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster' + new_instance = fake_instance.fake_instance_obj( + get_context.return_value, + expected_attrs=['metadata'], + metadata={gardener_cluster: '1'}) + get_by_uuid.return_value = new_instance + + result = self._filter_k8s_hosts( + get_context, gather_host, get_aggrs, + scheduler_hints={'_nova_check_type': ['live_migrate']}) + + gather_host.assert_called_once_with( + get_context.return_value, + main_db_api.get_k8s_hosts_by_instances_metadata, + gardener_cluster, '1', + filters={'hv_type': 'VMware vCenter Server', + 'availability_zone': 'az-2'}) + + self.assertEqual(2, len(result)) + self.assertEqual(result[0].host, 'host4') + self.assertEqual(result[1].host, 'host5') + + def _filter_k8s_hosts(self, get_context, gather_host, get_aggrs, + **request_spec): + """Given a K8S cluster that spans across 3 shards + (vc-a-0, vc-b-0, vc-b-1) and 2 availability zones (az-1, az-2) + where the most k8s hosts are in the vc-b-1 shard. When there is + a RequestSpec for 'az-2', then the hosts in 'vc-b-1' shard must + be returned, since it's the dominant shard. + """ + gather_host.return_value = {'cell1': [ + ('host3', 4), ('host4', 2), ('host5', 3) + ]} + + self.filt_cls._PROJECT_TAG_CACHE['foo'] = ['sharding_enabled', + 'vc-a-1'] + agg1 = objects.Aggregate(id=1, name='vc-a-0', hosts=['host1']) + agg2 = objects.Aggregate(id=2, name='vc-b-0', hosts=['host2', 'host3']) + agg3 = objects.Aggregate(id=3, name='vc-b-1', hosts=['host4', 'host5']) + + get_aggrs.return_value = [agg1, agg2, agg3] + + host1 = fakes.FakeHostState('host1', 'compute', + {'aggregates': [agg1]}) + host2 = fakes.FakeHostState('host2', 'compute', + {'aggregates': [agg2]}) + host3 = fakes.FakeHostState('host3', 'compute', + {'aggregates': [agg2]}) + host4 = fakes.FakeHostState('host4', 'compute', + {'aggregates': [agg3]}) + host5 = fakes.FakeHostState('host5', 'compute', + {'aggregates': [agg3]}) + + spec_obj = objects.RequestSpec( + context=get_context.return_value, project_id='foo', + availability_zone='az-2', + instance_uuid=self.fake_instance.uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'], + name='m1'), + **request_spec) + + return list(self.filt_cls.filter_all( + [host1, host2, host3, host4, host5], spec_obj)) + + @mock.patch('nova.objects.AggregateList.get_all') + @mock.patch('nova.context.scatter_gather_skip_cell0') + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') + @mock.patch('nova.context.get_admin_context') + def test_k8s_bypass_hana_flavors(self, get_context, + get_by_uuid, + gather_host, + get_aggrs): + gardener_cluster = 'kubernetes.io-cluster-shoot--garden--testCluster' + hana_flavor = fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'], + id=1, name='hana_flavor1', memory_mb=256, vcpus=1, root_gb=1) + new_instance = fake_instance.fake_instance_obj( + get_context.return_value, + flavor=hana_flavor, + expected_attrs=['metadata'], + metadata={gardener_cluster: '1'}) + build_req = objects.BuildRequest() + build_req.instance = new_instance + build_req.tags = objects.TagList() + + get_by_uuid.return_value = build_req + + self.filt_cls._PROJECT_TAG_CACHE['baz'] = ['sharding_enabled', + 'vc-a-1'] + agg1 = objects.Aggregate(id=1, name='vc-a-0', hosts=['host1']) + hana_agg = objects.Aggregate(id=1, name='vc-b-0', + hosts=['host2', 'host3']) + + host1 = fakes.FakeHostState('host1', 'compute', + {'aggregates': [agg1]}) + host2 = fakes.FakeHostState('host2', 'compute', + {'aggregates': [hana_agg]}) + host3 = fakes.FakeHostState('host3', 'compute', + {'aggregates': [hana_agg]}) + get_aggrs.return_value = [agg1, hana_agg] + spec_obj = objects.RequestSpec( + context=get_context.return_value, project_id='foo', + availability_zone='az-1', + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'], + name='hana_flavor1')) + + result = list(self.filt_cls.filter_all([host1, host2, host3], + spec_obj)) + + gather_host.assert_not_called() + self.assertEqual(3, len(result)) + self.assertEqual(result[0].host, 'host1') + self.assertEqual(result[1].host, 'host2') + self.assertEqual(result[2].host, 'host3') + + @mock.patch('nova.objects.BuildRequest.get_by_instance_uuid') @mock.patch('nova.scheduler.filters.shard_filter.LOG') @mock.patch('nova.scheduler.filters.utils.aggregate_metadata_get_by_host') - def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock): + def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock, + get_by_uuid): + get_by_uuid.return_value = self.fake_build_req host = fakes.FakeHostState('host1', 'compute', {}) spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', - flavor=objects.Flavor(extra_specs={})) + instance_uuid=self.fake_build_req.instance_uuid, + flavor=fake_flavor.fake_flavor_obj( + mock.sentinel.ctx, expected_attrs=['extra_specs'])) agg_mock.return_value = {} @@ -215,7 +476,7 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock): log_mock.debug = mock.Mock() log_mock.error = mock.Mock() host.hypervisor_type = 'ironic' - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) log_mock.debug.assert_called_once_with(mock.ANY, mock.ANY) log_mock.error.assert_not_called() @@ -223,14 +484,21 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock): log_mock.debug = mock.Mock() log_mock.error = mock.Mock() host.hypervisor_type = 'Some HV' - self.assertFalse(self.filt_cls.host_passes(host, spec_obj)) + self._assert_passes(host, spec_obj, False) log_mock.error.assert_called_once_with(mock.ANY, mock.ANY) log_mock.debug.assert_not_called() @mock.patch('nova.scheduler.utils.is_non_vmware_spec', return_value=True) def test_non_vmware_spec(self, mock_is_non_vmware_spec): - host = mock.sentinel.host + host1 = mock.sentinel.host1 + host2 = mock.sentinel.host2 spec_obj = mock.sentinel.spec_obj - self.assertTrue(self.filt_cls.host_passes(host, spec_obj)) + result = list(self.filt_cls.filter_all([host1, host2], spec_obj)) + + self.assertEqual([host1, host2], result) mock_is_non_vmware_spec.assert_called_once_with(spec_obj) + + def _assert_passes(self, host, spec_obj, passes): + result = bool(list(self.filt_cls.filter_all([host], spec_obj))) + self.assertEqual(passes, result) From f1ae593eb4692542a96c58996d731d4213bf8035 Mon Sep 17 00:00:00 2001 From: Marius Leustean Date: Mon, 25 Sep 2023 20:09:19 +0300 Subject: [PATCH 2/2] Add validate_instance_group_policy to the virt driver In VMWare we need to handle race conditions when spawning k8s instances in parallel while building a new cluster. Similar to how server groups are validated prior to spawning the VM on the compute host, we add a new method on the driver `validate_instance_group_policy` that checks driver-specific grouping policy (in this case, the K8S shard for the instance) Change-Id: I04151875fae44b72be52127e3b160f7f95abfb9e --- nova/compute/manager.py | 11 +++ nova/db/main/api.py | 5 + nova/objects/compute_node.py | 11 +++ nova/scheduler/filters/shard_filter.py | 68 ++------------ nova/tests/unit/objects/test_objects.py | 2 +- .../scheduler/filters/test_shard_filter.py | 20 +++- nova/virt/driver.py | 8 ++ nova/virt/vmwareapi/driver.py | 3 + nova/virt/vmwareapi/shard_util.py | 93 +++++++++++++++++++ nova/virt/vmwareapi/vmops.py | 23 +++++ 10 files changed, 178 insertions(+), 66 deletions(-) create mode 100644 nova/virt/vmwareapi/shard_util.py diff --git a/nova/compute/manager.py b/nova/compute/manager.py index e89199cd4bf..c687fe691b9 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -1733,6 +1733,15 @@ def _do_validation(context, instance, group): _do_validation(context, instance, group) + def _validate_driver_instance_group_policy(self, context, instance): + lock_id = "driver-instance-group-validation-%s" % instance.uuid + + @utils.synchronized(lock_id) + def _do_validation(context, instance): + self.driver.validate_instance_group_policy(context, instance) + + _do_validation(context, instance) + def _log_original_error(self, exc_info, instance_uuid): LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid, exc_info=exc_info) @@ -2407,6 +2416,8 @@ def _build_and_run_instance(self, context, instance, image, injected_files, # the host is set on the instance. self._validate_instance_group_policy(context, instance, scheduler_hints) + self._validate_driver_instance_group_policy(context, + instance) image_meta = objects.ImageMeta.from_dict(image) with self._build_resources(context, instance, diff --git a/nova/db/main/api.py b/nova/db/main/api.py index ac886f736d6..fa8485a1603 100644 --- a/nova/db/main/api.py +++ b/nova/db/main/api.py @@ -2159,6 +2159,11 @@ def _handle_k8s_hosts_query_filters(query, filters=None): query = query.filter( models.Instance.availability_zone == availability_zone) + skip_instance_uuid = filters.get('skip_instance_uuid') + if skip_instance_uuid: + query.filter( + models.Instance.uuid != skip_instance_uuid) + return query diff --git a/nova/objects/compute_node.py b/nova/objects/compute_node.py index 62cf9d686b5..cf3a778100a 100644 --- a/nova/objects/compute_node.py +++ b/nova/objects/compute_node.py @@ -506,6 +506,17 @@ def get_by_hypervisor_type(cls, context, hv_type): return base.obj_make_list(context, cls(context), objects.ComputeNode, db_computes) + @base.remotable_classmethod + def get_k8s_hosts_by_instances_metadata(cls, context, meta_key, meta_value, + filters=None): + return db.get_k8s_hosts_by_instances_metadata( + context, meta_key, meta_value, filters=filters) + + @base.remotable_classmethod + def get_k8s_hosts_by_instances_tag(cls, context, tag, filters=None): + return db.get_k8s_hosts_by_instances_tag( + context, tag, filters=filters) + def _get_node_empty_ratio(context, max_count): """Query the DB for non-deleted compute_nodes with 0.0/None alloc ratios diff --git a/nova/scheduler/filters/shard_filter.py b/nova/scheduler/filters/shard_filter.py index bf024cc4917..26efbbe07eb 100644 --- a/nova/scheduler/filters/shard_filter.py +++ b/nova/scheduler/filters/shard_filter.py @@ -12,21 +12,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from collections import defaultdict - from oslo_log import log as logging import nova.conf from nova import context as nova_context -from nova.db.main import api as main_db_api -from nova import exception -from nova.objects.aggregate import AggregateList from nova.objects.build_request import BuildRequest from nova.objects.instance import Instance from nova.scheduler import filters from nova.scheduler.mixins import ProjectTagMixin from nova.scheduler import utils from nova import utils as nova_utils +from nova.virt.vmwareapi import shard_util LOG = logging.getLogger(__name__) @@ -97,63 +93,13 @@ def _get_metadata(): "for the uuid %s", spec_obj.instance_uuid) return - kks_tag = next((t.tag for t in _get_tags() - if t.tag.startswith(KKS_PREFIX)), None) - gardener_meta = None - if not kks_tag: - gardener_meta = \ - {k: v for k, v in _get_metadata().items() - if k.startswith(GARDENER_PREFIX)} - - if not kks_tag and not gardener_meta: - return None - - q_filters = {'hv_type': VMWARE_HV_TYPE} - if spec_obj.availability_zone: - q_filters['availability_zone'] = spec_obj.availability_zone - - results = None - if kks_tag: - results = nova_context.scatter_gather_skip_cell0( - elevated, main_db_api.get_k8s_hosts_by_instances_tag, - kks_tag, filters=q_filters) - elif gardener_meta: - (meta_key, meta_value) = next(iter(gardener_meta.items())) - results = nova_context.scatter_gather_skip_cell0( - elevated, main_db_api.get_k8s_hosts_by_instances_metadata, - meta_key, meta_value, filters=q_filters) - - if not results: - return None - - # hosts with count of instances from this K8S cluster - # {host: } - k8s_hosts = defaultdict(lambda: 0) - - for cell_uuid, cell_result in results.items(): - if nova_context.is_cell_failure_sentinel(cell_result): - raise exception.NovaException( - "Unable to schedule the K8S instance because " - "cell %s is not responding." % cell_uuid) - cell_hosts = dict(cell_result) - for h, c in cell_hosts.items(): - k8s_hosts[h] += c + k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates( + elevated, _get_metadata(), _get_tags(), spec_obj.availability_zone) - if not k8s_hosts: + if not k8s_shard_aggrs: return None - all_shard_aggrs = [agg for agg in AggregateList.get_all(elevated) - if agg.name.startswith(self._SHARD_PREFIX)] - if not all_shard_aggrs: - return None - - shard_aggr = sorted( - all_shard_aggrs, - reverse=True, - key=lambda aggr: sum(i for h, i in k8s_hosts.items() - if h in aggr.hosts))[0] - - return shard_aggr.name + return k8s_shard_aggrs[0].name def filter_all(self, filter_obj_list, spec_obj): # Only VMware @@ -213,8 +159,8 @@ def _host_passes(self, host_state, spec_obj, k8s_shard): if k8s_shard: if k8s_shard not in host_shard_names: - LOG.debug("%(host_state)s is not part of the requested " - "K8S cluster shard '%(k8s_shard)s'", + LOG.debug("%(host_state)s is not part of the K8S " + "cluster's shard '%(k8s_shard)s'", {'host_state': host_state, 'k8s_shard': k8s_shard}) return False diff --git a/nova/tests/unit/objects/test_objects.py b/nova/tests/unit/objects/test_objects.py index 063e8d50810..4b1bdc404b2 100644 --- a/nova/tests/unit/objects/test_objects.py +++ b/nova/tests/unit/objects/test_objects.py @@ -1055,7 +1055,7 @@ def obj_name(cls): 'CellMapping': '1.1-5d652928000a5bc369d79d5bde7e497d', 'CellMappingList': '1.1-496ef79bb2ab41041fff8bcb57996352', 'ComputeNode': '1.19-af6bd29a6c3b225da436a0d8487096f2', - 'ComputeNodeList': '1.17-52f3b0962b1c86b98590144463ebb192', + 'ComputeNodeList': '1.17-bb54e3fd5415be274c5515577acafe3d', 'ConsoleAuthToken': '1.1-8da320fb065080eb4d3c2e5c59f8bf52', 'CpuDiagnostics': '1.0-d256f2e442d1b837735fd17dfe8e3d47', 'Destination': '1.4-3b440d29459e2c98987ad5b25ad1cb2c', diff --git a/nova/tests/unit/scheduler/filters/test_shard_filter.py b/nova/tests/unit/scheduler/filters/test_shard_filter.py index 4bb4a4de8c9..827906c2f63 100644 --- a/nova/tests/unit/scheduler/filters/test_shard_filter.py +++ b/nova/tests/unit/scheduler/filters/test_shard_filter.py @@ -16,7 +16,6 @@ import mock -from nova.db.main import api as main_db_api from nova import objects from nova.scheduler.filters import shard_filter from nova import test @@ -84,6 +83,7 @@ def test_shard_baremetal_passes(self, agg_mock, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'], extra_specs=extra_specs)) @@ -102,6 +102,7 @@ def test_shard_project_not_found(self, agg_mock, mock_update_cache, spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='bar', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) self._assert_passes(host, spec_obj, False) @@ -116,6 +117,7 @@ def test_shard_project_no_shards(self, agg_mock, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -130,6 +132,7 @@ def test_shard_host_no_shard_aggregate(self, agg_mock, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -144,6 +147,7 @@ def test_shard_host_no_shards_in_aggregate(self, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -158,6 +162,7 @@ def test_shard_project_shard_match_host_shard(self, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -172,6 +177,7 @@ def test_shard_project_shard_do_not_match_host_shard(self, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -186,6 +192,7 @@ def test_shard_project_has_multiple_shards_per_az(self, get_by_uuid): spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) @@ -205,6 +212,7 @@ def test_shard_project_has_multiple_shards_per_az_resize_same_shard( spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs']), scheduler_hints=dict(_nova_check_type=['resize'], @@ -227,6 +235,7 @@ def test_shard_project_has_multiple_shards_per_az_resize_other_shard( flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs']), instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, scheduler_hints=dict(_nova_check_type=['resize'], source_host=['host2'])) @@ -245,6 +254,7 @@ def test_shard_project_has_sharding_enabled_any_host_passes( spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='baz', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) self._assert_passes(host, spec_obj, True) @@ -261,6 +271,7 @@ def test_shard_project_has_sharding_enabled_and_single_shards( spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='baz', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) self._assert_passes(host, spec_obj, True) @@ -287,7 +298,7 @@ def test_same_shard_for_kubernikus_cluster(self, get_context, gather_host.assert_called_once_with( get_context.return_value, - main_db_api.get_k8s_hosts_by_instances_tag, + objects.ComputeNodeList.get_k8s_hosts_by_instances_tag, 'kubernikus:kluster-example', filters={'hv_type': 'VMware vCenter Server', 'availability_zone': 'az-2'}) @@ -321,7 +332,7 @@ def test_same_shard_for_gardener_cluster(self, get_context, gather_host.assert_called_once_with( get_context.return_value, - main_db_api.get_k8s_hosts_by_instances_metadata, + objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata, gardener_cluster, '1', filters={'hv_type': 'VMware vCenter Server', 'availability_zone': 'az-2'}) @@ -351,7 +362,7 @@ def test_same_shard_for_nonbuild_requests(self, get_context, gather_host.assert_called_once_with( get_context.return_value, - main_db_api.get_k8s_hosts_by_instances_metadata, + objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata, gardener_cluster, '1', filters={'hv_type': 'VMware vCenter Server', 'availability_zone': 'az-2'}) @@ -467,6 +478,7 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock, spec_obj = objects.RequestSpec( context=mock.sentinel.ctx, project_id='foo', instance_uuid=self.fake_build_req.instance_uuid, + availability_zone=None, flavor=fake_flavor.fake_flavor_obj( mock.sentinel.ctx, expected_attrs=['extra_specs'])) diff --git a/nova/virt/driver.py b/nova/virt/driver.py index b11bb2a98d8..eaf1f6de27d 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -1807,6 +1807,14 @@ def in_cluster_vmotion(self, context, instance, host_moref_value): """ raise NotImplementedError() + def validate_instance_group_policy(self, context, instance): + """Validates that the instance meets driver-specific grouping policy + + The driver can raise exception.RescheduledException to reject and + trigger rescheduling of the instance to a different host. + """ + pass + def load_compute_driver(virtapi, compute_driver=None): """Load a compute driver module. diff --git a/nova/virt/vmwareapi/driver.py b/nova/virt/vmwareapi/driver.py index b7ca867f50b..165dab8578a 100644 --- a/nova/virt/vmwareapi/driver.py +++ b/nova/virt/vmwareapi/driver.py @@ -1319,3 +1319,6 @@ def in_cluster_vmotion(self, context, instance, host_moref_value): vim_util.get_moref_value(current_host_ref), vim_util.get_moref_value(host_ref), instance=instance) + + def validate_instance_group_policy(self, context, instance): + self._vmops._check_k8s_shard(instance) diff --git a/nova/virt/vmwareapi/shard_util.py b/nova/virt/vmwareapi/shard_util.py new file mode 100644 index 00000000000..fbe6e523ed2 --- /dev/null +++ b/nova/virt/vmwareapi/shard_util.py @@ -0,0 +1,93 @@ +# Copyright (c) 2023 SAP SE +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from collections import defaultdict + +from nova import context as nova_context +from nova import exception +from nova.objects.aggregate import AggregateList +from nova.objects.compute_node import ComputeNodeList + +GARDENER_PREFIX = "kubernetes.io-cluster-" +KKS_PREFIX = "kubernikus:kluster" +VMWARE_HV_TYPE = 'VMware vCenter Server' +SHARD_PREFIX = 'vc-' + + +def get_sorted_k8s_shard_aggregates(context, metadata, tags, availability_zone, + skip_instance_uuid=None): + """Returns the shards of a K8S cluster sorted by the instances count. + + The K8S cluster is determined by Instance's metadata or tags. + Returns None if the cluster is new (first instance is being spawned there) + or if the K8S metadata/tags are not set. + """ + kks_tag = None + gardener_meta = None + no_ret = None + if tags: + kks_tag = next((t.tag for t in tags + if t.tag.startswith(KKS_PREFIX)), None) + if not kks_tag and metadata: + gardener_meta = \ + {k: v for k, v in metadata.items() + if k.startswith(GARDENER_PREFIX)} + + if not kks_tag and not gardener_meta: + return no_ret + + q_filters = {'hv_type': VMWARE_HV_TYPE} + if availability_zone: + q_filters['availability_zone'] = availability_zone + if skip_instance_uuid: + q_filters['skip_instance_uuid'] = skip_instance_uuid + + results = None + if kks_tag: + results = nova_context.scatter_gather_skip_cell0( + context, ComputeNodeList.get_k8s_hosts_by_instances_tag, + kks_tag, filters=q_filters) + elif gardener_meta: + (meta_key, meta_value) = next(iter(gardener_meta.items())) + results = nova_context.scatter_gather_skip_cell0( + context, ComputeNodeList.get_k8s_hosts_by_instances_metadata, + meta_key, meta_value, filters=q_filters) + + if not results: + return no_ret + + # hosts with count of instances from this K8S cluster + # {host: } + k8s_hosts = defaultdict(lambda: 0) + + for cell_uuid, cell_result in results.items(): + if nova_context.is_cell_failure_sentinel(cell_result): + raise exception.NovaException( + "Unable to schedule the K8S instance because " + "cell %s is not responding." % cell_uuid) + cell_hosts = dict(cell_result) + for h, c in cell_hosts.items(): + k8s_hosts[h] += c + + if not k8s_hosts: + return no_ret + + all_shard_aggrs = [agg for agg in AggregateList.get_all(context) + if agg.name.startswith(SHARD_PREFIX)] + + return sorted( + all_shard_aggrs, + reverse=True, + key=lambda aggr: sum(i for h, i in k8s_hosts.items() + if h in aggr.hosts)) diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py index eea0310cc32..60981076646 100644 --- a/nova/virt/vmwareapi/vmops.py +++ b/nova/virt/vmwareapi/vmops.py @@ -74,6 +74,7 @@ from nova.virt.vmwareapi import imagecache from nova.virt.vmwareapi import images from nova.virt.vmwareapi.rpc import VmwareRpcApi +from nova.virt.vmwareapi import shard_util from nova.virt.vmwareapi import special_spawning from nova.virt.vmwareapi import vif as vmwarevif from nova.virt.vmwareapi import vim_util @@ -1358,6 +1359,28 @@ def prepare_for_spawn(self, instance): raise exception.InstanceUnacceptable(instance_id=instance.uuid, reason=reason) + def _check_k8s_shard(self, instance): + """Handles race condition when spawning K8S instances in parallel. + + If the instance is part of a K8S cluster, ensures that this host + is part of any shard bound to that cluster, otherwise we should + reschedule the instance. + """ + k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates( + nova_context.get_admin_context(), instance.metadata, instance.tags, + instance.availability_zone, skip_instance_uuid=instance.uuid) + + if k8s_shard_aggrs: + matches = any(self._compute_host in aggr.hosts + for aggr in k8s_shard_aggrs) + if not matches: + msg = ("Host %(host)s rejected K8S instance %(instance_uuid)s " + "because the K8S cluster is not part to this shard." + % ({"host": self._compute_host, + "instance_uuid": instance.uuid})) + raise exception.RescheduledException( + instance_uuid=instance.uuid, reason=msg) + def spawn(self, context, instance, image_meta, injected_files, admin_password, network_info, block_device_info=None):