diff --git a/HISTORY.rst b/HISTORY.rst index 11790f457..7b4e38802 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ Release History =============== +**IoT Hub updates** + +* Add convenience arguments for device update. + + 0.10.0 +++++++++++++++ diff --git a/azext_iot/_help.py b/azext_iot/_help.py index 6fbae8bed..98ea6d0ea 100644 --- a/azext_iot/_help.py +++ b/azext_iot/_help.py @@ -200,9 +200,15 @@ text: > az iot hub device-identity update -d {device_id} -n {iothub_name} --set capabilities.iotEdge=true + - name: Turn on edge capabilities for device using convenience argument. + text: > + az iot hub device-identity update -d {device_id} -n {iothub_name} --ee - name: Disable device status text: > az iot hub device-identity update -d {device_id} -n {iothub_name} --set status=disabled + - name: Disable device status using convenience argument. + text: > + az iot hub device-identity update -d {device_id} -n {iothub_name} --status disabled - name: In one command text: > az iot hub device-identity update -d {device_id} -n {iothub_name} diff --git a/azext_iot/_params.py b/azext_iot/_params.py index e21213e39..93765b831 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -370,6 +370,18 @@ def load_arguments(self, _): help="Description for device status.", ) + with self.argument_context('iot hub device-identity update') as context: + context.argument( + "primary_key", + options_list=["--primary-key", "--pk"], + help="The primary symmetric shared access key stored in base64 format.", + ) + context.argument( + "secondary_key", + options_list=["--secondary-key", "--sk"], + help="The secondary symmetric shared access key stored in base64 format.", + ) + with self.argument_context("iot hub device-identity create") as context: context.argument( "force", diff --git a/azext_iot/commands.py b/azext_iot/commands.py index 1085b1fd6..d9ab7c05c 100644 --- a/azext_iot/commands.py +++ b/azext_iot/commands.py @@ -34,7 +34,11 @@ def load_command_table(self, _): cmd_group.command("list", "iot_device_list") cmd_group.command("delete", "iot_device_delete") cmd_group.generic_update_command( - "update", getter_name="iot_device_show", setter_name="iot_device_update" + "update", + getter_name="iot_device_show", + custom_func_type=iothub_ops, + setter_name="iot_device_update", + custom_func_name="update_iot_device_custom" ) cmd_group.command( diff --git a/azext_iot/operations/hub.py b/azext_iot/operations/hub.py index 256bf2fa3..52e72a3f4 100644 --- a/azext_iot/operations/hub.py +++ b/azext_iot/operations/hub.py @@ -246,6 +246,38 @@ def _create_self_signed_cert(subject, valid_days, output_path=None): return create_self_signed_certificate(subject, valid_days, output_path) +def update_iot_device_custom(instance, edge_enabled=None, status=None, status_reason=None, + auth_method=None, primary_thumbprint=None, secondary_thumbprint=None, + primary_key=None, secondary_key=None): + if edge_enabled is not None: + instance['capabilities']['iotEdge'] = edge_enabled + if status is not None: + instance['status'] = status + if status_reason is not None: + instance['statusReason'] = status_reason + if auth_method is not None: + if auth_method == DeviceAuthType.shared_private_key.name: + auth = 'sas' + if (primary_key and not secondary_key) or (not primary_key and secondary_key): + raise CLIError("primary + secondary Key required with sas auth") + instance['authentication']['symmetricKey']['primaryKey'] = primary_key + instance['authentication']['symmetricKey']['secondaryKey'] = secondary_key + elif auth_method == DeviceAuthType.x509_thumbprint.name: + auth = 'selfSigned' + if not any([primary_thumbprint, secondary_thumbprint]): + raise CLIError("primary or secondary Thumbprint required with selfSigned auth") + if primary_thumbprint: + instance['authentication']['x509Thumbprint']['primaryThumbprint'] = primary_thumbprint + if secondary_thumbprint: + instance['authentication']['x509Thumbprint']['secondaryThumbprint'] = secondary_thumbprint + elif auth_method == DeviceAuthType.x509_ca.name: + auth = 'certificateAuthority' + else: + raise ValueError('Authorization method {} invalid.'.format(auth_method)) + instance['authentication']['type'] = auth + return instance + + def iot_device_update( cmd, device_id, parameters, hub_name=None, resource_group_name=None, login=None ): @@ -257,7 +289,15 @@ def iot_device_update( service_sdk = resolver.get_sdk(SdkType.service_sdk) try: - updated_device = _handle_device_update_params(parameters) + auth, pk, sk = _parse_auth(parameters) + updated_device = _assemble_device( + parameters['deviceId'], + auth, parameters['capabilities']['iotEdge'], + pk, + sk, + parameters['status'].lower(), + parameters.get('statusReason') + ) etag = parameters.get("etag", None) if etag: headers = {} @@ -272,28 +312,6 @@ def iot_device_update( raise CLIError(err) -def _handle_device_update_params(parameters): - status = parameters["status"].lower() - possible_status = ["enabled", "disabled"] - if status not in possible_status: - raise CLIError("status must be one of {}".format(possible_status)) - - edge = parameters["capabilities"].get("iotEdge") - if not isinstance(edge, bool): - raise CLIError("capabilities.iotEdge is of type bool") - - auth, pk, sk = _parse_auth(parameters) - return _assemble_device( - parameters["deviceId"], - auth, - edge, - pk, - sk, - status, - parameters.get("statusReason"), - ) - - def iot_device_delete( cmd, device_id, hub_name=None, resource_group_name=None, login=None ): diff --git a/azext_iot/tests/test_iot_ext_int.py b/azext_iot/tests/test_iot_ext_int.py index 4503f4d46..e09088d45 100644 --- a/azext_iot/tests/test_iot_ext_int.py +++ b/azext_iot/tests/test_iot_ext_int.py @@ -444,8 +444,55 @@ def test_hub_devices(self): ) self.cmd( - '''iot hub device-identity update -d {} -n {} -g {} --set authentication.symmetricKey.primaryKey="" - authentication.symmetricKey.secondaryKey=""'''.format( + "iot hub device-identity update -d {} -n {} -g {} --ee {} --auth-method {}" + .format(device_ids[0], LIVE_HUB, LIVE_RG, False, 'x509_ca'), + checks=[ + self.check("deviceId", device_ids[0]), + self.check("status", "enabled"), + self.check("capabilities.iotEdge", False), + self.check("authentication.symmetricKey.primaryKey", None), + self.check("authentication.symmetricKey.secondaryKey", None), + self.check("authentication.x509Thumbprint.primaryThumbprint", None), + self.check("authentication.x509Thumbprint.secondaryThumbprint", None), + self.check("authentication.type", 'certificateAuthority') + ] + ) + + self.cmd( + "iot hub device-identity update -d {} -n {} -g {} --status-reason {}" + .format(device_ids[0], LIVE_HUB, LIVE_RG, 'TestStatusReason'), + checks=[ + self.check("deviceId", device_ids[0]), + self.check("statusReason", 'TestStatusReason'), + ] + ) + + self.cmd( + "iot hub device-identity update -d {} -n {} -g {} --ee {} --status {}" + " --status-reason {} --auth-method {} --ptp {} --stp {}" + .format(device_ids[0], LIVE_HUB, LIVE_RG, False, 'enabled', + 'StatusReasonUpdated', 'x509_thumbprint', PRIMARY_THUMBPRINT, SECONDARY_THUMBPRINT), + checks=[ + self.check("deviceId", device_ids[0]), + self.check("status", "enabled"), + self.check("capabilities.iotEdge", False), + self.check("statusReason", 'StatusReasonUpdated'), + self.check("authentication.x509Thumbprint.primaryThumbprint", PRIMARY_THUMBPRINT), + self.check("authentication.x509Thumbprint.secondaryThumbprint", SECONDARY_THUMBPRINT), + ] + ) + + self.cmd("iot hub device-identity update -d {} -n {} -g {} --auth-method {}" + .format(device_ids[0], LIVE_HUB, LIVE_RG, 'x509_thumbprint'), + expect_failure=True) + + self.cmd("iot hub device-identity update -d {} -n {} -g {} --auth-method {} --pk {}" + .format(device_ids[0], LIVE_HUB, LIVE_RG, 'shared_private_key', '123'), + expect_failure=True) + + self.cmd( + '''iot hub device-identity update -d {} -n {} -g {} --primary-key="" + --secondary-key=""'''.format( edge_device_ids[1], LIVE_HUB, LIVE_RG ), checks=[ diff --git a/azext_iot/tests/test_iot_ext_unit.py b/azext_iot/tests/test_iot_ext_unit.py index faf9ea762..565aa0630 100644 --- a/azext_iot/tests/test_iot_ext_unit.py +++ b/azext_iot/tests/test_iot_ext_unit.py @@ -359,13 +359,15 @@ def test_device_create_error(self, serviceclient_generic_error, req): def generate_device_show(**kvp): payload = { "authentication": { - "symmetricKey": {"primaryKey": "123", "secondaryKey": "321"}, - "type": "sas", + "symmetricKey": {"primaryKey": None, "secondaryKey": None}, + "x509Thumbprint": {"primaryThumbprint": None, "secondaryThumbprint": None}, + "type": "sas" }, "etag": "abcd", "capabilities": {"iotEdge": True}, "deviceId": device_id, "status": "disabled", + "statusReason": "unknown reason", } for k in kvp: if payload.get(k): @@ -373,6 +375,28 @@ def generate_device_show(**kvp): return payload +def device_update_con_arg( + edge_enabled=None, + status=None, + status_reason=None, + auth_method=None, + primary_thumbprint=None, + secondary_thumbprint=None, + primary_key=None, + secondary_key=None +): + return { + "edge_enabled": edge_enabled, + "status": status, + "status_reason": status_reason, + "auth_method": auth_method, + "primary_thumbprint": primary_thumbprint, + "secondary_thumbprint": secondary_thumbprint, + "primary_key": primary_key, + "secondary_key": secondary_key + } + + class TestDeviceUpdate: @pytest.fixture(params=[200]) def serviceclient(self, mocker, fixture_ghcs, fixture_sas, request): @@ -384,6 +408,8 @@ def serviceclient(self, mocker, fixture_ghcs, fixture_sas, request): @pytest.mark.parametrize( "req", [ + (generate_device_show(capabilities={"iotEdge": False})), + (generate_device_show(status="enabled")), ( generate_device_show( authentication={ @@ -431,6 +457,149 @@ def test_device_update(self, fixture_cmd, serviceclient, req): headers = args[0][0].headers assert headers["If-Match"] == '"{}"'.format(req["etag"]) + @pytest.mark.parametrize( + "req, arg", + [ + ( + generate_device_show( + capabilities={"iotEdge": False} + ), + device_update_con_arg( + edge_enabled=True + ) + ), + ( + generate_device_show( + status="disabled" + ), + device_update_con_arg( + status="enabled" + ) + ), + ( + generate_device_show(), + device_update_con_arg( + status_reason="test" + ) + ), + ( + generate_device_show(), + device_update_con_arg( + auth_method="shared_private_key", + primary_key="primarykeyUpdated", + secondary_key="secondarykeyUpdated" + ) + ), + ( + generate_device_show( + authentication={ + "type": "selfSigned", + "symmetricKey": {"primaryKey": None, "secondaryKey": None}, + "x509Thumbprint": {"primaryThumbprint": "123", "secondaryThumbprint": "321"}, + } + ), + device_update_con_arg( + auth_method="shared_private_key", + primary_key="primary_key", + secondary_key="secondary_key" + ) + ), + ( + generate_device_show( + authentication={ + "type": "certificateAuthority", + "symmetricKey": {"primaryKey": None, "secondaryKey": None}, + "x509Thumbprint": {"primaryThumbprint": None, "secondaryThumbprint": None}, + } + ), + device_update_con_arg( + auth_method="x509_thumbprint", + primary_thumbprint="primary_thumbprint", + secondary_thumbprint="secondary_thumbprint" + ) + ), + ( + generate_device_show(), + device_update_con_arg( + auth_method="x509_ca", + ) + ), + ] + ) + def test_iot_device_custom(self, fixture_cmd, serviceclient, req, arg): + instance = subject.update_iot_device_custom( + req, + arg["edge_enabled"], + arg["status"], + arg["status_reason"], + arg['auth_method'], + arg['primary_thumbprint'], + arg['secondary_thumbprint'], + arg['primary_key'], + arg["secondary_key"] + ) + + if arg["edge_enabled"]: + assert instance["capabilities"]["iotEdge"] == arg["edge_enabled"] + if arg["status"]: + assert instance["status"] == arg["status"] + if arg["status_reason"]: + assert instance['statusReason'] == arg["status_reason"] + if arg["auth_method"]: + if arg["auth_method"] == "shared_private_key": + assert instance['authentication']['type'] == "sas" + instance['authentication']['symmetricKey']['primaryKey'] == arg['primary_key'] + instance['authentication']['symmetricKey']['secondaryKey'] == arg['secondary_key'] + if arg["auth_method"] == "x509_thumbprint": + assert instance['authentication']['type'] == "selfSigned" + if arg['primary_thumbprint']: + instance['authentication']['x509Thumbprint']['primaryThumbprint'] = arg['primary_thumbprint'] + if arg['secondary_thumbprint']: + instance['authentication']['x509Thumbprint']['secondaryThumbprint'] = arg['secondary_thumbprint'] + if arg['auth_method'] == "x509_ca": + assert instance['authentication']['type'] == "certificateAuthority" + + @pytest.mark.parametrize( + "req, arg, exp", + [ + ( + generate_device_show(), + device_update_con_arg( + auth_method="shared_private_key", + primary_key="primarykeyUpdated", + ), + CLIError + ), + ( + generate_device_show(), + device_update_con_arg( + auth_method="x509_thumbprint", + ), + CLIError + ), + ( + generate_device_show(), + device_update_con_arg( + auth_method="Unknown", + ), + ValueError + ), + ] + ) + def test_iot_device_custom_invalid_args(self, serviceclient, req, arg, exp): + with pytest.raises(exp): + subject.update_iot_device_custom( + req, + arg["edge_enabled"], + arg["status"], + arg["status_reason"], + arg['auth_method'], + arg['primary_thumbprint'], + arg['secondary_thumbprint'], + arg['primary_key'], + arg["secondary_key"] + ) + @pytest.mark.parametrize( "req, exp", [