Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TC-ACE-2.3: Add (test for command access and presence) #36808

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 134 additions & 10 deletions src/python_testing/TC_AccessChecker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# These are separated into different runs because the logs for these tests are HUGE. The attribute one individually
# reads every attribute on every cluster 4 times. If there's a failure, having these in separate runs makes it significantly
# easier to navigate the logs
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
Expand All @@ -15,6 +19,34 @@
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --tests test_TC_ACE_2_1
# run2:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --tests test_TC_ACE_2_2
# run3:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --bool-arg ci_only_linux_skip_ota_cluster_disallowed_for_certification:True
# --tests test_TC_ACE_2_3
# === END CI TEST ARGUMENTS ===

import logging
Expand All @@ -23,18 +55,20 @@
from typing import Optional

import chip.clusters as Clusters
from chip.interaction_model import Status
from chip.interaction_model import InteractionModelError, Status
from chip.testing.basic_composition import BasicCompositionTests
from chip.testing.global_attribute_ids import GlobalAttributeIds
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, MatterBaseTest, TestStep, async_test_body,
default_matter_test_main)
from chip.testing.global_attribute_ids import (GlobalAttributeIds, is_standard_attribute_id, is_standard_cluster_id,
is_standard_command_id)
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, TestStep,
async_test_body, default_matter_test_main)
from chip.testing.spec_parsing import XmlCluster
from chip.tlv import uint


class AccessTestType(Enum):
READ = auto()
WRITE = auto()
INVOKE = auto()


def step_number_with_privilege(step: int, substep: str, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum) -> str:
Expand All @@ -61,10 +95,19 @@ def checkable_attributes(cluster_id, cluster, xml_cluster) -> list[uint]:

def known_cluster_attribute(attribute_id) -> bool:
''' Returns true if this is a non-manufacturer specific attribute that has information in the XML and has python codegen data'''
return attribute_id <= 0xFFFF and attribute_id in xml_cluster.attributes and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
return is_standard_attribute_id(attribute_id) and attribute_id in xml_cluster.attributes and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
return [x for x in all_attrs if known_cluster_attribute(x)]


def checkable_commands(cluster_id, cluster, xml_cluster) -> list[uint]:
all_cmds = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID]

def known_cluster_cmds(command_id) -> bool:
''' Returns true if this is a non-manufacturer specific command that has information in the XML and has python codegen data'''
return is_standard_command_id(command_id) and command_id in xml_cluster.accepted_commands and command_id in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
return [x for x in all_cmds if known_cluster_cmds(x)]


class AccessChecker(MatterBaseTest, BasicCompositionTests):
@async_test_body
async def setup_class(self):
Expand Down Expand Up @@ -110,20 +153,25 @@ async def _setup_acl(self, privilege: Optional[Clusters.AccessControl.Enums.Acce
def _record_errors(self):
''' Checks through all the endpoints and records all the spec warnings in one go so we don't get repeats'''
all_clusters = set()
attrs: dict[uint, set()] = {}
attrs: dict[uint, set] = {}
cmds: dict[uint, set] = {}

for endpoint_id, endpoint in self.endpoints_tlv.items():
all_clusters |= set(endpoint.keys())
for cluster_id, device_cluster_data in endpoint.items():
# Find all the attributes for this cluster across all endpoint
if cluster_id not in attrs:
attrs[cluster_id] = set()
if cluster_id not in cmds:
cmds[cluster_id] = set()
# discard MEI attributes as we do not have access information for them.
attrs[cluster_id].update(
set([id for id in device_cluster_data[GlobalAttributeIds.ATTRIBUTE_LIST_ID] if id <= 0xFFFF]))
set([id for id in device_cluster_data[GlobalAttributeIds.ATTRIBUTE_LIST_ID] if is_standard_attribute_id(id)]))
cmds[cluster_id].update(
set([id for id in device_cluster_data[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] if is_standard_command_id(id)]))

# Remove MEI clusters - we don't have information available to check these.
all_clusters = [id for id in all_clusters if id <= 0x7FFF]
all_clusters = [id for id in all_clusters if is_standard_cluster_id(id)]
for cluster_id in all_clusters:
location = ClusterPathLocation(endpoint_id=0, cluster_id=cluster_id)
if cluster_id not in self.xml_clusters:
Expand All @@ -139,7 +187,7 @@ def _record_errors(self):
xml_cluster = self.xml_clusters[cluster_id]
for attribute_id in attrs[cluster_id]:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
if attribute_id not in xml_cluster.attributes:
if attribute_id not in xml_cluster.attributes.keys():
self.record_warning(test_name="Access Checker", location=location,
problem="Cluster attribute not found in spec XML")
continue
Expand All @@ -148,6 +196,53 @@ def _record_errors(self):
problem="Unknown attribute")
self.success = False
continue
# Check that we have information for all the required commands
for command_id in cmds[cluster_id]:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
if command_id not in xml_cluster.accepted_commands.keys():
self.record_warning(test_name="Access Checker", location=location,
problem="Cluster command not found in spec XML")
continue
if command_id not in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]:
self._record_error(test_name="Access Checker", location=location,
problem="Unknown command")
self.success = False
continue

