Skip to content

Commit

Permalink
couple of fixes, timed commands, new id check functions
Browse files Browse the repository at this point in the history
  • Loading branch information
cecille committed Dec 11, 2024
1 parent 6e5ef85 commit 58defdd
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/python_testing/TC_AccessChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import chip.clusters as Clusters
from chip.interaction_model import Status, InteractionModelError
from chip.testing.basic_composition import BasicCompositionTests
from chip.testing.global_attribute_ids import GlobalAttributeIds, attribute_id_type, cluster_id_type, command_id_type, is_standard_attribute_id, is_standard_command_id
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, ClusterPathLocation, MatterBaseTest, TestStep, async_test_body,
from chip.testing.global_attribute_ids import GlobalAttributeIds, cluster_id_type, is_standard_attribute_id, is_standard_cluster_id, is_standard_command_id
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, TestStep, async_test_body,
default_matter_test_main)
from chip.testing.spec_parsing import XmlCluster, build_xml_clusters
from chip.tlv import uint
Expand Down Expand Up @@ -67,11 +67,11 @@ def known_cluster_attribute(attribute_id) -> bool:


def checkable_commands(cluster_id, cluster, xml_cluster) -> list[uint]:
all_cmds = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST]
all_cmds = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID]

def known_cluster_cmds(command_id) -> bool:
''' Returns true if this is a non-manufacturer specific command that has information in the XML and has python codegen data'''
return command_id <= 0xFFFF and command_id in xml_cluster.accepted_commands and command_id in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
return is_standard_command_id(command_id) and command_id in xml_cluster.accepted_commands and command_id in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
return [x for x in all_cmds if known_cluster_cmds(x)]


Expand Down Expand Up @@ -137,7 +137,7 @@ def _record_errors(self):
set([id for id in device_cluster_data[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] if is_standard_command_id(id)]))

# Remove MEI clusters - we don't have information available to check these.
all_clusters = [id for id in all_clusters if cluster_id_type(id) == ClusterIdType.kStandard]
all_clusters = [id for id in all_clusters if is_standard_cluster_id(id)]
for cluster_id in all_clusters:
location = ClusterPathLocation(endpoint_id=0, cluster_id=cluster_id)
if cluster_id not in self.xml_clusters:
Expand All @@ -164,9 +164,9 @@ def _record_errors(self):
continue
# Check that we have information for all the required commands
for command_id in cmds[cluster_id]:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
if command_id not in xml_cluster.accepted_commands.keys():
self.record_warning(test_name="Access Checker", location=location.
self.record_warning(test_name="Access Checker", location=location,
problem="Cluster command not found in spec XML")
continue
if command_id not in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]:
Expand All @@ -191,7 +191,10 @@ async def _maybe_run_command_access_test_for_cluster_privilege(self, endpoint_id
# may be let through here is if the spec requires operate and the implementation requires admin.
continue
try:
await self.send_single_cmd(cmd=command(), dev_ctrl=self.TH2, endpoint=endpoint_id)
timed = None
if command.must_use_timed_invoke:
timed = 65535
await self.send_single_cmd(cmd=command(), dev_ctrl=self.TH2, endpoint=endpoint_id, timedRequestTimeoutMs=timed)
# If this was successful, that's an error
self.record_error(test_name=name, location=location,
problem=f"Unexpected success sending command {command} with privilege {privilege}")
Expand Down Expand Up @@ -294,7 +297,7 @@ async def run_access_test(self, test_type: AccessTestType):
self.step(step_number_with_privilege(check_step, 'b', privilege))
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, device_cluster_data in endpoint.items():
if cluster_id_type(cluster_id) != ClusterIdType.kStandard or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
if not is_standard_cluster_id(cluster_id) or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
# These cases have already been recorded by the _record_errors function
continue
xml_cluster = self.xml_clusters[cluster_id]
Expand Down

0 comments on commit 58defdd

Please sign in to comment.