diff --git a/CredScanSuppressions.json b/CredScanSuppressions.json index 9d59a4227..3e56732e4 100644 --- a/CredScanSuppressions.json +++ b/CredScanSuppressions.json @@ -25,14 +25,10 @@ "_justification": "Completely made up keys for unit tests." }, { - "file": "azext_iot\\tests\\dps\\test_iot_dps_int.py", + "file": "azext_iot\\tests\\dps\\enrollment_group\\test_iot_dps_enrollment_group_int.py", "placeholder": "cT/EXZvsplPEpT//p98Pc6sKh8mY3kYgSxavHwMkl7w=", "_justification": "Ensure made up endorsement key evaluates to the expected device key." }, - { - "file": "azext_iot\\tests\\dps\\test_iot_dps_unit.py", - "_justification": "Completely made up keys for unit tests." - }, { "file": "azext_iot\\tests\\iothub\\core\\test_iot_ext_unit.py", "placeholder": "+XLy+MVZ+aTeOnVzN2kLeB16O+kSxmz6g3rS6fAf6rw=", diff --git a/HISTORY.rst b/HISTORY.rst index c36fb0205..8ff577038 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -9,7 +9,7 @@ Release History **General updates** -* The generic CLIErrors raised across the extension have been changed to more specific semantically correct exceptions aligning with CLI core. +* The generic CLIErrors raised across the extension have been changed to more specific semantically correct exceptions aligning with CLI core. * Fix for issue #475 resolving `sys.excepthook` upon terminating monitor-events process in Py 3.9+ environments [IoT Hub, IoT Central]. **Digital Twin updates** @@ -19,6 +19,12 @@ Release History * Updated both controlplane and dataplane SDKs to now use the newer 2021-06-30-preview API version. +* Added `--no-wait` parameter to the following functions: + + - az dt create + - az dt endpoint create + - az dt private-endpoint create + **IoT Central updates** * Added commands for Edge devices and modules: @@ -29,17 +35,20 @@ Release History - az iot central device edge manifest - az iot central device edge manifest show - + - az iot central device edge children - az iot central device edge children list - az iot central device edge children add - az iot central device edge children remove - -* Added `--no-wait` parameter to the following functions: - - az dt create - - az dt endpoint create - - az dt private-endpoint create +**IoT DPS updates** + +* Reorganizing command structure for enrollment-group commands: + - 'az iot dps compute-device-key' is deprecated use 'az iot dps enrollment-group compute-device-key' instead. + - 'az iot dps registration' is deprecated use 'az iot dps enrollment-group registration' instead. + - 'az iot dps registration delete' is deprecated use 'az iot dps enrollment-group registration delete' instead. + - 'az iot dps registration list' is deprecated use 'az iot dps enrollment-group registration list' instead. + - 'az iot dps registration show' is deprecated use 'az iot dps enrollment-group registration show' instead. 0.13.0 diff --git a/azext_iot/_help.py b/azext_iot/_help.py index dab623e86..a8b4ddbb8 100644 --- a/azext_iot/_help.py +++ b/azext_iot/_help.py @@ -1340,6 +1340,54 @@ short-summary: Delete an enrollment group in an Azure IoT Hub Device Provisioning Service. """ +helps[ + "iot dps enrollment-group registration" +] = """ + type: group + short-summary: Manage device registrations for an enrollment group in an Azure IoT Hub Device + Provisioning Service. +""" + +helps[ + "iot dps enrollment-group registration list" +] = """ + type: command + short-summary: List device registrations for an enrollment group in an Azure IoT Hub Device + Provisioning Service. +""" + +helps[ + "iot dps enrollment-group registration show" +] = """ + type: command + short-summary: Get a device registration for an enrollment group in an Azure IoT Hub Device + Provisioning Service. +""" + +helps[ + "iot dps enrollment-group registration delete" +] = """ + type: command + short-summary: Delete a device registration for an enrollment group in an Azure IoT Hub Device + Provisioning Service. +""" + +helps[ + "iot dps enrollment-group compute-device-key" +] = """ + type: command + short-summary: Generate a derived device SAS key for an enrollment group in an Azure IoT Hub Device + Provisioning Service. + examples: + - name: Compute the device key with the given symmetric key. + text: > + az iot dps enrollment-group compute-device-key --key {enrollement_group_symmetric_key} --registration-id {registration_id} + - name: Compute the device key with the given enrollment group. + text: > + az iot dps enrollment-group compute-device-key -g {resource_group_name} --dps-name {dps_name} + --enrollment-id {enrollment_id} --registration-id {registration_id} +""" + helps[ "iot dps registration" ] = """ diff --git a/azext_iot/_params.py b/azext_iot/_params.py index 4cd06f741..4ef6eed97 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -1069,6 +1069,11 @@ def load_arguments(self, _): "If attestation with a root CA certificate is desired then a root ca name must be provided.", arg_group="Authentication" ) + context.argument( + "registration_id", + options_list=["--registration-id", "--rid"], + help="ID of device registration." + ) with self.argument_context("iot dps enrollment-group show") as context: context.argument( @@ -1078,6 +1083,16 @@ def load_arguments(self, _): help="Include attestation keys and information in enrollment group results.", ) + with self.argument_context("iot dps enrollment-group compute-device-key") as context: + context.argument( + "symmetric_key", + options_list=["--symmetric-key", "--key"], + help="The symmetric shared access key for the enrollment group. This bypasses the " + "Device Provisioning Service registry and generates the SAS token directly " + "from the supplied symmetric key without further validation. All other command " + "parameters aside from registration ID will be ignored.", + ) + with self.argument_context("iot dps registration") as context: context.argument("registration_id", help="ID of device registration.") diff --git a/azext_iot/commands.py b/azext_iot/commands.py index 84c08f6c0..d24374181 100644 --- a/azext_iot/commands.py +++ b/azext_iot/commands.py @@ -166,7 +166,11 @@ def load_command_table(self, _): with self.command_group("iot dps", command_type=iotdps_ops) as cmd_group: cmd_group.command( - "compute-device-key", "iot_dps_compute_device_key", is_preview=True + "compute-device-key", + "iot_dps_compute_device_key", + deprecate_info=self.deprecate( + redirect='iot dps enrollment-group compute-device-key', hide=True + ) ) with self.command_group("iot dps enrollment", command_type=iotdps_ops) as cmd_group: @@ -184,15 +188,28 @@ def load_command_table(self, _): cmd_group.show_command("show", "iot_dps_device_enrollment_group_get") cmd_group.command("update", "iot_dps_device_enrollment_group_update") cmd_group.command("delete", "iot_dps_device_enrollment_group_delete") + cmd_group.command( + "compute-device-key", "iot_dps_compute_device_key" + ) + + with self.command_group( + "iot dps enrollment-group registration", command_type=iotdps_ops + ) as cmd_group: + cmd_group.command("list", "iot_dps_registration_list") + cmd_group.show_command("show", "iot_dps_registration_get") + cmd_group.command("delete", "iot_dps_registration_delete") with self.command_group( - "iot dps registration", command_type=iotdps_ops + "iot dps registration", + command_type=iotdps_ops, + deprecate_info=self.deprecate(redirect='iot dps enrollment-group registration', hide=True) ) as cmd_group: cmd_group.command("list", "iot_dps_registration_list") cmd_group.show_command("show", "iot_dps_registration_get") cmd_group.command("delete", "iot_dps_registration_delete") with self.command_group( - "iot dps connection-string", command_type=iotdps_ops + "iot dps connection-string", + command_type=iotdps_ops, ) as cmd_group: cmd_group.show_command("show", "iot_dps_connection_string_show") diff --git a/azext_iot/tests/conftest.py b/azext_iot/tests/conftest.py index 4fab9c304..ebe959d38 100644 --- a/azext_iot/tests/conftest.py +++ b/azext_iot/tests/conftest.py @@ -16,6 +16,7 @@ from azext_iot.tests.generators import generate_generic_id from azext_iot.common.shared import DeviceAuthApiType +# Patch paths path_get_device = "azext_iot.operations.hub._iot_device_show" path_iot_hub_service_factory = "azext_iot._factory.iot_hub_service_factory" path_service_client = "msrest.service_client.ServiceClient.send" @@ -33,10 +34,16 @@ path_iot_device_show = "azext_iot.operations.hub._iot_device_show" path_update_device_twin = "azext_iot.operations.hub._iot_device_twin_update" hub_entity = "myhub.azure-devices.net" +path_iot_service_provisioning_factory = "azext_iot._factory.iot_service_provisioning_factory" +path_gdcs = "azext_iot.dps.providers.discovery.DPSDiscovery.get_target" +path_discovery_dps_init = ( + "azext_iot.dps.providers.discovery.DPSDiscovery._initialize_client" +) instance_name = generate_generic_id() hostname = "{}.subdomain.domain".format(instance_name) +# Mock Iot Hub Target mock_target = {} mock_target["entity"] = hub_entity mock_target["primarykey"] = "rJx/6rJ6rmG4ak890+eW5MYGH+A0uzRvjGNjg3Ve8sfo=" @@ -47,6 +54,22 @@ mock_target["sku_tier"] = "Standard" mock_target["resourcegroup"] = "myresourcegroup" +# Mock Iot DPS Target +mock_dps_target = {} +mock_dps_target['cs'] = 'HostName=mydps;SharedAccessKeyName=name;SharedAccessKey=value' +mock_dps_target['entity'] = 'mydps' +mock_dps_target['primarykey'] = 'rJx/6rJ6rmG4ak890+eW5MYGH+A0uzRvjGNjg3Ve8sfo=' +mock_dps_target['secondarykey'] = 'aCd/6rJ6rmG4ak890+eW5MYGH+A0uzRvjGNjg3Ve8sfo=' +mock_dps_target['policy'] = 'provisioningserviceowner' +mock_dps_target['subscription'] = "5952cff8-bcd1-4235-9554-af2c0348bf23" + +mock_symmetric_key_attestation = { + "type": "symmetricKey", + "symmetricKey": { + "primaryKey": "primary_key", + "secondaryKey": "secondary_key" + }, +} generic_cs_template = "HostName={};SharedAccessKeyName={};SharedAccessKey={}" @@ -380,3 +403,23 @@ def fixture_dt_client(mocker, fixture_cmd): def pytest_addoption(parser): parser.addoption("--api-version", action="store", default=None) + + +# DPS Fixtures +@pytest.fixture() +def fixture_gdcs(mocker): + gdcs = mocker.patch(path_gdcs) + gdcs.return_value = mock_dps_target + mocker.patch(path_iot_service_provisioning_factory) + mocker.patch(path_discovery_dps_init) + + return gdcs + + +@pytest.fixture() +def fixture_dps_sas(mocker): + r = SasTokenAuthentication(mock_dps_target['entity'], + mock_dps_target['policy'], + mock_dps_target['primarykey']) + sas = mocker.patch(path_sas) + sas.return_value = r diff --git a/azext_iot/tests/dps/__init__.py b/azext_iot/tests/dps/__init__.py index 15088be4e..a11a841dc 100644 --- a/azext_iot/tests/dps/__init__.py +++ b/azext_iot/tests/dps/__init__.py @@ -36,6 +36,14 @@ PREFIX_GROUP_ENROLLMENT = "test-groupenroll-" USER_ROLE = "Device Provisioning Service Data Contributor" +TEST_ENDORSEMENT_KEY = ( + "AToAAQALAAMAsgAgg3GXZ0SEs/gakMyNRqXXJP1S124GUgtk8qHaGzMUaaoABgCAAEMAEAgAAAAAAAEAibym9HQP9vxCGF5dVc1Q" + "QsAGe021aUGJzNol1/gycBx3jFsTpwmWbISRwnFvflWd0w2Mc44FAAZNaJOAAxwZvG8GvyLlHh6fGKdh+mSBL4iLH2bZ4Ry22cB3" + "CJVjXmdGoz9Y/j3/NwLndBxQC+baNvzvyVQZ4/A2YL7vzIIj2ik4y+ve9ir7U0GbNdnxskqK1KFIITVVtkTIYyyFTIR0BySjPrRI" + "Dj7r7Mh5uF9HBppGKQCBoVSVV8dI91lNazmSdpGWyqCkO7iM4VvUMv2HT/ym53aYlUrau+Qq87Tu+uQipWYgRdF11KDfcpMHqqzB" + "QQ1NpOJVhrsTrhyJzO7KNw==" +) + # Test Environment Variables settings = DynamoSettings( req_env_set=ENV_SET_TEST_IOTHUB_REQUIRED, diff --git a/azext_iot/tests/dps/enrollment/__init__.py b/azext_iot/tests/dps/enrollment/__init__.py new file mode 100644 index 000000000..55614acbf --- /dev/null +++ b/azext_iot/tests/dps/enrollment/__init__.py @@ -0,0 +1,5 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- diff --git a/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_int.py b/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_int.py new file mode 100644 index 000000000..a8f6cc8a4 --- /dev/null +++ b/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_int.py @@ -0,0 +1,388 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azext_iot.common.shared import EntityStatusType, AttestationType, AllocationType, ReprovisionType +from azext_iot.common.utility import generate_key +from azext_iot.tests.dps import ( + API_VERSION, + CERT_PATH, + DATAPLANE_AUTH_TYPES, + WEBHOOK_URL, + TEST_ENDORSEMENT_KEY, + IoTDPSLiveScenarioTest +) + + +class TestDPSEnrollments(IoTDPSLiveScenarioTest): + def __init__(self, test_method): + super(TestDPSEnrollments, self).__init__(test_method) + + def test_dps_enrollment_tpm_lifecycle(self): + attestation_type = AttestationType.tpm.value + for auth_phase in DATAPLANE_AUTH_TYPES: + enrollment_id = self.generate_enrollment_names()[0] + device_id = self.generate_device_names()[0] + + enrollment = self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment create --enrollment-id {} --attestation-type {}" + " -g {} --dps-name {} --endorsement-key {}" + " --provisioning-status {} --device-id {} --initial-twin-tags {}" + " --initial-twin-properties {} --device-information {} " + "--allocation-policy {} --iot-hubs {}".format( + enrollment_id, + attestation_type, + self.entity_rg, + self.entity_dps_name, + TEST_ENDORSEMENT_KEY, + EntityStatusType.enabled.value, + device_id, + '"{generic_dict}"', + '"{generic_dict}"', + '"{generic_dict}"', + AllocationType.static.value, + self.hub_host_name, + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.enabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", AllocationType.static.value), + self.check("iotHubs", self.hub_host_name.split()), + self.check("initialTwin.tags", self.kwargs["generic_dict"]), + self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), + self.check( + "initialTwin.properties.desired", self.kwargs["generic_dict"] + ), + self.exists("reprovisionPolicy"), + self.check("reprovisionPolicy.migrateDeviceData", True), + self.check("reprovisionPolicy.updateHubAssignment", True), + ], + ).get_output_in_json() + etag = enrollment["etag"] + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment list -g {} --dps-name {}".format( + self.entity_rg, self.entity_dps_name + ), + auth_type=auth_phase + ), + checks=[ + self.check("length(@)", 1), + self.check("[0].registrationId", enrollment_id), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + checks=[self.check("registrationId", enrollment_id)], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment show -g {} --dps-name {} --enrollment-id {} --show-keys".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + checks=[ + self.check("registrationId", enrollment_id), + self.check("attestation.type", attestation_type), + self.exists("attestation.{}".format(attestation_type)), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" + " --provisioning-status {} --etag {} --info {}".format( + self.entity_rg, + self.entity_dps_name, + enrollment_id, + EntityStatusType.disabled.value, + etag, + '""' + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.disabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", AllocationType.static.value), + self.check("iotHubs", self.hub_host_name.split()), + self.exists("initialTwin.tags"), + self.exists("initialTwin.properties.desired"), + self.exists("optionalDeviceInformation"), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + ) + + def test_dps_enrollment_x509_lifecycle(self): + attestation_type = AttestationType.x509.value + for auth_phase in DATAPLANE_AUTH_TYPES: + enrollment_id = self.generate_enrollment_names()[0] + device_id = self.generate_device_names()[0] + + etag = self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment create --enrollment-id {} --attestation-type {}" + " -g {} --dps-name {} --cp {} --scp {}" + " --provisioning-status {} --device-id {}" + " --initial-twin-tags {} --initial-twin-properties {}" + " --allocation-policy {} --iot-hubs {}".format( + enrollment_id, + attestation_type, + self.entity_rg, + self.entity_dps_name, + CERT_PATH, + CERT_PATH, + EntityStatusType.enabled.value, + device_id, + '"{generic_dict}"', + '"{generic_dict}"', + AllocationType.hashed.value, + self.hub_host_name, + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.enabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", AllocationType.hashed.value), + self.check("iotHubs", self.hub_host_name.split()), + self.check("initialTwin.tags", self.kwargs["generic_dict"]), + self.check( + "initialTwin.properties.desired", self.kwargs["generic_dict"] + ), + self.exists("reprovisionPolicy"), + self.check("reprovisionPolicy.migrateDeviceData", True), + self.check("reprovisionPolicy.updateHubAssignment", True), + ], + ).get_output_in_json()["etag"] + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment list -g {} --dps-name {}".format(self.entity_rg, self.entity_dps_name), + auth_type=auth_phase + ), + checks=[ + self.check("length(@)", 1), + self.check("[0].registrationId", enrollment_id), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + checks=[self.check("registrationId", enrollment_id)], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" + " --provisioning-status {} --etag {} --info {} --rc".format( + self.entity_rg, + self.entity_dps_name, + enrollment_id, + EntityStatusType.disabled.value, + etag, + '"{generic_dict}"', + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.disabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", AllocationType.hashed.value), + self.check("iotHubs", self.hub_host_name.split()), + self.exists("initialTwin.tags"), + self.exists("initialTwin.properties.desired"), + self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), + self.check("attestation.type.x509.clientCertificates.primary", None), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + ) + + def test_dps_enrollment_symmetrickey_lifecycle(self): + attestation_type = AttestationType.symmetricKey.value + for auth_phase in DATAPLANE_AUTH_TYPES: + enrollment_id, enrollment_id2 = self.generate_enrollment_names(count=2) + primary_key = generate_key() + secondary_key = generate_key() + device_id = self.generate_enrollment_names()[0] + + # Use provided keys + etag = self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment create --enrollment-id {} --attestation-type {}" + " -g {} --dps-name {} --pk {} --sk {}" + " --provisioning-status {} --device-id {}" + " --initial-twin-tags {} --initial-twin-properties {} --device-information {}" + " --allocation-policy {} --rp {} --iot-hubs {} --edge-enabled".format( + enrollment_id, + attestation_type, + self.entity_rg, + self.entity_dps_name, + primary_key, + secondary_key, + EntityStatusType.enabled.value, + device_id, + '"{generic_dict}"', + '"{generic_dict}"', + '"{generic_dict}"', + AllocationType.geolatency.value.lower(), + ReprovisionType.reprovisionandresetdata.value, + self.hub_host_name, + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.enabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", AllocationType.geolatency.value), + self.check("iotHubs", self.hub_host_name.split()), + self.check("initialTwin.tags", self.kwargs["generic_dict"]), + self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), + self.check( + "initialTwin.properties.desired", self.kwargs["generic_dict"] + ), + self.exists("reprovisionPolicy"), + self.check("reprovisionPolicy.migrateDeviceData", False), + self.check("reprovisionPolicy.updateHubAssignment", True), + self.check("capabilities.iotEdge", True), + ], + ).get_output_in_json()["etag"] + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment list -g {} --dps-name {}".format(self.entity_rg, self.entity_dps_name), + auth_type=auth_phase + ), + checks=[ + self.check("length(@)", 1), + self.check("[0].registrationId", enrollment_id), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + checks=[self.check("registrationId", enrollment_id)], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" + " --provisioning-status {} --etag {} --edge-enabled False" + " --allocation-policy {} --webhook-url {} --api-version {}".format( + self.entity_rg, + self.entity_dps_name, + enrollment_id, + EntityStatusType.disabled.value, + etag, + AllocationType.custom.value, + WEBHOOK_URL, + API_VERSION, + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id), + self.check("provisioningStatus", EntityStatusType.disabled.value), + self.check("deviceId", device_id), + self.check("allocationPolicy", "custom"), + self.check("customAllocationDefinition.webhookUrl", WEBHOOK_URL), + self.check("customAllocationDefinition.apiVersion", API_VERSION), + self.check("iotHubs", None), + self.exists("initialTwin.tags"), + self.exists("initialTwin.properties.desired"), + self.check("attestation.symmetricKey.primaryKey", primary_key), + self.check("capabilities.iotEdge", False), + ], + ) + + # Use service generated keys + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment create --enrollment-id {} --attestation-type {}" + " -g {} --dps-name {} --allocation-policy {} --webhook-url {} --api-version {}".format( + enrollment_id2, + attestation_type, + self.entity_rg, + self.entity_dps_name, + AllocationType.custom.value, + WEBHOOK_URL, + API_VERSION, + ), + auth_type=auth_phase + ), + checks=[ + self.check("attestation.type", attestation_type), + self.check("registrationId", enrollment_id2), + self.check("allocationPolicy", "custom"), + self.check("customAllocationDefinition.webhookUrl", WEBHOOK_URL), + self.check("customAllocationDefinition.apiVersion", API_VERSION), + ], + ) + + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id + ), + auth_type=auth_phase + ), + ) + self.cmd( + self.set_cmd_auth_type( + "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( + self.entity_rg, self.entity_dps_name, enrollment_id2 + ), + auth_type=auth_phase + ), + ) diff --git a/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_unit.py b/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_unit.py new file mode 100644 index 000000000..d71aa21d9 --- /dev/null +++ b/azext_iot/tests/dps/enrollment/test_iot_dps_enrollment_unit.py @@ -0,0 +1,632 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import pytest +import json +import responses +from azext_iot.operations import dps as subject +from knack.util import CLIError +from azext_iot.tests.conftest import mock_dps_target, mock_symmetric_key_attestation + + +enrollment_id = 'myenrollment' +resource_group = 'myrg' +etag = 'AAAA==' + + +def generate_enrollment_create_req(attestation_type=None, endorsement_key=None, + certificate_path=None, secondary_certificate_path=None, + device_id=None, iot_hub_host_name=None, + initial_twin_tags=None, initial_twin_properties=None, + provisioning_status=None, reprovision_policy=None, + primary_key=None, secondary_key=None, allocation_policy=None, + iot_hubs=None, edge_enabled=False, webhook_url=None, api_version=None): + return {'client': None, + 'enrollment_id': enrollment_id, + 'rg': resource_group, + 'dps_name': mock_dps_target['entity'], + 'attestation_type': attestation_type, + 'endorsement_key': endorsement_key, + 'certificate_path': certificate_path, + 'secondary_certificate_path': secondary_certificate_path, + 'device_id': device_id, + 'iot_hub_host_name': iot_hub_host_name, + 'initial_twin_tags': initial_twin_tags, + 'initial_twin_properties': initial_twin_properties, + 'provisioning_status': provisioning_status, + 'reprovision_policy': reprovision_policy, + 'primary_key': primary_key, + 'secondary_key': secondary_key, + 'allocation_policy': allocation_policy, + 'iot_hubs': iot_hubs, + 'edge_enabled': edge_enabled, + 'webhook_url': webhook_url, + 'api_version': api_version} + + +class TestEnrollmentCreate(): + @pytest.fixture() + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas): + mocked_response.add( + method=responses.PUT, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body='{}', + status=200, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + @pytest.fixture(params=[400, 401, 500]) + def serviceclient_generic_error(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): + mocked_response.add( + method=responses.PUT, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body='{}', + status=request.param, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + @pytest.mark.parametrize("req", [ + (generate_enrollment_create_req(attestation_type='tpm', + endorsement_key='mykey')), + (generate_enrollment_create_req(attestation_type='tpm', + endorsement_key='mykey', + device_id='1', + iot_hub_host_name='myHub', + provisioning_status='disabled')), + (generate_enrollment_create_req(attestation_type='tpm', + endorsement_key='mykey', + provisioning_status='enabled', + initial_twin_tags={'key': 'value'})), + (generate_enrollment_create_req(attestation_type='x509', + certificate_path='myCert')), + (generate_enrollment_create_req(attestation_type='x509', + secondary_certificate_path='myCert2')), + (generate_enrollment_create_req(attestation_type='x509', + certificate_path='myCert', + device_id='1', + iot_hub_host_name='myHub', + provisioning_status='disabled')), + (generate_enrollment_create_req(attestation_type='x509', + certificate_path='myCert', + provisioning_status='enabled', + initial_twin_properties={'key': 'value'})), + (generate_enrollment_create_req(attestation_type='symmetricKey')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey')), + (generate_enrollment_create_req(attestation_type='tpm', + endorsement_key='mykey', + reprovision_policy='reprovisionandmigratedata')), + (generate_enrollment_create_req(attestation_type='x509', + certificate_path='myCert', + reprovision_policy='reprovisionandresetdata')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + reprovision_policy='never')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + reprovision_policy='never', + allocation_policy='static', + iot_hubs='hub1')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + reprovision_policy='never', + allocation_policy='hashed', + iot_hubs='hub1 hub2')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + reprovision_policy='never', + allocation_policy='geoLatency')), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + reprovision_policy='never', + allocation_policy='custom', + webhook_url="https://www.test.test", + api_version="2019-03-31")), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + edge_enabled=True)), + (generate_enrollment_create_req(attestation_type='symmetricKey', + primary_key='primarykey', + secondary_key='secondarykey', + edge_enabled=True, + initial_twin_properties={'key': ['value1', 'value2']})), + (generate_enrollment_create_req(attestation_type='tpm', + endorsement_key='mykey', + provisioning_status='enabled', + initial_twin_properties={'key': ['value1', 'value2']})) + ]) + def test_enrollment_create(self, serviceclient, fixture_cmd, req): + subject.iot_dps_device_enrollment_create( + cmd=fixture_cmd, + enrollment_id=req['enrollment_id'], + attestation_type=req['attestation_type'], + dps_name=req['dps_name'], + resource_group_name=req['rg'], + endorsement_key=req['endorsement_key'], + certificate_path=req['certificate_path'], + secondary_certificate_path=req['secondary_certificate_path'], + primary_key=req['primary_key'], + secondary_key=req['secondary_key'], + device_id=req['device_id'], + iot_hub_host_name=req['iot_hub_host_name'], + initial_twin_tags=req['initial_twin_tags'], + initial_twin_properties=req['initial_twin_properties'], + provisioning_status=req['provisioning_status'], + reprovision_policy=req['reprovision_policy'], + allocation_policy=req['allocation_policy'], + iot_hubs=req['iot_hubs'], + edge_enabled=req['edge_enabled'], + webhook_url=req['webhook_url'], + api_version=req['api_version'] + ) + request = serviceclient.calls[0].request + url = request.url + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in url + assert request.method == 'PUT' + + body = json.loads(request.body) + assert body['registrationId'] == req['enrollment_id'] + if req['attestation_type'] == 'tpm': + assert body['attestation']['type'] == req['attestation_type'] + assert body['attestation']['tpm']['endorsementKey'] == req['endorsement_key'] + elif req['attestation_type'] == 'x509': + assert body['attestation']['type'] == req['attestation_type'] + assert body['attestation']['x509']['clientCertificates'] is not None + if req['certificate_path']: + assert body['attestation']['x509']['clientCertificates']['primary'] is not None + if req['secondary_certificate_path']: + assert body['attestation']['x509']['clientCertificates']['secondary'] is not None + else: + assert body['attestation']['type'] == req['attestation_type'] + assert body['attestation']['symmetricKey'] is not None + if req['primary_key']: + assert body['attestation']['symmetricKey']['primaryKey'] is not None + if req['secondary_key']: + assert body['attestation']['symmetricKey']['secondaryKey'] is not None + + if req['device_id']: + assert body['deviceId'] == req['device_id'] + if req['iot_hub_host_name']: + assert body['allocationPolicy'] == 'static' + assert body['iotHubs'] == req['iot_hub_host_name'].split() + if req['provisioning_status']: + assert body['provisioningStatus'] == req['provisioning_status'] + if req['initial_twin_properties']: + assert body['initialTwin']['properties']['desired'] == req['initial_twin_properties'] + if req['initial_twin_tags']: + assert body['initialTwin']['tags'] == req['initial_twin_tags'] + if not req['reprovision_policy']: + assert body['reprovisionPolicy']['migrateDeviceData'] + assert body['reprovisionPolicy']['updateHubAssignment'] + if req['reprovision_policy'] == 'reprovisionandmigratedata': + assert body['reprovisionPolicy']['migrateDeviceData'] + assert body['reprovisionPolicy']['updateHubAssignment'] + if req['reprovision_policy'] == 'reprovisionandresetdata': + assert not body['reprovisionPolicy']['migrateDeviceData'] + assert body['reprovisionPolicy']['updateHubAssignment'] + if req['reprovision_policy'] == 'never': + assert not body['reprovisionPolicy']['migrateDeviceData'] + assert not body['reprovisionPolicy']['updateHubAssignment'] + if req['allocation_policy']: + assert body['allocationPolicy'] == req['allocation_policy'] + if body['allocationPolicy'] == 'custom': + assert body['customAllocationDefinition']['webhookUrl'] == req['webhook_url'] + assert body['customAllocationDefinition']['apiVersion'] == req['api_version'] + if req['iot_hubs']: + assert body['iotHubs'] == req['iot_hubs'].split() + if req['edge_enabled']: + assert body['capabilities']['iotEdge'] + + @pytest.mark.parametrize("req", [ + (generate_enrollment_create_req(attestation_type='x509')), + (generate_enrollment_create_req(attestation_type='x509', endorsement_key='myKey')), + (generate_enrollment_create_req(attestation_type='tpm')), + (generate_enrollment_create_req(attestation_type='tpm', certificate_path='myCert')), + (generate_enrollment_create_req(reprovision_policy='invalid')), + (generate_enrollment_create_req(allocation_policy='invalid')), + (generate_enrollment_create_req(allocation_policy='static')), + (generate_enrollment_create_req(allocation_policy='custom')), + (generate_enrollment_create_req(allocation_policy='custom', webhook_url="https://www.test.test")), + (generate_enrollment_create_req(allocation_policy='static', iot_hubs='hub1 hub2')), + (generate_enrollment_create_req(allocation_policy='static', iot_hub_host_name='hubname')), + (generate_enrollment_create_req(iot_hubs='hub1 hub2')) + ]) + def test_enrollment_create_invalid_args(self, fixture_gdcs, fixture_cmd, req): + with pytest.raises(CLIError): + subject.iot_dps_device_enrollment_create( + cmd=fixture_cmd, + enrollment_id=req['enrollment_id'], + attestation_type=req['attestation_type'], + dps_name=req['dps_name'], + resource_group_name=req['rg'], + endorsement_key=req['endorsement_key'], + certificate_path=req['certificate_path'], + primary_key=req['primary_key'], + iot_hub_host_name=req['iot_hub_host_name'], + reprovision_policy=req['reprovision_policy'], + allocation_policy=req['allocation_policy'], + iot_hubs=req['iot_hubs'], + edge_enabled=req['edge_enabled'], + webhook_url=req['webhook_url'], + api_version=req['api_version'] + ) + + @pytest.mark.parametrize("req", [ + (generate_enrollment_create_req(attestation_type='tpm', endorsement_key='mykey')) + ]) + def test_enrollment_show_error(self, serviceclient_generic_error, fixture_cmd, req): + with pytest.raises(CLIError): + subject.iot_dps_device_enrollment_create( + cmd=fixture_cmd, + enrollment_id=req['enrollment_id'], + attestation_type=req['attestation_type'], + dps_name=req['dps_name'], + resource_group_name=req['rg'], + endorsement_key=req['endorsement_key'], + device_id=req['device_id'], + iot_hub_host_name=req['iot_hub_host_name'], + initial_twin_tags=req['initial_twin_tags'], + initial_twin_properties=req['initial_twin_properties'], + provisioning_status=req['provisioning_status'], + ) + + +def generate_enrollment_show(**kvp): + payload = {'attestation': + {'x509': {'clientCertificates': {'primary': + {'info': + {'issuerName': 'test', 'notAfterUtc': '2037-01-01T00:00:00Z', + 'notBeforeUtc': '2017-01-01T00:00:00Z', + 'serialNumber': '1A2B3C4D5E', + 'sha1Thumbprint': '109F2ED9D3FC92C88C1DE2203488B93D6B3F05F5', + 'sha256Thumbprint': 'F7D272B400C88FAC2A12A990F14DD2E881CA1F', + 'subjectName': 'test', + 'version': '3'}}, 'secondary': None}, + }, 'tpm': None, 'type': 'x509'}, + 'registrationId': enrollment_id, 'etag': etag, + 'provisioningStatus': 'disabled', 'iotHubHostName': 'myHub', + 'deviceId': 'myDevice'} + for k in kvp: + if payload.get(k): + payload[k] = kvp[k] + return payload + + +def generate_enrollment_update_req(certificate_path=None, iot_hub_host_name=None, + initial_twin_tags=None, + secondary_certificate_path=None, + remove_certificate_path=None, + remove_secondary_certificate_path=None, + initial_twin_properties=None, provisioning_status=None, + device_id=None, + etag=None, reprovision_policy=None, + allocation_policy=None, iot_hubs=None, + edge_enabled=None, webhook_url=None, api_version=None): + return {'client': None, + 'enrollment_id': enrollment_id, + 'rg': resource_group, + 'dps_name': mock_dps_target['entity'], + 'certificate_path': certificate_path, + 'secondary_certificate_path': secondary_certificate_path, + 'remove_certificate_path': remove_certificate_path, + 'remove_secondary_certificate_path': remove_secondary_certificate_path, + 'iot_hub_host_name': iot_hub_host_name, + 'initial_twin_tags': initial_twin_tags, + 'initial_twin_properties': initial_twin_properties, + 'provisioning_status': provisioning_status, + 'device_id': device_id, + 'etag': etag, + 'reprovision_policy': reprovision_policy, + 'allocation_policy': allocation_policy, + 'iot_hubs': iot_hubs, + 'edge_enabled': edge_enabled, + 'webhook_url': webhook_url, + 'api_version': api_version} + + +class TestEnrollmentUpdate(): + @pytest.fixture() + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): + # Initial GET + mocked_response.add( + method=responses.GET, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body=json.dumps(generate_enrollment_show()), + status=200, + content_type="application/json", + match_querystring=False, + ) + + # Update PUT + mocked_response.add( + method=responses.PUT, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body=json.dumps(generate_enrollment_show()), + status=200, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + @pytest.mark.parametrize("req", [ + (generate_enrollment_update_req(etag=etag, secondary_certificate_path='someOtherCertPath')), + (generate_enrollment_update_req(certificate_path='newCertPath', secondary_certificate_path='someOtherCertPath')), + (generate_enrollment_update_req(remove_certificate_path='true')), + (generate_enrollment_update_req(iot_hub_host_name='someOtherHubName', + initial_twin_tags={'newKey': 'newValue'}, + initial_twin_properties={'newKey': 'newValue'}, + provisioning_status='enabled', + device_id='newId')), + (generate_enrollment_update_req(reprovision_policy='reprovisionandmigratedata')), + (generate_enrollment_update_req(reprovision_policy='reprovisionandresetdata')), + (generate_enrollment_update_req(reprovision_policy='never')), + (generate_enrollment_update_req(allocation_policy='static', iot_hubs='hub1')), + (generate_enrollment_update_req(allocation_policy='hashed', iot_hubs='hub1 hub2')), + (generate_enrollment_update_req(allocation_policy='geoLatency')), + (generate_enrollment_update_req(allocation_policy='custom', + webhook_url="https://www.test.test", + api_version="2019-03-31")), + (generate_enrollment_update_req(edge_enabled=True)), + (generate_enrollment_update_req(edge_enabled=False)) + ]) + def test_enrollment_update(self, serviceclient, fixture_cmd, req): + subject.iot_dps_device_enrollment_update( + cmd=fixture_cmd, + enrollment_id=req['enrollment_id'], + dps_name=req['dps_name'], + resource_group_name=req['rg'], + etag=req['etag'], + endorsement_key=None, + certificate_path=req['certificate_path'], + secondary_certificate_path=req['secondary_certificate_path'], + remove_certificate=req['remove_certificate_path'], + remove_secondary_certificate=req['remove_secondary_certificate_path'], + primary_key=None, + secondary_key=None, + device_id=req['device_id'], + iot_hub_host_name=req['iot_hub_host_name'], + initial_twin_tags=req['initial_twin_tags'], + initial_twin_properties=req['initial_twin_properties'], + provisioning_status=req['provisioning_status'], + reprovision_policy=req['reprovision_policy'], + allocation_policy=req['allocation_policy'], + iot_hubs=req['iot_hubs'], + edge_enabled=req['edge_enabled'], + webhook_url=req['webhook_url'], + api_version=req['api_version'] + ) + get_request = serviceclient.calls[0].request + assert get_request.method == 'GET' + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in get_request.url + + update_request = serviceclient.calls[1].request + url = update_request.url + + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in url + assert update_request.method == 'PUT' + + assert update_request.headers["If-Match"] == req['etag'] if req['etag'] else "*" + + body = json.loads(update_request.body) + if not req['certificate_path']: + if req['remove_certificate_path']: + assert body['attestation']['x509']['clientCertificates'].get('primary') is None + else: + assert body['attestation']['x509']['clientCertificates']['primary']['info'] is not None + if req['certificate_path']: + assert body['attestation']['x509']['clientCertificates']['primary']['certificate'] is not None + if req['secondary_certificate_path']: + assert body['attestation']['x509']['clientCertificates']['secondary']['certificate'] is not None + if req['iot_hub_host_name']: + assert body['allocationPolicy'] == 'static' + assert body['iotHubs'] == req['iot_hub_host_name'].split() + if req['provisioning_status']: + assert body['provisioningStatus'] == req['provisioning_status'] + if req['initial_twin_properties']: + assert body['initialTwin']['properties']['desired'] == req['initial_twin_properties'] + if req['initial_twin_tags']: + assert body['initialTwin']['tags'] == req['initial_twin_tags'] + if req['device_id']: + assert body['deviceId'] == req['device_id'] + if req['reprovision_policy'] == 'reprovisionandmigratedata': + assert body['reprovisionPolicy']['migrateDeviceData'] + assert body['reprovisionPolicy']['updateHubAssignment'] + if req['reprovision_policy'] == 'reprovisionandresetdata': + assert not body['reprovisionPolicy']['migrateDeviceData'] + assert body['reprovisionPolicy']['updateHubAssignment'] + if req['reprovision_policy'] == 'never': + assert not body['reprovisionPolicy']['migrateDeviceData'] + assert not body['reprovisionPolicy']['updateHubAssignment'] + if req['allocation_policy']: + assert body['allocationPolicy'] == req['allocation_policy'] + if body['allocationPolicy'] == 'custom': + assert body['customAllocationDefinition']['webhookUrl'] == req['webhook_url'] + assert body['customAllocationDefinition']['apiVersion'] == req['api_version'] + if req['iot_hubs']: + assert body['iotHubs'] == req['iot_hubs'].split() + if req['edge_enabled'] is not None: + assert body['capabilities']['iotEdge'] == req['edge_enabled'] + + +class TestEnrollmentShow(): + @pytest.fixture(params=[200]) + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): + mocked_response.add( + method=responses.GET, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body=json.dumps(generate_enrollment_show()), + status=request.param, + content_type="application/json", + match_querystring=False, + ) + + yield mocked_response + + @pytest.fixture() + def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_dps_sas): + mocked_response.add( + method=responses.GET, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body=json.dumps(generate_enrollment_show(attestation=mock_symmetric_key_attestation)), + status=200, + content_type="application/json", + match_querystring=False, + ) + + mocked_response.add( + method=responses.POST, + url="https://{}/enrollments/{}/attestationmechanism".format(mock_dps_target['entity'], enrollment_id), + body=json.dumps(mock_symmetric_key_attestation), + status=200, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + def test_enrollment_show(self, fixture_cmd, serviceclient): + result = subject.iot_dps_device_enrollment_get( + cmd=fixture_cmd, + enrollment_id=enrollment_id, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + ) + + assert result['registrationId'] == enrollment_id + + request = serviceclient.calls[0].request + url = request.url + method = request.method + + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in url + assert method == 'GET' + + def test_enrollment_show_with_keys(self, fixture_cmd, serviceclient_attestation): + result = subject.iot_dps_device_enrollment_get( + cmd=fixture_cmd, + enrollment_id=enrollment_id, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + show_keys=True + ) + + assert result['registrationId'] == enrollment_id + assert result['attestation'] + + request = serviceclient_attestation.calls[0].request + url = request.url + method = request.method + + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in url + assert method == 'GET' + + request = serviceclient_attestation.calls[1].request + url = request.url + method = request.method + + assert "{}/enrollments/{}/attestationmechanism?".format(mock_dps_target['entity'], enrollment_id) in url + assert method == 'POST' + + def test_enrollment_show_error(self, fixture_cmd, serviceclient_generic_error): + with pytest.raises(CLIError): + subject.iot_dps_device_enrollment_get( + cmd=fixture_cmd, + enrollment_id=enrollment_id, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + ) + + +class TestEnrollmentList(): + @pytest.fixture(params=[200]) + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): + mocked_response.add( + method=responses.POST, + url="https://{}/enrollments/query?".format(mock_dps_target['entity']), + body=json.dumps([generate_enrollment_show()]), + status=200, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + @pytest.mark.parametrize("top", [3, None]) + def test_enrollment_list(self, serviceclient, fixture_cmd, top): + result = subject.iot_dps_device_enrollment_list( + cmd=fixture_cmd, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + top=top + ) + request = serviceclient.calls[0].request + headers = request.headers + url = request.url + method = request.method + + assert str(headers.get("x-ms-max-item-count")) == str(top) + assert "{}/enrollments/query?".format(mock_dps_target['entity']) in url + assert method == "POST" + assert json.dumps(result) + + def test_enrollment_list_error(self, fixture_cmd, serviceclient_generic_error): + with pytest.raises(CLIError): + subject.iot_dps_device_enrollment_list( + cmd=fixture_cmd, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + ) + + +class TestEnrollmentDelete(): + @pytest.fixture(params=[204]) + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): + mocked_response.add( + method=responses.DELETE, + url="https://{}/enrollments/{}".format(mock_dps_target['entity'], enrollment_id), + body='{}', + status=request.param, + content_type="application/json", + match_querystring=False, + ) + yield mocked_response + + @pytest.mark.parametrize( + "etag", + [None, etag] + ) + def test_enrollment_delete(self, serviceclient, fixture_cmd, etag): + subject.iot_dps_device_enrollment_delete( + cmd=fixture_cmd, + enrollment_id=enrollment_id, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + etag=etag + ) + request = serviceclient.calls[0].request + url = request.url + method = request.method + assert "{}/enrollments/{}?".format(mock_dps_target['entity'], enrollment_id) in url + assert method == 'DELETE' + assert request.headers["If-Match"] == etag if etag else "*" + + def test_enrollment_delete_error(self, serviceclient_generic_error, fixture_cmd): + with pytest.raises(CLIError): + subject.iot_dps_device_enrollment_delete( + cmd=fixture_cmd, + enrollment_id=enrollment_id, + dps_name=mock_dps_target['entity'], + resource_group_name=resource_group, + ) diff --git a/azext_iot/tests/dps/enrollment_group/__init__.py b/azext_iot/tests/dps/enrollment_group/__init__.py new file mode 100644 index 000000000..55614acbf --- /dev/null +++ b/azext_iot/tests/dps/enrollment_group/__init__.py @@ -0,0 +1,5 @@ +# coding=utf-8 +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- diff --git a/azext_iot/tests/dps/test_iot_dps_int.py b/azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_int.py similarity index 55% rename from azext_iot/tests/dps/test_iot_dps_int.py rename to azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_int.py index 4f37587b7..9ee7c63fa 100644 --- a/azext_iot/tests/dps/test_iot_dps_int.py +++ b/azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_int.py @@ -11,397 +11,23 @@ CERT_PATH, DATAPLANE_AUTH_TYPES, WEBHOOK_URL, + TEST_ENDORSEMENT_KEY, IoTDPSLiveScenarioTest ) -test_endorsement_key = ( - "AToAAQALAAMAsgAgg3GXZ0SEs/gakMyNRqXXJP1S124GUgtk8qHaGzMUaaoABgCAAEMAEAgAAAAAAAEAibym9HQP9vxCGF5dVc1Q" - "QsAGe021aUGJzNol1/gycBx3jFsTpwmWbISRwnFvflWd0w2Mc44FAAZNaJOAAxwZvG8GvyLlHh6fGKdh+mSBL4iLH2bZ4Ry22cB3" - "CJVjXmdGoz9Y/j3/NwLndBxQC+baNvzvyVQZ4/A2YL7vzIIj2ik4y+ve9ir7U0GbNdnxskqK1KFIITVVtkTIYyyFTIR0BySjPrRI" - "Dj7r7Mh5uF9HBppGKQCBoVSVV8dI91lNazmSdpGWyqCkO7iM4VvUMv2HT/ym53aYlUrau+Qq87Tu+uQipWYgRdF11KDfcpMHqqzB" - "QQ1NpOJVhrsTrhyJzO7KNw==" -) - -class TestDPSEnrollments(IoTDPSLiveScenarioTest): +class TestDPSEnrollmentGroups(IoTDPSLiveScenarioTest): def __init__(self, test_method): - super(TestDPSEnrollments, self).__init__(test_method) + super(TestDPSEnrollmentGroups, self).__init__(test_method) def test_dps_compute_device_key(self): offline_device_key = self.cmd( - 'az iot dps compute-device-key --key "{}" ' - "--registration-id myarbitrarydeviceId".format(test_endorsement_key) + 'az iot dps enrollment-group compute-device-key --key "{}" ' + "--registration-id myarbitrarydeviceId".format(TEST_ENDORSEMENT_KEY) ).output offline_device_key = offline_device_key.strip("\"'\n") assert offline_device_key == "cT/EXZvsplPEpT//p98Pc6sKh8mY3kYgSxavHwMkl7w=" - def test_dps_enrollment_tpm_lifecycle(self): - attestation_type = AttestationType.tpm.value - for auth_phase in DATAPLANE_AUTH_TYPES: - enrollment_id = self.generate_enrollment_names()[0] - device_id = self.generate_device_names()[0] - - enrollment = self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment create --enrollment-id {} --attestation-type {}" - " -g {} --dps-name {} --endorsement-key {}" - " --provisioning-status {} --device-id {} --initial-twin-tags {}" - " --initial-twin-properties {} --device-information {} " - "--allocation-policy {} --iot-hubs {}".format( - enrollment_id, - attestation_type, - self.entity_rg, - self.entity_dps_name, - test_endorsement_key, - EntityStatusType.enabled.value, - device_id, - '"{generic_dict}"', - '"{generic_dict}"', - '"{generic_dict}"', - AllocationType.static.value, - self.hub_host_name, - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.enabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", AllocationType.static.value), - self.check("iotHubs", self.hub_host_name.split()), - self.check("initialTwin.tags", self.kwargs["generic_dict"]), - self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), - self.check( - "initialTwin.properties.desired", self.kwargs["generic_dict"] - ), - self.exists("reprovisionPolicy"), - self.check("reprovisionPolicy.migrateDeviceData", True), - self.check("reprovisionPolicy.updateHubAssignment", True), - ], - ).get_output_in_json() - etag = enrollment["etag"] - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment list -g {} --dps-name {}".format( - self.entity_rg, self.entity_dps_name - ), - auth_type=auth_phase - ), - checks=[ - self.check("length(@)", 1), - self.check("[0].registrationId", enrollment_id), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - checks=[self.check("registrationId", enrollment_id)], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment show -g {} --dps-name {} --enrollment-id {} --show-keys".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - checks=[ - self.check("registrationId", enrollment_id), - self.check("attestation.type", attestation_type), - self.exists("attestation.{}".format(attestation_type)), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" - " --provisioning-status {} --etag {} --info {}".format( - self.entity_rg, - self.entity_dps_name, - enrollment_id, - EntityStatusType.disabled.value, - etag, - '""' - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.disabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", AllocationType.static.value), - self.check("iotHubs", self.hub_host_name.split()), - self.exists("initialTwin.tags"), - self.exists("initialTwin.properties.desired"), - self.exists("optionalDeviceInformation"), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - ) - - def test_dps_enrollment_x509_lifecycle(self): - attestation_type = AttestationType.x509.value - for auth_phase in DATAPLANE_AUTH_TYPES: - enrollment_id = self.generate_enrollment_names()[0] - device_id = self.generate_device_names()[0] - - etag = self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment create --enrollment-id {} --attestation-type {}" - " -g {} --dps-name {} --cp {} --scp {}" - " --provisioning-status {} --device-id {}" - " --initial-twin-tags {} --initial-twin-properties {}" - " --allocation-policy {} --iot-hubs {}".format( - enrollment_id, - attestation_type, - self.entity_rg, - self.entity_dps_name, - CERT_PATH, - CERT_PATH, - EntityStatusType.enabled.value, - device_id, - '"{generic_dict}"', - '"{generic_dict}"', - AllocationType.hashed.value, - self.hub_host_name, - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.enabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", AllocationType.hashed.value), - self.check("iotHubs", self.hub_host_name.split()), - self.check("initialTwin.tags", self.kwargs["generic_dict"]), - self.check( - "initialTwin.properties.desired", self.kwargs["generic_dict"] - ), - self.exists("reprovisionPolicy"), - self.check("reprovisionPolicy.migrateDeviceData", True), - self.check("reprovisionPolicy.updateHubAssignment", True), - ], - ).get_output_in_json()["etag"] - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment list -g {} --dps-name {}".format(self.entity_rg, self.entity_dps_name), - auth_type=auth_phase - ), - checks=[ - self.check("length(@)", 1), - self.check("[0].registrationId", enrollment_id), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - checks=[self.check("registrationId", enrollment_id)], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" - " --provisioning-status {} --etag {} --info {} --rc".format( - self.entity_rg, - self.entity_dps_name, - enrollment_id, - EntityStatusType.disabled.value, - etag, - '"{generic_dict}"', - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.disabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", AllocationType.hashed.value), - self.check("iotHubs", self.hub_host_name.split()), - self.exists("initialTwin.tags"), - self.exists("initialTwin.properties.desired"), - self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), - self.check("attestation.type.x509.clientCertificates.primary", None), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - ) - - def test_dps_enrollment_symmetrickey_lifecycle(self): - attestation_type = AttestationType.symmetricKey.value - for auth_phase in DATAPLANE_AUTH_TYPES: - enrollment_id, enrollment_id2 = self.generate_enrollment_names(count=2) - primary_key = generate_key() - secondary_key = generate_key() - device_id = self.generate_enrollment_names()[0] - - # Use provided keys - etag = self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment create --enrollment-id {} --attestation-type {}" - " -g {} --dps-name {} --pk {} --sk {}" - " --provisioning-status {} --device-id {}" - " --initial-twin-tags {} --initial-twin-properties {} --device-information {}" - " --allocation-policy {} --rp {} --iot-hubs {} --edge-enabled".format( - enrollment_id, - attestation_type, - self.entity_rg, - self.entity_dps_name, - primary_key, - secondary_key, - EntityStatusType.enabled.value, - device_id, - '"{generic_dict}"', - '"{generic_dict}"', - '"{generic_dict}"', - AllocationType.geolatency.value.lower(), - ReprovisionType.reprovisionandresetdata.value, - self.hub_host_name, - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.enabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", AllocationType.geolatency.value), - self.check("iotHubs", self.hub_host_name.split()), - self.check("initialTwin.tags", self.kwargs["generic_dict"]), - self.check("optionalDeviceInformation", self.kwargs["generic_dict"]), - self.check( - "initialTwin.properties.desired", self.kwargs["generic_dict"] - ), - self.exists("reprovisionPolicy"), - self.check("reprovisionPolicy.migrateDeviceData", False), - self.check("reprovisionPolicy.updateHubAssignment", True), - self.check("capabilities.iotEdge", True), - ], - ).get_output_in_json()["etag"] - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment list -g {} --dps-name {}".format(self.entity_rg, self.entity_dps_name), - auth_type=auth_phase - ), - checks=[ - self.check("length(@)", 1), - self.check("[0].registrationId", enrollment_id), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment show -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - checks=[self.check("registrationId", enrollment_id)], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment update -g {} --dps-name {} --enrollment-id {}" - " --provisioning-status {} --etag {} --edge-enabled False" - " --allocation-policy {} --webhook-url {} --api-version {}".format( - self.entity_rg, - self.entity_dps_name, - enrollment_id, - EntityStatusType.disabled.value, - etag, - AllocationType.custom.value, - WEBHOOK_URL, - API_VERSION, - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id), - self.check("provisioningStatus", EntityStatusType.disabled.value), - self.check("deviceId", device_id), - self.check("allocationPolicy", "custom"), - self.check("customAllocationDefinition.webhookUrl", WEBHOOK_URL), - self.check("customAllocationDefinition.apiVersion", API_VERSION), - self.check("iotHubs", None), - self.exists("initialTwin.tags"), - self.exists("initialTwin.properties.desired"), - self.check("attestation.symmetricKey.primaryKey", primary_key), - self.check("capabilities.iotEdge", False), - ], - ) - - # Use service generated keys - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment create --enrollment-id {} --attestation-type {}" - " -g {} --dps-name {} --allocation-policy {} --webhook-url {} --api-version {}".format( - enrollment_id2, - attestation_type, - self.entity_rg, - self.entity_dps_name, - AllocationType.custom.value, - WEBHOOK_URL, - API_VERSION, - ), - auth_type=auth_phase - ), - checks=[ - self.check("attestation.type", attestation_type), - self.check("registrationId", enrollment_id2), - self.check("allocationPolicy", "custom"), - self.check("customAllocationDefinition.webhookUrl", WEBHOOK_URL), - self.check("customAllocationDefinition.apiVersion", API_VERSION), - ], - ) - - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id - ), - auth_type=auth_phase - ), - ) - self.cmd( - self.set_cmd_auth_type( - "iot dps enrollment delete -g {} --dps-name {} --enrollment-id {}".format( - self.entity_rg, self.entity_dps_name, enrollment_id2 - ), - auth_type=auth_phase - ), - ) - def test_dps_enrollment_group_x509_lifecycle(self): for auth_phase in DATAPLANE_AUTH_TYPES: enrollment_id = self.generate_enrollment_names(group=True)[0] @@ -470,7 +96,7 @@ def test_dps_enrollment_group_x509_lifecycle(self): # Compute Device Key only works for symmetric key enrollment groups self.cmd( self.set_cmd_auth_type( - 'az iot dps compute-device-key -g {} --dps-name {} --enrollment-id {} ' + 'az iot dps enrollment-group compute-device-key -g {} --dps-name {} --enrollment-id {} ' "--registration-id myarbitrarydeviceId".format( self.entity_rg, self.entity_dps_name, enrollment_id ), @@ -711,7 +337,7 @@ def test_dps_enrollment_group_symmetrickey_lifecycle(self): # Compute Device Key tests online_device_key = self.cmd( self.set_cmd_auth_type( - 'az iot dps compute-device-key -g {} --dps-name {} --enrollment-id {} ' + 'az iot dps enrollment-group compute-device-key -g {} --dps-name {} --enrollment-id {} ' "--registration-id myarbitrarydeviceId".format( self.entity_rg, self.entity_dps_name, enrollment_id2 ), @@ -720,14 +346,14 @@ def test_dps_enrollment_group_symmetrickey_lifecycle(self): ).output offline_device_key = self.cmd( - 'az iot dps compute-device-key --key "{}" ' + 'az iot dps enrollment-group compute-device-key --key "{}" ' "--registration-id myarbitrarydeviceId".format(keys["primaryKey"]) ).output assert offline_device_key == online_device_key # Compute Device Key uses primary key offline_device_key = self.cmd( - 'az iot dps compute-device-key --key "{}" ' + 'az iot dps enrollment-group compute-device-key --key "{}" ' "--registration-id myarbitrarydeviceId".format(keys["secondaryKey"]) ).output assert offline_device_key != online_device_key @@ -753,7 +379,7 @@ def test_dps_enrollment_group_symmetrickey_lifecycle(self): online_device_key = self.cmd( self.set_cmd_auth_type( - 'az iot dps compute-device-key -g {} --dps-name {} --enrollment-id {} ' + 'az iot dps enrollment-group compute-device-key -g {} --dps-name {} --enrollment-id {} ' "--registration-id myarbitrarydeviceId".format( self.entity_rg, self.entity_dps_name, enrollment_id2 ), diff --git a/azext_iot/tests/dps/test_iot_dps_unit.py b/azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_unit.py similarity index 50% rename from azext_iot/tests/dps/test_iot_dps_unit.py rename to azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_unit.py index da8c6a453..cc9a2094d 100644 --- a/azext_iot/tests/dps/test_iot_dps_unit.py +++ b/azext_iot/tests/dps/enrollment_group/test_iot_dps_enrollment_group_unit.py @@ -4,691 +4,20 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -''' -NOTICE: These tests are to be phased out and introduced in more modern form. - Try not to add any new content, only fixes if necessary. - Look at IoT Hub jobs or configuration tests for a better example. Also use responses fixtures - like mocked_response for http request mocking. -''' import pytest import json import responses from azext_iot.operations import dps as subject from knack.util import CLIError -from azext_iot.common.sas_token_auth import SasTokenAuthentication +from azext_iot.tests.conftest import mock_dps_target, mock_symmetric_key_attestation + enrollment_id = 'myenrollment' resource_group = 'myrg' registration_id = 'myregistration' etag = 'AAAA==' -mock_target = {} -mock_target['cs'] = 'HostName=mydps;SharedAccessKeyName=name;SharedAccessKey=value' -mock_target['entity'] = 'mydps' -mock_target['primarykey'] = 'rJx/6rJ6rmG4ak890+eW5MYGH+A0uzRvjGNjg3Ve8sfo=' -mock_target['secondarykey'] = 'aCd/6rJ6rmG4ak890+eW5MYGH+A0uzRvjGNjg3Ve8sfo=' -mock_target['policy'] = 'provisioningserviceowner' -mock_target['subscription'] = "5952cff8-bcd1-4235-9554-af2c0348bf23" - -mock_symmetric_key_attestation = { - "type": "symmetricKey", - "symmetricKey": { - "primaryKey": "primary_key", - "secondaryKey": "secondary_key" - }, -} - -# Patch Paths # -path_service_client = 'msrest.service_client.ServiceClient.send' -path_sas = 'azext_iot._factory.SasTokenAuthentication' -path_dps_sub_id = 'azure.cli.core._profile.Profile.get_subscription_id' -path_iot_service_provisioning_factory = "azext_iot._factory.iot_service_provisioning_factory" -path_gdcs = "azext_iot.dps.providers.discovery.DPSDiscovery.get_target" -path_discovery_dps_init = ( - "azext_iot.dps.providers.discovery.DPSDiscovery._initialize_client" -) - - -@pytest.fixture() -def fixture_gdcs(mocker): - gdcs = mocker.patch(path_gdcs) - gdcs.return_value = mock_target - mocker.patch(path_iot_service_provisioning_factory) - mocker.patch(path_discovery_dps_init) - - return gdcs - - -@pytest.fixture() -def fixture_get_sub_id(mocker): - gsi = mocker.patch(path_dps_sub_id) - gsi.return_value = mock_target['subscription'] - - -@pytest.fixture() -def fixture_sas(mocker): - r = SasTokenAuthentication(mock_target['entity'], - mock_target['policy'], - mock_target['primarykey']) - sas = mocker.patch(path_sas) - sas.return_value = r - - -def generate_enrollment_create_req(attestation_type=None, endorsement_key=None, - certificate_path=None, secondary_certificate_path=None, - device_Id=None, iot_hub_host_name=None, - initial_twin_tags=None, initial_twin_properties=None, - provisioning_status=None, reprovision_policy=None, - primary_key=None, secondary_key=None, allocation_policy=None, - iot_hubs=None, edge_enabled=False, webhook_url=None, api_version=None): - return {'client': None, - 'enrollment_id': enrollment_id, - 'rg': resource_group, - 'dps_name': mock_target['entity'], - 'attestation_type': attestation_type, - 'endorsement_key': endorsement_key, - 'certificate_path': certificate_path, - 'secondary_certificate_path': secondary_certificate_path, - 'device_id': device_Id, - 'iot_hub_host_name': iot_hub_host_name, - 'initial_twin_tags': initial_twin_tags, - 'initial_twin_properties': initial_twin_properties, - 'provisioning_status': provisioning_status, - 'reprovision_policy': reprovision_policy, - 'primary_key': primary_key, - 'secondary_key': secondary_key, - 'allocation_policy': allocation_policy, - 'iot_hubs': iot_hubs, - 'edge_enabled': edge_enabled, - 'webhook_url': webhook_url, - 'api_version': api_version} - - -class TestEnrollmentCreate(): - @pytest.fixture() - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas): - mocked_response.add( - method=responses.PUT, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body='{}', - status=200, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - @pytest.fixture(params=[400, 401, 500]) - def serviceclient_generic_error(self, mocked_response, fixture_gdcs, fixture_sas, request): - mocked_response.add( - method=responses.PUT, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body='{}', - status=request.param, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - @pytest.mark.parametrize("req", [ - (generate_enrollment_create_req(attestation_type='tpm', - endorsement_key='mykey')), - (generate_enrollment_create_req(attestation_type='tpm', - endorsement_key='mykey', - device_Id='1', - iot_hub_host_name='myHub', - provisioning_status='disabled')), - (generate_enrollment_create_req(attestation_type='tpm', - endorsement_key='mykey', - provisioning_status='enabled', - initial_twin_tags={'key': 'value'})), - (generate_enrollment_create_req(attestation_type='x509', - certificate_path='myCert')), - (generate_enrollment_create_req(attestation_type='x509', - secondary_certificate_path='myCert2')), - (generate_enrollment_create_req(attestation_type='x509', - certificate_path='myCert', - device_Id='1', - iot_hub_host_name='myHub', - provisioning_status='disabled')), - (generate_enrollment_create_req(attestation_type='x509', - certificate_path='myCert', - provisioning_status='enabled', - initial_twin_properties={'key': 'value'})), - (generate_enrollment_create_req(attestation_type='symmetricKey')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey')), - (generate_enrollment_create_req(attestation_type='tpm', - endorsement_key='mykey', - reprovision_policy='reprovisionandmigratedata')), - (generate_enrollment_create_req(attestation_type='x509', - certificate_path='myCert', - reprovision_policy='reprovisionandresetdata')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - reprovision_policy='never')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - reprovision_policy='never', - allocation_policy='static', - iot_hubs='hub1')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - reprovision_policy='never', - allocation_policy='hashed', - iot_hubs='hub1 hub2')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - reprovision_policy='never', - allocation_policy='geoLatency')), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - reprovision_policy='never', - allocation_policy='custom', - webhook_url="https://www.test.test", - api_version="2019-03-31")), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - edge_enabled=True)), - (generate_enrollment_create_req(attestation_type='symmetricKey', - primary_key='primarykey', - secondary_key='secondarykey', - edge_enabled=True, - initial_twin_properties={'key': ['value1', 'value2']})), - (generate_enrollment_create_req(attestation_type='tpm', - endorsement_key='mykey', - provisioning_status='enabled', - initial_twin_properties={'key': ['value1', 'value2']})) - ]) - def test_enrollment_create(self, serviceclient, fixture_cmd, req): - subject.iot_dps_device_enrollment_create( - cmd=fixture_cmd, - enrollment_id=req['enrollment_id'], - attestation_type=req['attestation_type'], - dps_name=req['dps_name'], - resource_group_name=req['rg'], - endorsement_key=req['endorsement_key'], - certificate_path=req['certificate_path'], - secondary_certificate_path=req['secondary_certificate_path'], - primary_key=req['primary_key'], - secondary_key=req['secondary_key'], - device_id=req['device_id'], - iot_hub_host_name=req['iot_hub_host_name'], - initial_twin_tags=req['initial_twin_tags'], - initial_twin_properties=req['initial_twin_properties'], - provisioning_status=req['provisioning_status'], - reprovision_policy=req['reprovision_policy'], - allocation_policy=req['allocation_policy'], - iot_hubs=req['iot_hubs'], - edge_enabled=req['edge_enabled'], - webhook_url=req['webhook_url'], - api_version=req['api_version'] - ) - request = serviceclient.calls[0].request - url = request.url - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in url - assert request.method == 'PUT' - - body = json.loads(request.body) - assert body['registrationId'] == req['enrollment_id'] - if req['attestation_type'] == 'tpm': - assert body['attestation']['type'] == req['attestation_type'] - assert body['attestation']['tpm']['endorsementKey'] == req['endorsement_key'] - elif req['attestation_type'] == 'x509': - assert body['attestation']['type'] == req['attestation_type'] - assert body['attestation']['x509']['clientCertificates'] is not None - if req['certificate_path']: - assert body['attestation']['x509']['clientCertificates']['primary'] is not None - if req['secondary_certificate_path']: - assert body['attestation']['x509']['clientCertificates']['secondary'] is not None - else: - assert body['attestation']['type'] == req['attestation_type'] - assert body['attestation']['symmetricKey'] is not None - if req['primary_key']: - assert body['attestation']['symmetricKey']['primaryKey'] is not None - if req['secondary_key']: - assert body['attestation']['symmetricKey']['secondaryKey'] is not None - - if req['device_id']: - assert body['deviceId'] == req['device_id'] - if req['iot_hub_host_name']: - assert body['allocationPolicy'] == 'static' - assert body['iotHubs'] == req['iot_hub_host_name'].split() - if req['provisioning_status']: - assert body['provisioningStatus'] == req['provisioning_status'] - if req['initial_twin_properties']: - assert body['initialTwin']['properties']['desired'] == req['initial_twin_properties'] - if req['initial_twin_tags']: - assert body['initialTwin']['tags'] == req['initial_twin_tags'] - if not req['reprovision_policy']: - assert body['reprovisionPolicy']['migrateDeviceData'] - assert body['reprovisionPolicy']['updateHubAssignment'] - if req['reprovision_policy'] == 'reprovisionandmigratedata': - assert body['reprovisionPolicy']['migrateDeviceData'] - assert body['reprovisionPolicy']['updateHubAssignment'] - if req['reprovision_policy'] == 'reprovisionandresetdata': - assert not body['reprovisionPolicy']['migrateDeviceData'] - assert body['reprovisionPolicy']['updateHubAssignment'] - if req['reprovision_policy'] == 'never': - assert not body['reprovisionPolicy']['migrateDeviceData'] - assert not body['reprovisionPolicy']['updateHubAssignment'] - if req['allocation_policy']: - assert body['allocationPolicy'] == req['allocation_policy'] - if body['allocationPolicy'] == 'custom': - assert body['customAllocationDefinition']['webhookUrl'] == req['webhook_url'] - assert body['customAllocationDefinition']['apiVersion'] == req['api_version'] - if req['iot_hubs']: - assert body['iotHubs'] == req['iot_hubs'].split() - if req['edge_enabled']: - assert body['capabilities']['iotEdge'] - - @pytest.mark.parametrize("req", [ - (generate_enrollment_create_req(attestation_type='x509')), - (generate_enrollment_create_req(attestation_type='x509', endorsement_key='myKey')), - (generate_enrollment_create_req(attestation_type='tpm')), - (generate_enrollment_create_req(attestation_type='tpm', certificate_path='myCert')), - (generate_enrollment_create_req(reprovision_policy='invalid')), - (generate_enrollment_create_req(allocation_policy='invalid')), - (generate_enrollment_create_req(allocation_policy='static')), - (generate_enrollment_create_req(allocation_policy='custom')), - (generate_enrollment_create_req(allocation_policy='custom', webhook_url="https://www.test.test")), - (generate_enrollment_create_req(allocation_policy='static', iot_hubs='hub1 hub2')), - (generate_enrollment_create_req(allocation_policy='static', iot_hub_host_name='hubname')), - (generate_enrollment_create_req(iot_hubs='hub1 hub2')) - ]) - def test_enrollment_create_invalid_args(self, fixture_gdcs, fixture_cmd, req): - with pytest.raises(CLIError): - subject.iot_dps_device_enrollment_create( - cmd=fixture_cmd, - enrollment_id=req['enrollment_id'], - attestation_type=req['attestation_type'], - dps_name=req['dps_name'], - resource_group_name=req['rg'], - endorsement_key=req['endorsement_key'], - certificate_path=req['certificate_path'], - primary_key=req['primary_key'], - iot_hub_host_name=req['iot_hub_host_name'], - reprovision_policy=req['reprovision_policy'], - allocation_policy=req['allocation_policy'], - iot_hubs=req['iot_hubs'], - edge_enabled=req['edge_enabled'], - webhook_url=req['webhook_url'], - api_version=req['api_version'] - ) - - @pytest.mark.parametrize("req", [ - (generate_enrollment_create_req(attestation_type='tpm', endorsement_key='mykey')) - ]) - def test_enrollment_show_error(self, serviceclient_generic_error, fixture_cmd, req): - with pytest.raises(CLIError): - subject.iot_dps_device_enrollment_create( - cmd=fixture_cmd, - enrollment_id=req['enrollment_id'], - attestation_type=req['attestation_type'], - dps_name=req['dps_name'], - resource_group_name=req['rg'], - endorsement_key=req['endorsement_key'], - device_id=req['device_id'], - iot_hub_host_name=req['iot_hub_host_name'], - initial_twin_tags=req['initial_twin_tags'], - initial_twin_properties=req['initial_twin_properties'], - provisioning_status=req['provisioning_status'], - ) - - -def generate_enrollment_show(**kvp): - payload = {'attestation': - {'x509': {'clientCertificates': {'primary': - {'info': - {'issuerName': 'test', 'notAfterUtc': '2037-01-01T00:00:00Z', - 'notBeforeUtc': '2017-01-01T00:00:00Z', - 'serialNumber': '1A2B3C4D5E', - 'sha1Thumbprint': '109F2ED9D3FC92C88C1DE2203488B93D6B3F05F5', - 'sha256Thumbprint': 'F7D272B400C88FAC2A12A990F14DD2E881CA1F', - 'subjectName': 'test', - 'version': '3'}}, 'secondary': None}, - }, 'tpm': None, 'type': 'x509'}, - 'registrationId': enrollment_id, 'etag': etag, - 'provisioningStatus': 'disabled', 'iotHubHostName': 'myHub', - 'deviceId': 'myDevice'} - for k in kvp: - if payload.get(k): - payload[k] = kvp[k] - return payload - - -def generate_enrollment_update_req(certificate_path=None, iot_hub_host_name=None, - initial_twin_tags=None, - secondary_certificate_path=None, - remove_certificate_path=None, - remove_secondary_certificate_path=None, - initial_twin_properties=None, provisioning_status=None, - device_id=None, - etag=None, reprovision_policy=None, - allocation_policy=None, iot_hubs=None, - edge_enabled=None, webhook_url=None, api_version=None): - return {'client': None, - 'enrollment_id': enrollment_id, - 'rg': resource_group, - 'dps_name': mock_target['entity'], - 'certificate_path': certificate_path, - 'secondary_certificate_path': secondary_certificate_path, - 'remove_certificate_path': remove_certificate_path, - 'remove_secondary_certificate_path': remove_secondary_certificate_path, - 'iot_hub_host_name': iot_hub_host_name, - 'initial_twin_tags': initial_twin_tags, - 'initial_twin_properties': initial_twin_properties, - 'provisioning_status': provisioning_status, - 'device_id': device_id, - 'etag': etag, - 'reprovision_policy': reprovision_policy, - 'allocation_policy': allocation_policy, - 'iot_hubs': iot_hubs, - 'edge_enabled': edge_enabled, - 'webhook_url': webhook_url, - 'api_version': api_version} - - -class TestEnrollmentUpdate(): - @pytest.fixture() - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): - # Initial GET - mocked_response.add( - method=responses.GET, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body=json.dumps(generate_enrollment_show()), - status=200, - content_type="application/json", - match_querystring=False, - ) - - # Update PUT - mocked_response.add( - method=responses.PUT, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body=json.dumps(generate_enrollment_show()), - status=200, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - @pytest.mark.parametrize("req", [ - (generate_enrollment_update_req(etag=etag, secondary_certificate_path='someOtherCertPath')), - (generate_enrollment_update_req(certificate_path='newCertPath', secondary_certificate_path='someOtherCertPath')), - (generate_enrollment_update_req(remove_certificate_path='true')), - (generate_enrollment_update_req(iot_hub_host_name='someOtherHubName', - initial_twin_tags={'newKey': 'newValue'}, - initial_twin_properties={'newKey': 'newValue'}, - provisioning_status='enabled', - device_id='newId')), - (generate_enrollment_update_req(reprovision_policy='reprovisionandmigratedata')), - (generate_enrollment_update_req(reprovision_policy='reprovisionandresetdata')), - (generate_enrollment_update_req(reprovision_policy='never')), - (generate_enrollment_update_req(allocation_policy='static', iot_hubs='hub1')), - (generate_enrollment_update_req(allocation_policy='hashed', iot_hubs='hub1 hub2')), - (generate_enrollment_update_req(allocation_policy='geoLatency')), - (generate_enrollment_update_req(allocation_policy='custom', - webhook_url="https://www.test.test", - api_version="2019-03-31")), - (generate_enrollment_update_req(edge_enabled=True)), - (generate_enrollment_update_req(edge_enabled=False)) - ]) - def test_enrollment_update(self, serviceclient, fixture_cmd, req): - subject.iot_dps_device_enrollment_update( - cmd=fixture_cmd, - enrollment_id=req['enrollment_id'], - dps_name=req['dps_name'], - resource_group_name=req['rg'], - etag=req['etag'], - endorsement_key=None, - certificate_path=req['certificate_path'], - secondary_certificate_path=req['secondary_certificate_path'], - remove_certificate=req['remove_certificate_path'], - remove_secondary_certificate=req['remove_secondary_certificate_path'], - primary_key=None, - secondary_key=None, - device_id=req['device_id'], - iot_hub_host_name=req['iot_hub_host_name'], - initial_twin_tags=req['initial_twin_tags'], - initial_twin_properties=req['initial_twin_properties'], - provisioning_status=req['provisioning_status'], - reprovision_policy=req['reprovision_policy'], - allocation_policy=req['allocation_policy'], - iot_hubs=req['iot_hubs'], - edge_enabled=req['edge_enabled'], - webhook_url=req['webhook_url'], - api_version=req['api_version'] - ) - get_request = serviceclient.calls[0].request - assert get_request.method == 'GET' - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in get_request.url - - update_request = serviceclient.calls[1].request - url = update_request.url - - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in url - assert update_request.method == 'PUT' - - assert update_request.headers["If-Match"] == req['etag'] if req['etag'] else "*" - - body = json.loads(update_request.body) - if not req['certificate_path']: - if req['remove_certificate_path']: - assert body['attestation']['x509']['clientCertificates'].get('primary') is None - else: - assert body['attestation']['x509']['clientCertificates']['primary']['info'] is not None - if req['certificate_path']: - assert body['attestation']['x509']['clientCertificates']['primary']['certificate'] is not None - if req['secondary_certificate_path']: - assert body['attestation']['x509']['clientCertificates']['secondary']['certificate'] is not None - if req['iot_hub_host_name']: - assert body['allocationPolicy'] == 'static' - assert body['iotHubs'] == req['iot_hub_host_name'].split() - if req['provisioning_status']: - assert body['provisioningStatus'] == req['provisioning_status'] - if req['initial_twin_properties']: - assert body['initialTwin']['properties']['desired'] == req['initial_twin_properties'] - if req['initial_twin_tags']: - assert body['initialTwin']['tags'] == req['initial_twin_tags'] - if req['device_id']: - assert body['deviceId'] == req['device_id'] - if req['reprovision_policy'] == 'reprovisionandmigratedata': - assert body['reprovisionPolicy']['migrateDeviceData'] - assert body['reprovisionPolicy']['updateHubAssignment'] - if req['reprovision_policy'] == 'reprovisionandresetdata': - assert not body['reprovisionPolicy']['migrateDeviceData'] - assert body['reprovisionPolicy']['updateHubAssignment'] - if req['reprovision_policy'] == 'never': - assert not body['reprovisionPolicy']['migrateDeviceData'] - assert not body['reprovisionPolicy']['updateHubAssignment'] - if req['allocation_policy']: - assert body['allocationPolicy'] == req['allocation_policy'] - if body['allocationPolicy'] == 'custom': - assert body['customAllocationDefinition']['webhookUrl'] == req['webhook_url'] - assert body['customAllocationDefinition']['apiVersion'] == req['api_version'] - if req['iot_hubs']: - assert body['iotHubs'] == req['iot_hubs'].split() - if req['edge_enabled'] is not None: - assert body['capabilities']['iotEdge'] == req['edge_enabled'] - - -class TestEnrollmentShow(): - @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): - mocked_response.add( - method=responses.GET, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body=json.dumps(generate_enrollment_show()), - status=request.param, - content_type="application/json", - match_querystring=False, - ) - - yield mocked_response - - @pytest.fixture() - def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_sas): - mocked_response.add( - method=responses.GET, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body=json.dumps(generate_enrollment_show(attestation=mock_symmetric_key_attestation)), - status=200, - content_type="application/json", - match_querystring=False, - ) - - mocked_response.add( - method=responses.POST, - url="https://{}/enrollments/{}/attestationmechanism".format(mock_target['entity'], enrollment_id), - body=json.dumps(mock_symmetric_key_attestation), - status=200, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - def test_enrollment_show(self, fixture_cmd, serviceclient): - result = subject.iot_dps_device_enrollment_get( - cmd=fixture_cmd, - enrollment_id=enrollment_id, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - ) - - assert result['registrationId'] == enrollment_id - - request = serviceclient.calls[0].request - url = request.url - method = request.method - - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in url - assert method == 'GET' - - def test_enrollment_show_with_keys(self, fixture_cmd, serviceclient_attestation): - result = subject.iot_dps_device_enrollment_get( - cmd=fixture_cmd, - enrollment_id=enrollment_id, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - show_keys=True - ) - - assert result['registrationId'] == enrollment_id - assert result['attestation'] - - request = serviceclient_attestation.calls[0].request - url = request.url - method = request.method - - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in url - assert method == 'GET' - - request = serviceclient_attestation.calls[1].request - url = request.url - method = request.method - - assert "{}/enrollments/{}/attestationmechanism?".format(mock_target['entity'], enrollment_id) in url - assert method == 'POST' - - def test_enrollment_show_error(self, fixture_cmd, serviceclient_generic_error): - with pytest.raises(CLIError): - subject.iot_dps_device_enrollment_get( - cmd=fixture_cmd, - enrollment_id=enrollment_id, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - ) - - -class TestEnrollmentList(): - @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): - mocked_response.add( - method=responses.POST, - url="https://{}/enrollments/query?".format(mock_target['entity']), - body=json.dumps([generate_enrollment_show()]), - status=200, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - @pytest.mark.parametrize("top", [3, None]) - def test_enrollment_list(self, serviceclient, fixture_cmd, top): - result = subject.iot_dps_device_enrollment_list( - cmd=fixture_cmd, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - top=top - ) - request = serviceclient.calls[0].request - headers = request.headers - url = request.url - method = request.method - - assert str(headers.get("x-ms-max-item-count")) == str(top) - assert "{}/enrollments/query?".format(mock_target['entity']) in url - assert method == "POST" - assert json.dumps(result) - - def test_enrollment_list_error(self, fixture_cmd, serviceclient_generic_error): - with pytest.raises(CLIError): - subject.iot_dps_device_enrollment_list( - cmd=fixture_cmd, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - ) - - -class TestEnrollmentDelete(): - @pytest.fixture(params=[204]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): - mocked_response.add( - method=responses.DELETE, - url="https://{}/enrollments/{}".format(mock_target['entity'], enrollment_id), - body='{}', - status=request.param, - content_type="application/json", - match_querystring=False, - ) - yield mocked_response - - @pytest.mark.parametrize( - "etag", - [None, etag] - ) - def test_enrollment_delete(self, serviceclient, fixture_cmd, etag): - subject.iot_dps_device_enrollment_delete( - cmd=fixture_cmd, - enrollment_id=enrollment_id, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - etag=etag - ) - request = serviceclient.calls[0].request - url = request.url - method = request.method - assert "{}/enrollments/{}?".format(mock_target['entity'], enrollment_id) in url - assert method == 'DELETE' - assert request.headers["If-Match"] == etag if etag else "*" - - def test_enrollment_delete_error(self, serviceclient_generic_error, fixture_cmd): - with pytest.raises(CLIError): - subject.iot_dps_device_enrollment_delete( - cmd=fixture_cmd, - enrollment_id=enrollment_id, - dps_name=mock_target['entity'], - resource_group_name=resource_group, - ) - def generate_enrollment_group_create_req(iot_hub_host_name=None, initial_twin_tags=None, @@ -709,7 +38,7 @@ def generate_enrollment_group_create_req(iot_hub_host_name=None, return {'client': None, 'enrollment_id': enrollment_id, 'rg': resource_group, - 'dps_name': mock_target['entity'], + 'dps_name': mock_dps_target['entity'], 'certificate_path': certificate_path, 'secondary_certificate_path': secondary_certificate_path, 'root_ca_name': root_ca_name, @@ -730,10 +59,10 @@ def generate_enrollment_group_create_req(iot_hub_host_name=None, class TestEnrollmentGroupCreate(): @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.PUT, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body='{}', status=200, content_type="application/json", @@ -809,7 +138,7 @@ def test_enrollment_group_create(self, serviceclient, fixture_cmd, req): ) request = serviceclient.calls[0].request url = request.url - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert request.method == 'PUT' body = json.loads(request.body) @@ -969,7 +298,7 @@ def generate_enrollment_group_update_req(iot_hub_host_name=None, return {'client': None, 'enrollment_id': enrollment_id, 'rg': resource_group, - 'dps_name': mock_target['entity'], + 'dps_name': mock_dps_target['entity'], 'certificate_path': certificate_path, 'secondary_certificate_path': secondary_certificate_path, 'root_ca_name': root_ca_name, @@ -993,11 +322,11 @@ def generate_enrollment_group_update_req(iot_hub_host_name=None, class TestEnrollmentGroupUpdate(): @pytest.fixture(params=[(200, generate_enrollment_group_show(), 200)]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): # Initial GET mocked_response.add( method=responses.GET, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body=json.dumps(generate_enrollment_group_show()), status=200, content_type="application/json", @@ -1006,7 +335,7 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): # Update PUT mocked_response.add( method=responses.PUT, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body=json.dumps(generate_enrollment_group_show()), status=200, content_type="application/json", @@ -1066,13 +395,13 @@ def test_enrollment_group_update(self, serviceclient, fixture_cmd, req): # test initial GET request = serviceclient.calls[0].request url = request.url - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert request.method == 'GET' request = serviceclient.calls[1].request url = request.url - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert request.method == 'PUT' assert request.headers["If-Match"] == req['etag'] if req['etag'] else "*" @@ -1169,10 +498,10 @@ def test_enrollment_group_update_invalid_args(self, fixture_cmd, req): class TestEnrollmentGroupShow(): @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.GET, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body=json.dumps(generate_enrollment_group_show()), status=200, content_type="application/json", @@ -1181,10 +510,10 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): yield mocked_response @pytest.fixture() - def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_sas): + def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_dps_sas): mocked_response.add( method=responses.GET, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body=json.dumps(generate_enrollment_group_show(attestation=mock_symmetric_key_attestation)), status=200, content_type="application/json", @@ -1193,7 +522,7 @@ def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_sas): mocked_response.add( method=responses.POST, - url="https://{}/enrollmentGroups/{}/attestationmechanism".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}/attestationmechanism".format(mock_dps_target['entity'], enrollment_id), body=json.dumps(mock_symmetric_key_attestation), status=200, content_type="application/json", @@ -1204,7 +533,7 @@ def serviceclient_attestation(self, mocked_response, fixture_gdcs, fixture_sas): def test_enrollment_group_show(self, serviceclient, fixture_cmd): result = subject.iot_dps_device_enrollment_group_get( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group ) @@ -1214,13 +543,13 @@ def test_enrollment_group_show(self, serviceclient, fixture_cmd): url = request.url method = request.method - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert method == 'GET' def test_enrollment_group_show_with_keys(self, fixture_cmd, serviceclient_attestation): result = subject.iot_dps_device_enrollment_group_get( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group, show_keys=True @@ -1232,21 +561,21 @@ def test_enrollment_group_show_with_keys(self, fixture_cmd, serviceclient_attest url = request.url method = request.method - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert method == 'GET' request = serviceclient_attestation.calls[1].request url = request.url method = request.method - assert "{}/enrollmentGroups/{}/attestationmechanism?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}/attestationmechanism?".format(mock_dps_target['entity'], enrollment_id) in url assert method == 'POST' def test_enrollment_group_show_error(self, fixture_cmd, serviceclient_generic_error): with pytest.raises(CLIError): subject.iot_dps_device_enrollment_group_get( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group ) @@ -1254,10 +583,10 @@ def test_enrollment_group_show_error(self, fixture_cmd, serviceclient_generic_er class TestEnrollmentGroupList(): @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.POST, - url="https://{}/enrollmentGroups/query?".format(mock_target['entity']), + url="https://{}/enrollmentGroups/query?".format(mock_dps_target['entity']), body=json.dumps([generate_enrollment_group_show()]), status=200, content_type="application/json", @@ -1269,7 +598,7 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): def test_enrollment_group_list(self, serviceclient, fixture_cmd, top): result = subject.iot_dps_device_enrollment_group_list( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], resource_group_name=resource_group, top=top ) @@ -1277,7 +606,7 @@ def test_enrollment_group_list(self, serviceclient, fixture_cmd, top): headers = request.headers url = request.url method = request.method - assert "{}/enrollmentGroups/query?".format(mock_target['entity']) in url + assert "{}/enrollmentGroups/query?".format(mock_dps_target['entity']) in url assert method == 'POST' assert json.dumps(result) assert str(headers.get("x-ms-max-item-count")) == str(top) @@ -1286,17 +615,17 @@ def test_enrollment_group_list_error(self, fixture_cmd): with pytest.raises(CLIError): subject.iot_dps_device_enrollment_group_list( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], resource_group_name=resource_group ) class TestEnrollmentGroupDelete(): @pytest.fixture(params=[204]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.DELETE, - url="https://{}/enrollmentGroups/{}".format(mock_target['entity'], enrollment_id), + url="https://{}/enrollmentGroups/{}".format(mock_dps_target['entity'], enrollment_id), body='{}', status=request.param, content_type="application/json", @@ -1311,7 +640,7 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): def test_enrollment_group_delete(self, serviceclient, fixture_cmd, etag): subject.iot_dps_device_enrollment_group_delete( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group, etag=etag, @@ -1319,7 +648,7 @@ def test_enrollment_group_delete(self, serviceclient, fixture_cmd, etag): request = serviceclient.calls[0].request url = request.url method = request.method - assert "{}/enrollmentGroups/{}?".format(mock_target['entity'], enrollment_id) in url + assert "{}/enrollmentGroups/{}?".format(mock_dps_target['entity'], enrollment_id) in url assert method == 'DELETE' assert request.headers["If-Match"] == etag if etag else "*" @@ -1327,7 +656,7 @@ def test_enrollment_group_delete_error(self, fixture_cmd): with pytest.raises(CLIError): subject.iot_dps_device_enrollment_group_delete( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group, ) @@ -1341,10 +670,10 @@ def generate_registration_state_show(): class TestRegistrationShow(): @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.GET, - url="https://{}/registrations/{}?".format(mock_target['entity'], registration_id), + url="https://{}/registrations/{}?".format(mock_dps_target['entity'], registration_id), body=json.dumps(generate_registration_state_show()), status=request.param, content_type="application/json", @@ -1355,7 +684,7 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): def test_registration_show(self, fixture_cmd, serviceclient): result = subject.iot_dps_registration_get( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], registration_id=registration_id, resource_group_name=resource_group, ) @@ -1363,14 +692,14 @@ def test_registration_show(self, fixture_cmd, serviceclient): request = serviceclient.calls[0].request url = request.url method = request.method - assert "{}/registrations/{}?".format(mock_target['entity'], registration_id) in url + assert "{}/registrations/{}?".format(mock_dps_target['entity'], registration_id) in url assert method == 'GET' def test_registration_show_error(self, fixture_cmd): with pytest.raises(CLIError): subject.iot_dps_registration_get( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], registration_id=registration_id, resource_group_name=resource_group, ) @@ -1378,10 +707,10 @@ def test_registration_show_error(self, fixture_cmd): class TestRegistrationList(): @pytest.fixture(params=[200]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.POST, - url="https://{}/registrations/{}/query?".format(mock_target['entity'], enrollment_id), + url="https://{}/registrations/{}/query?".format(mock_dps_target['entity'], enrollment_id), body=json.dumps([generate_registration_state_show()]), status=request.param, content_type="application/json", @@ -1392,21 +721,21 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): def test_registration_list(self, serviceclient, fixture_cmd): subject.iot_dps_registration_list( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group, ) request = serviceclient.calls[0].request url = request.url method = request.method - assert "{}/registrations/{}/query?".format(mock_target['entity'], enrollment_id) in url + assert "{}/registrations/{}/query?".format(mock_dps_target['entity'], enrollment_id) in url assert method == 'POST' def test_registration_list_error(self, fixture_cmd): with pytest.raises(CLIError): subject.iot_dps_registration_list( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], enrollment_id=enrollment_id, resource_group_name=resource_group, ) @@ -1414,10 +743,10 @@ def test_registration_list_error(self, fixture_cmd): class TestRegistrationDelete(): @pytest.fixture(params=[204]) - def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): + def serviceclient(self, mocked_response, fixture_gdcs, fixture_dps_sas, request): mocked_response.add( method=responses.DELETE, - url="https://{}/registrations/{}".format(mock_target['entity'], registration_id), + url="https://{}/registrations/{}".format(mock_dps_target['entity'], registration_id), body='{}', status=request.param, content_type="application/json", @@ -1432,7 +761,7 @@ def serviceclient(self, mocked_response, fixture_gdcs, fixture_sas, request): def test_registration_delete(self, serviceclient, fixture_cmd, etag): subject.iot_dps_registration_delete( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], registration_id=registration_id, resource_group_name=resource_group, etag=etag @@ -1440,7 +769,7 @@ def test_registration_delete(self, serviceclient, fixture_cmd, etag): request = serviceclient.calls[0].request url = request.url method = request.method - assert "{}/registrations/{}?".format(mock_target['entity'], registration_id) in url + assert "{}/registrations/{}?".format(mock_dps_target['entity'], registration_id) in url assert method == 'DELETE' assert request.headers["If-Match"] == etag if etag else "*" @@ -1448,7 +777,7 @@ def test_registration_delete_error(self, fixture_cmd): with pytest.raises(CLIError): subject.iot_dps_registration_delete( cmd=fixture_cmd, - dps_name=mock_target['entity'], + dps_name=mock_dps_target['entity'], registration_id=registration_id, resource_group_name=resource_group, )