async def _maybe_run_command_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
""" Runs a command only if the required cluster privilege is HIGHER than the specified privilege. In this way,
no commands are actually run on the device, which means there are no side effects. However, we can differentiate
ACL rejections from commands being unsupported.
"""
ota_exception = self.user_params.get('ci_only_linux_skip_ota_cluster_disallowed_for_certification', False)
if cluster_id == Clusters.OtaSoftwareUpdateRequestor.id and ota_exception:
logging.warn('WARNING: Skipping OTA cluster check for CI. THIS IS DISALLOWED FOR CERTIFICATION')
return

for command_id in checkable_commands(cluster_id, device_cluster_data, xml_cluster):
spec_requires = xml_cluster.accepted_commands[command_id].privilege
command = Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id][command_id]
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
name = f"Command test - privilege {privilege}"
if operation_allowed(spec_requires, privilege):
# In this test, we're only checking that the disallowed commands are rejected so that there are
# no side effects. Commands are checked with admin privilege in their cluster tests. The error that
# may be let through here is if the spec requires operate and the implementation requires admin.
continue
try:
timed = None
if command.must_use_timed_invoke:
timed = 65535
await self.send_single_cmd(cmd=command(), dev_ctrl=self.TH2, endpoint=endpoint_id, timedRequestTimeoutMs=timed)
# If this was successful, that's an error
self.record_error(test_name=name, location=location,
problem=f"Unexpected success sending command {command} with privilege {privilege}")
self.success = False
except InteractionModelError as e:
if e.status != Status.UnsupportedAccess:
self.record_error(test_name=name, location=location,
problem=f'Unexpected error sending command {command} with privilege {privilege} - expected UNSUPPORTED_ACCESS, got {e.status}')
self.success = False

async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
# TODO: This assumes all attributes are readable. Which they are currently. But we don't have a general way to mark otherwise.
Expand Down Expand Up @@ -241,14 +336,16 @@ async def run_access_test(self, test_type: AccessTestType):
self.step(step_number_with_privilege(check_step, 'b', privilege))
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, device_cluster_data in endpoint.items():
if cluster_id > 0x7FFF or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
if not is_standard_cluster_id(cluster_id) or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
# These cases have already been recorded by the _record_errors function
continue
xml_cluster = self.xml_clusters[cluster_id]
if test_type == AccessTestType.READ:
await self._run_read_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
elif test_type == AccessTestType.WRITE:
await self._run_write_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege, wildcard_read)
elif test_type == AccessTestType.INVOKE:
await self._maybe_run_command_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
else:
self.fail_current_test("Unsupported test type")
if not self.success:
Expand Down Expand Up @@ -298,6 +395,33 @@ def desc_TC_ACE_2_2(self):
async def test_TC_ACE_2_2(self):
await self.run_access_test(AccessTestType.WRITE)

