diff --git a/azext_iot/digitaltwins/common.py b/azext_iot/digitaltwins/common.py index b0a382a70..8b646cf5b 100644 --- a/azext_iot/digitaltwins/common.py +++ b/azext_iot/digitaltwins/common.py @@ -11,6 +11,9 @@ from enum import Enum +MAX_ADT_CREATE_RETRIES = 5 +ADT_CREATE_RETRY_AFTER = 60 + class ADTEndpointType(Enum): """ @@ -49,3 +52,12 @@ class ADTPublicNetworkAccessType(Enum): enabled = "Enabled" disabled = "Disabled" + + +class ProvisioningStateType(Enum): + """ + ARM poller provisioning states + """ + FINISHED = frozenset(['succeeded', 'canceled', 'failed']) + FAILED = frozenset(['canceled', 'failed']) + SUCCEEDED = frozenset(['succeeded']) diff --git a/azext_iot/digitaltwins/providers/resource.py b/azext_iot/digitaltwins/providers/resource.py index 958ae3287..8c675a851 100644 --- a/azext_iot/digitaltwins/providers/resource.py +++ b/azext_iot/digitaltwins/providers/resource.py @@ -6,6 +6,9 @@ from azext_iot.digitaltwins.common import ( ADTEndpointAuthType, ADTPublicNetworkAccessType, + ADT_CREATE_RETRY_AFTER, + MAX_ADT_CREATE_RETRIES, + ProvisioningStateType, ) from azext_iot.digitaltwins.providers import ( DigitalTwinsResourceManager, @@ -71,6 +74,25 @@ def create( long_running_operation_timeout=timeout, ) + def check_state(lro): + from time import sleep + instance = lro.resource().as_dict() + state = instance.get('provisioning_state', None) + retries = 0 + while (state.lower() not in ProvisioningStateType.FINISHED.value) and retries < MAX_ADT_CREATE_RETRIES: + retries += 1 + sleep(int(lro._response.headers.get('retry-after', ADT_CREATE_RETRY_AFTER))) + lro.update_status() + instance = lro.resource().as_dict() + state = instance.get('provisioning_state', None) + if state and state.lower() not in ProvisioningStateType.FINISHED.value: + logger.warning( + "The resource has been created and has not finished provisioning. Please monitor the status of " + "the Digital Twin instance using az dt show -n {} -g {}".format( + name, resource_group_name + ) + ) + def rbac_handler(lro): instance = lro.resource().as_dict() identity = instance.get("identity") @@ -96,6 +118,7 @@ def rbac_handler(lro): role_type=role_type, ) + create_or_update.add_done_callback(check_state) create_or_update.add_done_callback(rbac_handler) return create_or_update except CloudError as e: diff --git a/azext_iot/tests/digitaltwins/test_dt_resource_lifecycle_int.py b/azext_iot/tests/digitaltwins/test_dt_resource_lifecycle_int.py index 835e2c235..7ac99b584 100644 --- a/azext_iot/tests/digitaltwins/test_dt_resource_lifecycle_int.py +++ b/azext_iot/tests/digitaltwins/test_dt_resource_lifecycle_int.py @@ -99,7 +99,7 @@ def test_dt_resource(self): self.track_instance(create_output) assert_common_resource_attributes( - self.wait_for_hostname(create_output), + create_output, instance_names[0], self.rg, self.region, @@ -140,13 +140,14 @@ def test_dt_resource(self): # wait for identity assignment sleep(60) - assert_common_resource_attributes( - self.wait_for_hostname(create_msi_output), - instance_names[1], - self.rg, - self.rg_region, - tags=None, - assign_identity=True, + self.cmd( + "dt wait -n {} -g {} --custom \"{}\" --interval {} --timeout {}".format( + instance_names[1], + self.rg, + "identity!='None'", + 15, + 15 * 20 + ) ) show_msi_output = self.cmd( @@ -162,9 +163,6 @@ def test_dt_resource(self): assign_identity=True, ) - # Wait for RBAC to catch up - sleep(10) - role_assignment_egt_list = self.cmd( "role assignment list --scope {} --assignee {}".format( eventgrid_topic_id, show_msi_output["identity"]["principalId"] @@ -182,15 +180,30 @@ def test_dt_resource(self): # Update tags and disable MSI updated_tags = "env=test tier=premium" updated_tags_dict = {"env": "test", "tier": "premium"} - remove_msi_output = self.cmd( + self.cmd( "dt create -n {} -g {} --assign-identity false --tags {}".format( instance_names[1], self.rg, updated_tags ) ).get_output_in_json() + self.cmd( + "dt wait -n {} -g {} --custom \"{}\" --interval {} --timeout {}".format( + instance_names[1], + self.rg, + "identity=='None'", + 15, + 15 * 20 + ) + ) + + remove_msi_output = self.cmd( + "dt show -n {} -g {}".format( + instance_names[1], self.rg + ) + ).get_output_in_json() + assert_common_resource_attributes( - self.wait_for_hostname( - remove_msi_output, wait_in_sec=15, interval=20, extra_condition="identity=='None'"), + remove_msi_output, instance_names[1], self.rg, self.rg_region, diff --git a/azext_iot/tests/digitaltwins/test_dt_resource_unit.py b/azext_iot/tests/digitaltwins/test_dt_resource_unit.py new file mode 100644 index 000000000..8d2dea7e8 --- /dev/null +++ b/azext_iot/tests/digitaltwins/test_dt_resource_unit.py @@ -0,0 +1,166 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import re +import pytest +import responses +import json +import azext_iot.digitaltwins.providers.resource +from time import sleep +from azext_iot.digitaltwins import commands_resource as subject +from azext_iot.tests.digitaltwins.dt_helpers import generate_generic_id +from msrestazure.azure_exceptions import CloudError +from azext_iot.digitaltwins.common import ADTPublicNetworkAccessType + +# Default values +name = generate_generic_id() +resource_group_name = generate_generic_id() +location = 'westus' +role_type = "Contributor" +public_network_access = ADTPublicNetworkAccessType.enabled.value +provisioning = json.dumps({"provisioningState": "provisioning"}) +finished = json.dumps({"provisioningState": "succeeded"}) +failed = json.dumps({"provisioningState": "failed"}) + + +@pytest.fixture +def start_twin_response(mocked_response, fixture_dt_client): + mocked_response.assert_all_requests_are_fired = False + + mocked_response.add( + method=responses.GET, + content_type="application/json", + url=re.compile( + "https://management.azure.com/subscriptions/(.*)/resourcegroups/(.*)?" + ), + status=200, + match_querystring=False, + body=json.dumps({"name": name}), + ) + + mocked_response.add( + method=responses.GET, + content_type="application/json", + url=re.compile( + "https://management.azure.com/subscriptions/(.*)/" + "providers/Microsoft.DigitalTwins/digitalTwinsInstances" + ), + status=200, + match_querystring=False, + body=json.dumps({"name": name}), + ) + + yield mocked_response + + +class TestTwinCreateInstance(object): + @pytest.fixture + def service_client_with_retry(self, mocked_response, start_twin_response): + mocked_response.add( + method=responses.PUT, + url=re.compile( + "https://(.*)management.azure.com/subscriptions/(.*)/" + "resourceGroups/{}/providers/Microsoft.DigitalTwins/digitalTwinsInstances/{}".format( + resource_group_name, name + ) + ), + body=provisioning, + status=201, + headers={ + "Location": + "https://management.azure.com/subscriptions/xxx/providers/Microsoft.DigitalTwins/" + "locations/xxx/operationResults/operationkey" + }, + content_type="application/json", + match_querystring=False, + ) + + mocked_response.add( + method=responses.GET, + url="https://management.azure.com/subscriptions/xxx/providers/Microsoft.DigitalTwins/" + "locations/xxx/operationResults/operationkey", + body=finished, + status=200, + headers={ + "Location": + "https://management.azure.com/subscriptions/xxx/providers/Microsoft.DigitalTwins/" + "locations/xxx/operationResults/operationkey2" + }, + content_type="application/json", + match_querystring=False, + ) + + yield mocked_response + + def test_create_instance_with_retry(self, fixture_cmd, mocker, service_client_with_retry): + mocker.patch.object(azext_iot.digitaltwins.providers.resource, "ADT_CREATE_RETRY_AFTER", 0.0001) + subject.create_instance( + cmd=fixture_cmd, + name=name, + resource_group_name=resource_group_name, + location=location + ) + while len(service_client_with_retry.calls) == 1: + sleep(10) + check_request = service_client_with_retry.calls[1].request + assert "operationkey" in check_request.url + assert len(service_client_with_retry.calls) == 2 + assert service_client_with_retry.calls[1].response.content.decode("utf-8") == finished + + @pytest.fixture + def service_client_with_failed_retry(self, mocked_response, start_twin_response): + mocked_response.add( + method=responses.PUT, + url=re.compile( + "https://(.*)management.azure.com/subscriptions/(.*)/" + "resourceGroups/{}/providers/Microsoft.DigitalTwins/digitalTwinsInstances/{}".format( + resource_group_name, name + ) + ), + body=provisioning, + status=201, + headers={ + "Location": + "https://management.azure.com/subscriptions/xxx/providers/Microsoft.DigitalTwins/" + "locations/xxx/operationResults/operationkey" + }, + content_type="application/json", + match_querystring=False, + ) + + mocked_response.add( + method=responses.GET, + url="https://management.azure.com/subscriptions/xxx/providers/Microsoft.DigitalTwins/" + "locations/xxx/operationResults/operationkey", + body=failed, + status=500, + content_type="application/json", + match_querystring=False, + ) + + yield mocked_response + + def test_create_instance_with_failed_retry(self, fixture_cmd, mocker, service_client_with_failed_retry): + mocker.patch.object(azext_iot.digitaltwins.providers.resource, "ADT_CREATE_RETRY_AFTER", 0.0001) + result = subject.create_instance( + cmd=fixture_cmd, + name=name, + resource_group_name=resource_group_name, + location=location + ) + while len(service_client_with_failed_retry.calls) == 1: + sleep(10) + check_request = service_client_with_failed_retry.calls[1].request + assert "operationkey" in check_request.url + + # The LRO poller calls once more for some reason + assert len(service_client_with_failed_retry.calls) >= 2 + assert service_client_with_failed_retry.calls[1].response.content.decode("utf-8") == failed + + # The poller.result will have the error + assert result.status() == "Failed" + with pytest.raises(CloudError): + result.result()