Add validate_instance_group_policy to the virt driver
In VMware we need to handle race conditions when spawning k8s
instances in parallel while building a new cluster.
Similar to how server groups are validated prior to spawning the
VM on the compute host, we add a new method on the driver,
`validate_instance_group_policy`, that checks the driver-specific
grouping policy (in this case, the K8S shard of the instance).

Change-Id: I04151875fae44b72be52127e3b160f7f95abfb9e
leust committed Sep 25, 2023
1 parent 03cb813 commit 112e9f6
Showing 9 changed files with 175 additions and 65 deletions.
11 changes: 11 additions & 0 deletions nova/compute/manager.py
@@ -1733,6 +1733,15 @@ def _do_validation(context, instance, group)

_do_validation(context, instance, group)

def _validate_driver_instance_group_policy(self, context, instance):
lock_id = "driver-instance-group-validation-%s" % instance.uuid

@utils.synchronized(lock_id)
def _do_validation(context, instance):
self.driver.validate_instance_group_policy(context, instance)

_do_validation(context, instance)

def _log_original_error(self, exc_info, instance_uuid):
LOG.error('Error: %s', exc_info[1], instance_uuid=instance_uuid,
exc_info=exc_info)
@@ -2407,6 +2416,8 @@ def _build_and_run_instance(self, context, instance, image, injected_files,
# the host is set on the instance.
self._validate_instance_group_policy(context, instance,
scheduler_hints)
self._validate_driver_instance_group_policy(context,
instance)
image_meta = objects.ImageMeta.from_dict(image)

with self._build_resources(context, instance,
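The new `_validate_driver_instance_group_policy` simply forwards to the driver under a per-instance lock, so duplicate or retried build requests for the same instance cannot race the driver's grouping-policy check. A standalone sketch of that locking pattern, using oslo.concurrency directly (the function name `validate_under_lock` is illustrative, not part of the commit):

```python
from oslo_concurrency import lockutils


def validate_under_lock(driver, context, instance):
    """Run the driver's grouping-policy check under a per-instance lock."""
    lock_id = "driver-instance-group-validation-%s" % instance.uuid

    @lockutils.synchronized(lock_id)
    def _do_validation():
        # Callers holding the same lock name are serialized, so only one
        # validation for this instance UUID runs at a time; any exception
        # (e.g. RescheduledException) propagates to the caller.
        driver.validate_instance_group_policy(context, instance)

    _do_validation()
```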
5 changes: 5 additions & 0 deletions nova/db/main/api.py
@@ -2159,6 +2159,11 @@ def _handle_k8s_hosts_query_filters(query, filters=None):
query = query.filter(
models.Instance.availability_zone == availability_zone)

skip_instance_uuid = filters.get('skip_instance_uuid')
if skip_instance_uuid:
query = query.filter(
models.Instance.uuid != skip_instance_uuid)

return query


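The new `skip_instance_uuid` filter lets callers exclude the instance currently being placed from the per-host counts. Note that SQLAlchemy's `Query.filter()` returns a new query rather than mutating the existing one, so the result has to be reassigned (as in the availability-zone branch above). A minimal, self-contained illustration of that pattern with a hypothetical model:

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Instance(Base):
    """Hypothetical stand-in for models.Instance, for illustration only."""
    __tablename__ = 'instances'
    uuid = Column(String(36), primary_key=True)


def apply_filters(query, filters=None):
    filters = filters or {}
    # Query.filter() returns a *new* Query object; forgetting to rebind
    # it silently drops the condition.
    skip_instance_uuid = filters.get('skip_instance_uuid')
    if skip_instance_uuid:
        query = query.filter(Instance.uuid != skip_instance_uuid)
    return query


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    query = apply_filters(session.query(Instance),
                          {'skip_instance_uuid': 'building-instance-uuid'})
    print(query.count())  # no rows yet, but the WHERE clause is applied
```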
11 changes: 11 additions & 0 deletions nova/objects/compute_node.py
@@ -506,6 +506,17 @@ def get_by_hypervisor_type(cls, context, hv_type):
return base.obj_make_list(context, cls(context), objects.ComputeNode,
db_computes)

@base.remotable_classmethod
def get_k8s_hosts_by_instances_metadata(cls, context, meta_key, meta_value,
filters=None):
return db.get_k8s_hosts_by_instances_metadata(
context, meta_key, meta_value, filters=filters)

@base.remotable_classmethod
def get_k8s_hosts_by_instances_tag(cls, context, tag, filters=None):
return db.get_k8s_hosts_by_instances_tag(
context, tag, filters=filters)


def _get_node_empty_ratio(context, max_count):
"""Query the DB for non-deleted compute_nodes with 0.0/None alloc ratios
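The two classmethods are thin remotable wrappers over the DB API queries above, so callers outside the cell database context (such as scheduler-side code) can fan them out per cell. A short usage sketch matching the call shape asserted by the updated tests further down:

```python
from nova import context as nova_context
from nova import objects

ctxt = nova_context.get_admin_context()

# Gather, from every cell except cell0, the hosts running instances
# tagged as part of one Kubernikus cluster, keyed by cell UUID.
results = nova_context.scatter_gather_skip_cell0(
    ctxt, objects.ComputeNodeList.get_k8s_hosts_by_instances_tag,
    'kubernikus:kluster-example',
    filters={'hv_type': 'VMware vCenter Server',
             'availability_zone': 'az-2'})
```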
66 changes: 5 additions & 61 deletions nova/scheduler/filters/shard_filter.py
@@ -12,8 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict

import time

from keystoneauth1 import exceptions as kse
@@ -22,22 +20,18 @@

import nova.conf
from nova import context as nova_context
from nova.db.main import api as main_db_api
from nova import exception
from nova.objects.aggregate import AggregateList
from nova.objects.build_request import BuildRequest
from nova.objects.instance import Instance
from nova.scheduler import filters
from nova.scheduler import utils
from nova import utils as nova_utils
from nova.virt.vmwareapi import shard_util

LOG = logging.getLogger(__name__)

CONF = nova.conf.CONF

_SERVICE_AUTH = None
GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"
VMWARE_HV_TYPE = 'VMware vCenter Server'

@@ -165,63 +159,13 @@ def _get_metadata():
"for the uuid %s", spec_obj.instance_uuid)
return

kks_tag = next((t.tag for t in _get_tags()
if t.tag.startswith(KKS_PREFIX)), None)
gardener_meta = None
if not kks_tag:
gardener_meta = \
{k: v for k, v in _get_metadata().items()
if k.startswith(GARDENER_PREFIX)}

if not kks_tag and not gardener_meta:
return None

q_filters = {'hv_type': VMWARE_HV_TYPE}
if spec_obj.availability_zone:
q_filters['availability_zone'] = spec_obj.availability_zone

results = None
if kks_tag:
results = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.get_k8s_hosts_by_instances_tag,
kks_tag, filters=q_filters)
elif gardener_meta:
(meta_key, meta_value) = next(iter(gardener_meta.items()))
results = nova_context.scatter_gather_skip_cell0(
elevated, main_db_api.get_k8s_hosts_by_instances_metadata,
meta_key, meta_value, filters=q_filters)

if not results:
return None

# hosts with count of instances from this K8S cluster
# {host: <count>}
k8s_hosts = defaultdict(lambda: 0)

for cell_uuid, cell_result in results.items():
if nova_context.is_cell_failure_sentinel(cell_result):
raise exception.NovaException(
"Unable to schedule the K8S instance because "
"cell %s is not responding." % cell_uuid)
cell_hosts = dict(cell_result)
for h, c in cell_hosts.items():
k8s_hosts[h] += c
k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
elevated, _get_metadata(), _get_tags(), spec_obj.availability_zone)

if not k8s_hosts:
if not k8s_shard_aggrs:
return None

all_shard_aggrs = [agg for agg in AggregateList.get_all(elevated)
if agg.name.startswith(self._SHARD_PREFIX)]
if not all_shard_aggrs:
return None

shard_aggr = sorted(
all_shard_aggrs,
reverse=True,
key=lambda aggr: sum(i for h, i in k8s_hosts.items()
if h in aggr.hosts))[0]

return shard_aggr.name
return k8s_shard_aggrs[0].name

def filter_all(self, filter_obj_list, spec_obj):
# Only VMware
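The cross-cell host counting and shard sorting that used to live inline in `_get_k8s_shard()` is now delegated to `shard_util.get_sorted_k8s_shard_aggregates`, which is not part of this diff. A hedged reconstruction of what that helper might look like, pieced together from the removed code above (the signature, the `vc-` shard prefix, and the exact return value are assumptions):

```python
from collections import defaultdict

from nova import context as nova_context
from nova import exception
from nova import objects
from nova.objects.aggregate import AggregateList

GARDENER_PREFIX = "kubernetes.io-cluster-"
KKS_PREFIX = "kubernikus:kluster"
VMWARE_HV_TYPE = 'VMware vCenter Server'
SHARD_PREFIX = 'vc-'  # assumption: the prefix the shard filter matches on


def get_sorted_k8s_shard_aggregates(ctxt, metadata, tags, availability_zone):
    """Return shard aggregates sorted by how many instances of the same
    k8s cluster they already host (most first), or None for non-k8s VMs.
    """
    kks_tag = next((t.tag for t in tags
                    if t.tag.startswith(KKS_PREFIX)), None)
    gardener_meta = {k: v for k, v in metadata.items()
                     if k.startswith(GARDENER_PREFIX)}
    if not kks_tag and not gardener_meta:
        return None

    q_filters = {'hv_type': VMWARE_HV_TYPE}
    if availability_zone:
        q_filters['availability_zone'] = availability_zone

    if kks_tag:
        results = nova_context.scatter_gather_skip_cell0(
            ctxt, objects.ComputeNodeList.get_k8s_hosts_by_instances_tag,
            kks_tag, filters=q_filters)
    else:
        meta_key, meta_value = next(iter(gardener_meta.items()))
        results = nova_context.scatter_gather_skip_cell0(
            ctxt,
            objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata,
            meta_key, meta_value, filters=q_filters)

    # Per-host counts of instances belonging to this cluster.
    k8s_hosts = defaultdict(int)
    for cell_uuid, cell_result in results.items():
        if nova_context.is_cell_failure_sentinel(cell_result):
            raise exception.NovaException(
                "Unable to schedule the K8S instance because "
                "cell %s is not responding." % cell_uuid)
        for host, count in dict(cell_result).items():
            k8s_hosts[host] += count
    if not k8s_hosts:
        return None

    shard_aggrs = [agg for agg in AggregateList.get_all(ctxt)
                   if agg.name.startswith(SHARD_PREFIX)]
    # Shards already hosting more of the cluster's instances sort first.
    return sorted(
        shard_aggrs, reverse=True,
        key=lambda aggr: sum(c for h, c in k8s_hosts.items()
                             if h in aggr.hosts))
```

With this shape the filter only has to take the first element, which is what the new `return k8s_shard_aggrs[0].name` above does.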
20 changes: 16 additions & 4 deletions nova/tests/unit/scheduler/filters/test_shard_filter.py
@@ -16,7 +16,6 @@

import mock

from nova.db.main import api as main_db_api
from nova import objects
from nova.scheduler.filters import shard_filter
from nova import test
@@ -84,6 +83,7 @@ def test_shard_baremetal_passes(self, agg_mock, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs'],
extra_specs=extra_specs))
@@ -102,6 +102,7 @@ def test_shard_project_not_found(self, agg_mock, mock_update_cache,
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='bar',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))
self._assert_passes(host, spec_obj, False)
@@ -116,6 +117,7 @@ def test_shard_project_no_shards(self, agg_mock, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -130,6 +132,7 @@ def test_shard_host_no_shard_aggregate(self, agg_mock, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -144,6 +147,7 @@ def test_shard_host_no_shards_in_aggregate(self, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -158,6 +162,7 @@ def test_shard_project_shard_match_host_shard(self, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -172,6 +177,7 @@ def test_shard_project_shard_do_not_match_host_shard(self, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -186,6 +192,7 @@ def test_shard_project_has_multiple_shards_per_az(self, get_by_uuid):
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

@@ -205,6 +212,7 @@ def test_shard_project_has_multiple_shards_per_az_resize_same_shard(
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']),
scheduler_hints=dict(_nova_check_type=['resize'],
@@ -227,6 +235,7 @@ def test_shard_project_has_multiple_shards_per_az_resize_other_shard(
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']),
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
scheduler_hints=dict(_nova_check_type=['resize'],
source_host=['host2']))

@@ -245,6 +254,7 @@ def test_shard_project_has_sharding_enabled_any_host_passes(
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='baz',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))
self._assert_passes(host, spec_obj, True)
@@ -261,6 +271,7 @@ def test_shard_project_has_sharding_enabled_and_single_shards(
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='baz',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))
self._assert_passes(host, spec_obj, True)
@@ -287,7 +298,7 @@ def test_same_shard_for_kubernikus_cluster(self, get_context,

gather_host.assert_called_once_with(
get_context.return_value,
main_db_api.get_k8s_hosts_by_instances_tag,
objects.ComputeNodeList.get_k8s_hosts_by_instances_tag,
'kubernikus:kluster-example',
filters={'hv_type': 'VMware vCenter Server',
'availability_zone': 'az-2'})
@@ -321,7 +332,7 @@ def test_same_shard_for_gardener_cluster(self, get_context,

gather_host.assert_called_once_with(
get_context.return_value,
main_db_api.get_k8s_hosts_by_instances_metadata,
objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata,
gardener_cluster, '1',
filters={'hv_type': 'VMware vCenter Server',
'availability_zone': 'az-2'})
@@ -351,7 +362,7 @@ def test_same_shard_for_nonbuild_requests(self, get_context,

gather_host.assert_called_once_with(
get_context.return_value,
main_db_api.get_k8s_hosts_by_instances_metadata,
objects.ComputeNodeList.get_k8s_hosts_by_instances_metadata,
gardener_cluster, '1',
filters={'hv_type': 'VMware vCenter Server',
'availability_zone': 'az-2'})
@@ -467,6 +478,7 @@ def test_log_level_for_missing_vc_aggregate(self, agg_mock, log_mock,
spec_obj = objects.RequestSpec(
context=mock.sentinel.ctx, project_id='foo',
instance_uuid=self.fake_build_req.instance_uuid,
availability_zone=None,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

8 changes: 8 additions & 0 deletions nova/virt/driver.py
@@ -1797,6 +1797,14 @@ def sync_server_group(self, context, sg_uuid):
called if a customer changed a server-group for this host via API.
"""

def validate_instance_group_policy(self, context, instance):
"""Validates that the instance meets driver-specific grouping policy
The driver can raise exception.RescheduledException to reject and
trigger rescheduling of the instance to a different host.
"""
pass


def load_compute_driver(virtapi, compute_driver=None):
"""Load a compute driver module.
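For the base driver this is a no-op; a concrete driver overrides it and raises `RescheduledException` when the chosen host violates its policy. A hedged sketch of an out-of-tree implementation (only the hook signature and the exception come from the interface above; `_host_matches_instance_shard` is a hypothetical helper):

```python
from nova import exception
from nova.virt import driver


class ExampleDriver(driver.ComputeDriver):
    """Illustrative subclass; a real driver implements many more methods."""

    def validate_instance_group_policy(self, context, instance):
        # Reject the build if this host does not satisfy the driver's
        # grouping constraint; the compute manager can then reschedule.
        if not self._host_matches_instance_shard(context, instance):
            raise exception.RescheduledException(
                instance_uuid=instance.uuid,
                reason="host violates the driver grouping policy")

    def _host_matches_instance_shard(self, context, instance):
        # Hypothetical: look up whatever grouping the driver enforces.
        return True
```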
3 changes: 3 additions & 0 deletions nova/virt/vmwareapi/driver.py
@@ -1256,3 +1256,6 @@ def post_live_migration_at_destination(self, context, instance,
volumes = self._get_volume_mappings(context, instance)
LOG.debug("Fixing shadow vms %s", volumes, instance=instance)
self._volumeops.fixup_shadow_vms(instance, volumes)

def validate_instance_group_policy(self, context, instance):
self._vmops._check_k8s_shard(instance)
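`_check_k8s_shard` itself is not shown in this diff. A hedged sketch of what the vmops-level check might look like, assuming it reuses the `shard_util` helper referenced by the scheduler filter above (every name except `shard_util.get_sorted_k8s_shard_aggregates` and `RescheduledException` is an assumption):

```python
from nova import context as nova_context
from nova import exception
from nova.virt.vmwareapi import shard_util


# Intended as a method on VMwareVMOps; shown standalone for illustration.
def _check_k8s_shard(self, instance):
    """Reject the spawn if this host is outside the cluster's K8S shard."""
    ctxt = nova_context.get_admin_context()
    k8s_shard_aggrs = shard_util.get_sorted_k8s_shard_aggregates(
        ctxt, instance.metadata, instance.tags, instance.availability_zone)
    if not k8s_shard_aggrs:
        return  # not a k8s instance, or no shard data yet

    # The shard already hosting this cluster must contain the current host.
    allowed_hosts = set(k8s_shard_aggrs[0].hosts)
    if instance.host not in allowed_hosts:
        raise exception.RescheduledException(
            instance_uuid=instance.uuid,
            reason="Host %s is not part of the K8S cluster's shard"
                   % instance.host)
```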