From 02994c5c1199ea7efb22be0dc82f39116b60c4e2 Mon Sep 17 00:00:00 2001 From: mhorky Date: Mon, 30 Oct 2023 11:42:56 +0100 Subject: [PATCH] RHEL-9435: Get AWS metadata via IMDSv2 * Card ID: RHEL-9435 Even though both versions are officially supported, the AWS teams are tracking connections making v1 requests as WARNINGs [0]. This patch switches the order to try to use IMDSv2 first. [0]: https://github.com/aws/aws-imds-packet-analyzer Cherry-picked from 0eab79b. --- src/cloud_what/providers/aws.py | 23 ++++++---------- test/test_auto_registration.py | 49 +++++++++++++++++---------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/src/cloud_what/providers/aws.py b/src/cloud_what/providers/aws.py index 332c8d1a89..32e1a229cb 100644 --- a/src/cloud_what/providers/aws.py +++ b/src/cloud_what/providers/aws.py @@ -285,25 +285,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]: def _get_metadata_from_server(self) -> Union[str, None]: """ - Try to get metadata from server as is described in this document: + Try to get metadata from server as described in these documents: + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata). + If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console). - It is possible to use two versions. We will try to use version IMDSv1 first (this version requires - only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version. - The version requires two requests (get session TOKEN and then get own metadata using token) :return: String with metadata or None """ + metadata = self._get_metadata_from_server_imds_v2() + if metadata is not None: + return metadata - if self._token_exists() is False: - # First try to get metadata using IMDSv1 - metadata = self._get_metadata_from_server_imds_v1() - - if metadata is not None: - return metadata - - # When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2 - return self._get_metadata_from_server_imds_v2() + return self._get_metadata_from_server_imds_v1() def _get_signature_from_cache_file(self) -> None: """ diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 8bcf93f2ab..983ea2ea4b 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -20,7 +20,7 @@ import unittest import base64 -from mock import patch, Mock +from unittest.mock import Mock from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info from .rhsmlib_test.test_cloud_facts import AWS_METADATA @@ -95,11 +95,16 @@ def send_only_imds_v2_is_supported(request, *args, **kwargs): return mock_result -def mock_prepare_request(request): - return request - - class TestAutomaticRegistration(unittest.TestCase): + def setUp(self): + _ = aws.AWSCloudProvider({}) + aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None) + aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock() + + _ = azure.AzureCloudProvider({}) + azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="") def tearDown(self): aws.AWSCloudProvider._instance = None @@ -109,17 +114,16 @@ def tearDown(self): gcp.GCPCloudProvider._instance = None gcp.GCPCloudProvider._initialized = False - @patch('cloud_what.providers.aws.requests.Session') - def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class): + def test_collect_cloud_info_one_cloud_provider_detected(self): """ Test the case, when we try to collect cloud info only for one detected cloud provider """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) - mock_session.hooks = {'response': []} - mock_session_class.return_value = mock_session + mock_session.send = send_only_imds_v2_is_supported() + mock_session.prepare_request = Mock(side_effect=lambda request: request) + mock_session.hooks = {"response": []} + aws.AWSCloudProvider._instance._session = mock_session cloud_list = ['aws'] cloud_info = _collect_cloud_info(cloud_list, Mock()) @@ -134,26 +138,23 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class metadata = base64.b64decode(b64_metadata).decode('utf-8') self.assertEqual(metadata, AWS_METADATA) # Test signature - self.assertTrue('signature' in cloud_info) - b64_signature = cloud_info['signature'] - signature = base64.b64decode(b64_signature).decode('utf-8') - self.assertEqual( - signature, - '-----BEGIN PKCS7-----\n' + AWS_SIGNATURE + '\n-----END PKCS7-----' - ) + self.assertTrue("signature" in cloud_info) + b64_signature = cloud_info["signature"] + signature = base64.b64decode(b64_signature).decode("utf-8") + self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----") - @patch('cloud_what.providers.aws.requests.Session') - def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class): + def test_collect_cloud_info_more_cloud_providers_detected(self): """ Test the case, when we try to collect cloud info only for more than one cloud providers, because more than one cloud providers were detected """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) - mock_session.hooks = {'response': []} - mock_session_class.return_value = mock_session + mock_session.send = send_only_imds_v2_is_supported() + mock_session.prepare_request = Mock(side_effect=lambda request: request) + mock_session.hooks = {"response": []} + aws.AWSCloudProvider._instance._session = mock_session + azure.AzureCloudProvider._instance._session = Mock() # More cloud providers detected cloud_list = ['azure', 'aws']