Skip to content

Commit

Permalink
Merge pull request #3352 from candlepin/mhorky/RHEL-9435_aws-imdsv2_1.28
Browse files Browse the repository at this point in the history
[1.28] RHEL-9435: Get AWS metadata via IMDSv2
  • Loading branch information
jirihnidek authored Nov 6, 2023
2 parents 5b8c215 + 944fd03 commit a67c5d6
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 79 deletions.
23 changes: 9 additions & 14 deletions src/cloud_what/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,25 +285,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]:

def _get_metadata_from_server(self) -> Union[str, None]:
"""
Try to get metadata from server as is described in this document:
Try to get metadata from server as described in these documents:
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata).
If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console).
It is possible to use two versions. We will try to use version IMDSv1 first (this version requires
only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version.
The version requires two requests (get session TOKEN and then get own metadata using token)
:return: String with metadata or None
"""
metadata = self._get_metadata_from_server_imds_v2()
if metadata is not None:
return metadata

if self._token_exists() is False:
# First try to get metadata using IMDSv1
metadata = self._get_metadata_from_server_imds_v1()

if metadata is not None:
return metadata

# When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2
return self._get_metadata_from_server_imds_v2()
return self._get_metadata_from_server_imds_v1()

def _get_signature_from_cache_file(self) -> None:
"""
Expand Down
140 changes: 75 additions & 65 deletions test/test_auto_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import unittest
import base64
from mock import patch, Mock
from unittest.mock import Mock

from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info
from .rhsmlib_test.test_cloud_facts import AWS_METADATA
Expand Down Expand Up @@ -49,57 +49,71 @@
AWS_TOKEN = "ABCDEFGHIJKLMNOPQRSTVWXYZabcdefghijklmnopqrstvwxyz0123=="


def send_only_imds_v2_is_supported(request, *args, **kwargs):
def send__aws_imdsv2_only(request, *args, **kwargs):
"""
Mock result, when we try to get metadata using GET method against
AWS metadata provider. This mock is for the case, when only IMDSv2
is supported by instance.
Mock result for metadata request on AWS where only IMDSv2 is supported.
This function should be used to replace function `requests.Session.send()`.
:param request: HTTP request
:return: Mock with result
:return: Mocked server result.
"""
mock_result = Mock()

if request.method == 'PUT':
if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL:
if 'X-aws-ec2-metadata-token-ttl-seconds' in request.headers:
mock_result.status_code = 200
mock_result.text = AWS_TOKEN
else:
mock_result.status_code = 400
mock_result.text = 'Error: TTL for token not specified'
else:
mock_result.status_code = 400
mock_result.text = 'Error: Invalid URL'
elif request.method == 'GET':
if 'X-aws-ec2-metadata-token' in request.headers.keys():
if request.headers['X-aws-ec2-metadata-token'] == AWS_TOKEN:
if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL:
mock_result.status_code = 200
mock_result.text = AWS_METADATA
elif request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL:
mock_result.status_code = 200
mock_result.text = AWS_SIGNATURE
else:
mock_result.status_code = 400
mock_result.text = 'Error: Invalid URL'
else:
mock_result.status_code = 400
mock_result.text = 'Error: Invalid metadata token provided'
else:
mock_result.status_code = 400
mock_result.text = 'Error: IMDSv1 is not supported on this instance'
else:
mock_result.status_code = 400
mock_result.text = 'Error: not supported request method'

return mock_result


def mock_prepare_request(request):
return request
result = Mock()

if request.method == "PUT":
if request.url != aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL:
result.status_code = 400
result.text = "Error: Invalid URL"
return result

if "X-aws-ec2-metadata-token-ttl-seconds" not in request.headers:
result.status_code = 400
result.text = "Error: TTL for token not specified"
return result

result.status_code = 200
result.text = AWS_TOKEN
return result

if request.method == "GET":
if "X-aws-ec2-metadata-token" not in request.headers.keys():
result.status_code = 400
result.text = "Error: IMDSv1 is not supported on this instance"
return result

if request.headers["X-aws-ec2-metadata-token"] != AWS_TOKEN:
result.status_code = 400
result.text = "Error: Invalid metadata token provided"
return result

if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL:
result.status_code = 200
result.text = AWS_METADATA
return result

if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL:
result.status_code = 200
result.text = AWS_SIGNATURE
return result

result.status_code = 400
result.text = "Error: Invalid URL"
return result

result.status_code = 400
result.text = "Error: not supported request method"
return result


class TestAutomaticRegistration(unittest.TestCase):
def setUp(self):
_ = aws.AWSCloudProvider({})
aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None)
aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock()

_ = azure.AzureCloudProvider({})
azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="")

def tearDown(self):
aws.AWSCloudProvider._instance = None
Expand All @@ -109,17 +123,16 @@ def tearDown(self):
gcp.GCPCloudProvider._instance = None
gcp.GCPCloudProvider._initialized = False

@patch('cloud_what.providers.aws.requests.Session')
def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class):
def test_collect_cloud_info_one_cloud_provider_detected(self):
"""
Test the case, when we try to collect cloud info only for
one detected cloud provider
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.hooks = {'response': []}
mock_session_class.return_value = mock_session
mock_session.send = send__aws_imdsv2_only
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
aws.AWSCloudProvider._instance._session = mock_session

cloud_list = ['aws']
cloud_info = _collect_cloud_info(cloud_list, Mock())
Expand All @@ -134,26 +147,23 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class
metadata = base64.b64decode(b64_metadata).decode('utf-8')
self.assertEqual(metadata, AWS_METADATA)
# Test signature
self.assertTrue('signature' in cloud_info)
b64_signature = cloud_info['signature']
signature = base64.b64decode(b64_signature).decode('utf-8')
self.assertEqual(
signature,
'-----BEGIN PKCS7-----\n' + AWS_SIGNATURE + '\n-----END PKCS7-----'
)
self.assertTrue("signature" in cloud_info)
b64_signature = cloud_info["signature"]
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")

@patch('cloud_what.providers.aws.requests.Session')
def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class):
def test_collect_cloud_info_more_cloud_providers_detected(self):
"""
Test the case, when we try to collect cloud info only for
more than one cloud providers, because more than one cloud
providers were detected
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.hooks = {'response': []}
mock_session_class.return_value = mock_session
mock_session.send = send__aws_imdsv2_only
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
aws.AWSCloudProvider._instance._session = mock_session
azure.AzureCloudProvider._instance._session = Mock()

# More cloud providers detected
cloud_list = ['azure', 'aws']
Expand Down

0 comments on commit a67c5d6

Please sign in to comment.