def steps_TC_ACE_2_3(self):
steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
TestStep(1, "TH_commissioner performs a wildcard read"),
TestStep(2, "TH_commissioner reads the ACL attribute"),
TestStep(3, "Repeat steps 3a and 3b for each permission level")]
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for p in privilege_enum:
steps.append(TestStep(step_number_with_privilege(3, 'a', p),
"TH_commissioner gives TH_second_commissioner the specified privilege"))
steps.append(TestStep(step_number_with_privilege(3, 'b', p),
"""For each standard command on each standard cluster on each endpoint,
TH_second_controller checks the permission requirements for that command.
If the permission required for the command is HIGHER than the permission level being tested,
TH_second_controller sends the command to the DUT using default values.
Regardless of the command contents, the DUT should return an access error since access must be checked
before the command is processed. Receipt of an UNSUPPORTED_COMMAND error is a conformance failure.""",
"DUT returns UNSUPPORTED_ACCESS error"))
return steps

def desc_TC_ACE_2_3(self):
return "[TC-ACE-2.3] Command Privilege Enforcement - [DUT as Server]"

@async_test_body
async def test_TC_ACE_2_3(self):
await self.run_access_test(AccessTestType.INVOKE)


if __name__ == "__main__":
default_matter_test_main()
24 changes: 21 additions & 3 deletions src/python_testing/TestSpecParsingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
CLUSTER_NAME = "TestCluster"
ATTRIBUTE_NAME = "TestAttribute"
ATTRIBUTE_ID = 0x0000
COMMAND_ID = 0x0F


def single_attribute_cluster_xml(read_access: str, write_access: str, write_supported: str):
def single_attribute_cluster_xml(read_access: str, write_access: str, write_supported: str, invoke_access: str):
xml_cluster = f'<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="{CLUSTER_ID}" name="{CLUSTER_NAME}" revision="3">'
revision_table = ('<revisionHistory>'
'<revision revision="1" summary="Initial Release"/>'
Expand All @@ -55,12 +56,20 @@ def single_attribute_cluster_xml(read_access: str, write_access: str, write_supp
'<mandatoryConform/>'
'</attribute>'
'</attributes>')
invoke_access_str = f'invokePrivilege="{invoke_access}"' if invoke_access is not None else ""
command = ('<commands>'
f'<command id="{COMMAND_ID}" name="Cmd" direction="commandToServer" response="Y">'
f'<access {invoke_access_str}/>'
'<mandatoryConform/>'
'</command>'
'</commands>')

return (f'{xml_cluster}'
f'{revision_table}'
f'{id_table}'
f'{classification}'
f'{attribute}'
f'{command}'
'</cluster>')


Expand Down Expand Up @@ -297,7 +306,7 @@ def test_spec_parsing_access(self):
strs = [None, 'view', 'operate', 'manage', 'admin']
for read in strs:
for write in strs:
xml = single_attribute_cluster_xml(read, write, "true")
xml = single_attribute_cluster_xml(read, write, "true", None)
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
Expand All @@ -308,10 +317,19 @@ def test_spec_parsing_access(self):
get_access_enum_from_string(read), "Unexpected read access")
asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_access,
get_access_enum_from_string(write), "Unexpected write access")
for invoke in strs:
xml = single_attribute_cluster_xml(None, None, "true", invoke)
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.accepted_commands, "No commands found in cluster")
asserts.assert_is_not_none(xml_cluster.command_map, "No command map found in cluster")
asserts.assert_true(COMMAND_ID in xml_cluster.accepted_commands.keys(),
"Did not find test command in XmlCluster.accepted_commands")
asserts.assert_equal(xml_cluster.accepted_commands[COMMAND_ID].privilege,
get_access_enum_from_string(invoke), "Unexpected invoke privilege")

def test_write_optional(self):
for write_support in ['true', 'optional']:
xml = single_attribute_cluster_xml('view', 'view', write_support)
xml = single_attribute_cluster_xml('view', 'view', write_support, None)
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
Expand Down
Loading
Loading