Skip to content

Commit

Permalink
RHEL-9435: Get AWS metadata via IMDSv2
Browse files Browse the repository at this point in the history
* Card ID: RHEL-9435

Even though both versions are officially supported, the AWS teams are
tracking connections making v1 requests as WARNINGs [0].

This patch switches the order to try to use IMDSv2 first.

[0]: https://github.com/aws/aws-imds-packet-analyzer

Cherry-picked from 0eab79b.
  • Loading branch information
m-horky committed Nov 1, 2023
1 parent 5b8c215 commit 02994c5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 38 deletions.
23 changes: 9 additions & 14 deletions src/cloud_what/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,25 +285,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]:

def _get_metadata_from_server(self) -> Union[str, None]:
"""
Try to get metadata from server as is described in this document:
Try to get metadata from server as described in these documents:
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata).
If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console).
It is possible to use two versions. We will try to use version IMDSv1 first (this version requires
only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version.
The version requires two requests (get session TOKEN and then get own metadata using token)
:return: String with metadata or None
"""
metadata = self._get_metadata_from_server_imds_v2()
if metadata is not None:
return metadata

if self._token_exists() is False:
# First try to get metadata using IMDSv1
metadata = self._get_metadata_from_server_imds_v1()

if metadata is not None:
return metadata

# When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2
return self._get_metadata_from_server_imds_v2()
return self._get_metadata_from_server_imds_v1()

def _get_signature_from_cache_file(self) -> None:
"""
Expand Down
49 changes: 25 additions & 24 deletions test/test_auto_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import unittest
import base64
from mock import patch, Mock
from unittest.mock import Mock

from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info
from .rhsmlib_test.test_cloud_facts import AWS_METADATA
Expand Down Expand Up @@ -95,11 +95,16 @@ def send_only_imds_v2_is_supported(request, *args, **kwargs):
return mock_result


def mock_prepare_request(request):
return request


class TestAutomaticRegistration(unittest.TestCase):
def setUp(self):
_ = aws.AWSCloudProvider({})
aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None)
aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock()

_ = azure.AzureCloudProvider({})
azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="")

def tearDown(self):
aws.AWSCloudProvider._instance = None
Expand All @@ -109,17 +114,16 @@ def tearDown(self):
gcp.GCPCloudProvider._instance = None
gcp.GCPCloudProvider._initialized = False

@patch('cloud_what.providers.aws.requests.Session')
def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class):
def test_collect_cloud_info_one_cloud_provider_detected(self):
"""
Test the case, when we try to collect cloud info only for
one detected cloud provider
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.hooks = {'response': []}
mock_session_class.return_value = mock_session
mock_session.send = send_only_imds_v2_is_supported()
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
aws.AWSCloudProvider._instance._session = mock_session

cloud_list = ['aws']
cloud_info = _collect_cloud_info(cloud_list, Mock())
Expand All @@ -134,26 +138,23 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class
metadata = base64.b64decode(b64_metadata).decode('utf-8')
self.assertEqual(metadata, AWS_METADATA)
# Test signature
self.assertTrue('signature' in cloud_info)
b64_signature = cloud_info['signature']
signature = base64.b64decode(b64_signature).decode('utf-8')
self.assertEqual(
signature,
'-----BEGIN PKCS7-----\n' + AWS_SIGNATURE + '\n-----END PKCS7-----'
)
self.assertTrue("signature" in cloud_info)
b64_signature = cloud_info["signature"]
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")

@patch('cloud_what.providers.aws.requests.Session')
def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class):
def test_collect_cloud_info_more_cloud_providers_detected(self):
"""
Test the case, when we try to collect cloud info only for
more than one cloud providers, because more than one cloud
providers were detected
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.hooks = {'response': []}
mock_session_class.return_value = mock_session
mock_session.send = send_only_imds_v2_is_supported()
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
aws.AWSCloudProvider._instance._session = mock_session
azure.AzureCloudProvider._instance._session = Mock()

# More cloud providers detected
cloud_list = ['azure', 'aws']
Expand Down

0 comments on commit 02994c5

Please sign in to comment.