From 7b7e99d6111d1adcab86c411812f2c80e5d65395 Mon Sep 17 00:00:00 2001 From: Tobias Reiher Date: Mon, 4 Sep 2023 18:25:31 +0200 Subject: [PATCH] Fix test code generation Ref. eng/recordflux/RecordFlux#1410 --- tests/data/data_test.py | 14 +- tests/data/models.py | 1325 ++++++++++++++--------- tests/integration/pyrflx_test.py | 18 +- tests/unit/expression_test.py | 10 +- tests/unit/generator_test.py | 36 +- tests/unit/model/cache_test.py | 8 +- tests/unit/model/message_test.py | 751 +++++++------ tests/unit/model/model_test.py | 28 +- tests/unit/model/session_test.py | 98 +- tests/unit/model/type_test.py | 18 +- tests/unit/specification/parser_test.py | 2 +- tools/generate_spark_test_code.py | 4 +- 12 files changed, 1282 insertions(+), 1030 deletions(-) diff --git a/tests/data/data_test.py b/tests/data/data_test.py index 7e12920b9..baa6e489d 100644 --- a/tests/data/data_test.py +++ b/tests/data/data_test.py @@ -9,13 +9,13 @@ @pytest.mark.parametrize( "message", [ - models.NULL_MESSAGE, - models.TLV_MESSAGE, - models.ETHERNET_FRAME, - models.ENUMERATION_MESSAGE, - models.SEQUENCE_MESSAGE, - models.EXPRESSION_MESSAGE, - models.DERIVATION_MESSAGE, + models.null_message(), + models.tlv_message(), + models.ethernet_frame(), + models.enumeration_message(), + models.sequence_message(), + models.expression_message(), + models.derivation_message(), ], ) def test_models(message: model.Message) -> None: diff --git a/tests/data/models.py b/tests/data/models.py index cc333162a..f0dc14c7a 100644 --- a/tests/data/models.py +++ b/tests/data/models.py @@ -1,3 +1,16 @@ +""" +Collection of models for testing. + +All types are represented by functions instead of variables to avoid time-consuming operations (such +as message verifications) during test collection. This also prevents new processes from starting +before the current process has finished its bootstrapping phase, which would result in a runtime +error. +""" + +from __future__ import annotations + +from functools import lru_cache + from rflx.error import Location from rflx.expression import ( Aggregate, @@ -37,550 +50,802 @@ UnprovenMessage, ) -NULL_MESSAGE = Message("Null::Message", [], {}, skip_proof=True) -NULL_MODEL = Model([NULL_MESSAGE]) -TLV_TAG = Enumeration( - "TLV::Tag", - [("Msg_Data", Number(1)), ("Msg_Error", Number(3))], - Number(8), - always_valid=False, -) -TLV_LENGTH = Integer( - "TLV::Length", - Number(0), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -TLV_MESSAGE = Message( - "TLV::Message", - [ - Link(INITIAL, Field("Tag")), - Link(Field("Tag"), Field("Length"), Equal(Variable("Tag"), Variable("Msg_Data"))), - Link(Field("Tag"), FINAL, Equal(Variable("Tag"), Variable("Msg_Error"))), - Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), - Link(Field("Value"), FINAL), - ], - {Field("Tag"): TLV_TAG, Field("Length"): TLV_LENGTH, Field("Value"): OPAQUE}, - skip_proof=True, -) -TLV_MODEL = Model([TLV_TAG, TLV_LENGTH, TLV_MESSAGE]) +@lru_cache +def null_message() -> Message: + return Message("Null::Message", [], {}, skip_proof=True) -TLV_MESSAGES = Sequence("TLV::Messages", TLV_MESSAGE) -TLV_TAGS = Sequence("TLV::Tags", TLV_TAG) -TLV_WITH_CHECKSUM_TAG = Enumeration( - "TLV_With_Checksum::Tag", - [("Msg_Data", Number(1)), ("Msg_Error", Number(3))], - Number(8), - always_valid=False, -) -TLV_WITH_CHECKSUM_LENGTH = Integer( - "TLV_With_Checksum::Length", - Number(0), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -TLV_WITH_CHECKSUM_CHECKSUM = Integer( - "TLV_With_Checksum::Checksum", - Number(0), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -TLV_WITH_CHECKSUM_MESSAGE = Message( - "TLV_With_Checksum::Message", - [ - Link(INITIAL, Field("Tag")), - Link(Field("Tag"), Field("Length"), Equal(Variable("Tag"), Variable("Msg_Data"))), - Link(Field("Tag"), FINAL, Equal(Variable("Tag"), Variable("Msg_Error"))), - Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), - Link(Field("Value"), Field("Checksum")), - Link(Field("Checksum"), FINAL, ValidChecksum("Checksum")), - ], - { - Field("Tag"): TLV_WITH_CHECKSUM_TAG, - Field("Length"): TLV_WITH_CHECKSUM_LENGTH, - Field("Value"): OPAQUE, - Field("Checksum"): TLV_WITH_CHECKSUM_CHECKSUM, - }, - checksums={ID("Checksum"): [Variable("Tag"), Size("Value"), Variable("Value")]}, - skip_proof=True, -) -TLV_WITH_CHECKSUM_MODEL = Model( - [TLV_WITH_CHECKSUM_TAG, TLV_WITH_CHECKSUM_LENGTH, TLV_WITH_CHECKSUM_MESSAGE], -) +@lru_cache +def null_model() -> Model: + return Model([null_message()]) -NULL_MESSAGE_IN_TLV_MESSAGE = Refinement("In_TLV", TLV_MESSAGE, Field("Value"), NULL_MESSAGE) -NULL_MESSAGE_IN_TLV_MESSAGE_MODEL = Model( - [TLV_TAG, TLV_LENGTH, TLV_MESSAGE, NULL_MESSAGE, NULL_MESSAGE_IN_TLV_MESSAGE], -) -ETHERNET_ADDRESS = Integer( - "Ethernet::Address", - Number(0), - Sub(Pow(Number(2), Number(48)), Number(1)), - Number(48), -) -ETHERNET_TYPE_LENGTH = Integer( - "Ethernet::Type_Length", - Number(46), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -ETHERNET_TPID = Integer("Ethernet::TPID", Number(0x8100, 16), Number(0x8100, 16), Number(16)) -ETHERNET_TCI = Integer( - "Ethernet::TCI", - Number(0), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -ETHERNET_FRAME = Message( - "Ethernet::Frame", - [ - Link(INITIAL, Field("Destination")), - Link(Field("Destination"), Field("Source")), - Link(Field("Source"), Field("Type_Length_TPID")), - Link( - Field("Type_Length_TPID"), - Field("TPID"), - Equal(Variable("Type_Length_TPID"), Number(0x8100, 16)), - first=First("Type_Length_TPID"), - ), - Link( - Field("Type_Length_TPID"), - Field("Type_Length"), - NotEqual(Variable("Type_Length_TPID"), Number(0x8100, 16)), - first=First("Type_Length_TPID"), - ), - Link(Field("TPID"), Field("TCI")), - Link(Field("TCI"), Field("Type_Length")), - Link( - Field("Type_Length"), - Field("Payload"), - LessEqual(Variable("Type_Length"), Number(1500)), - Mul(Variable("Type_Length"), Number(8)), - ), - Link( - Field("Type_Length"), - Field("Payload"), - GreaterEqual(Variable("Type_Length"), Number(1536)), - ), - Link( - Field("Payload"), - FINAL, - And( - GreaterEqual(Div(Size("Payload"), Number(8)), Number(46)), - LessEqual(Div(Size("Payload"), Number(8)), Number(1500)), +@lru_cache +def tlv_tag() -> Enumeration: + return Enumeration( + "TLV::Tag", + [("Msg_Data", Number(1)), ("Msg_Error", Number(3))], + Number(8), + always_valid=False, + ) + + +@lru_cache +def tlv_length() -> Integer: + return Integer( + "TLV::Length", + Number(0), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def tlv_message() -> Message: + return Message( + "TLV::Message", + [ + Link(INITIAL, Field("Tag")), + Link(Field("Tag"), Field("Length"), Equal(Variable("Tag"), Variable("Msg_Data"))), + Link(Field("Tag"), FINAL, Equal(Variable("Tag"), Variable("Msg_Error"))), + Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), + Link(Field("Value"), FINAL), + ], + {Field("Tag"): tlv_tag(), Field("Length"): tlv_length(), Field("Value"): OPAQUE}, + skip_proof=True, + ) + + +@lru_cache +def tlv_model() -> Model: + return Model([tlv_tag(), tlv_length(), tlv_message()]) + + +@lru_cache +def tlv_messages() -> Sequence: + return Sequence("TLV::Messages", tlv_message()) + + +@lru_cache +def tlv_tags() -> Sequence: + return Sequence("TLV::Tags", tlv_tag()) + + +@lru_cache +def tlv_with_checksum_tag() -> Enumeration: + return Enumeration( + "TLV_With_Checksum::Tag", + [("Msg_Data", Number(1)), ("Msg_Error", Number(3))], + Number(8), + always_valid=False, + ) + + +@lru_cache +def tlv_with_checksum_length() -> Integer: + return Integer( + "TLV_With_Checksum::Length", + Number(0), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def tlv_with_checksum_checksum() -> Integer: + return Integer( + "TLV_With_Checksum::Checksum", + Number(0), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def tlv_with_checksum_message() -> Message: + return Message( + "TLV_With_Checksum::Message", + [ + Link(INITIAL, Field("Tag")), + Link(Field("Tag"), Field("Length"), Equal(Variable("Tag"), Variable("Msg_Data"))), + Link(Field("Tag"), FINAL, Equal(Variable("Tag"), Variable("Msg_Error"))), + Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), + Link(Field("Value"), Field("Checksum")), + Link(Field("Checksum"), FINAL, ValidChecksum("Checksum")), + ], + { + Field("Tag"): tlv_with_checksum_tag(), + Field("Length"): tlv_with_checksum_length(), + Field("Value"): OPAQUE, + Field("Checksum"): tlv_with_checksum_checksum(), + }, + checksums={ID("Checksum"): [Variable("Tag"), Size("Value"), Variable("Value")]}, + skip_proof=True, + ) + + +@lru_cache +def tlv_with_checksum_model() -> Model: + return Model( + [tlv_with_checksum_tag(), tlv_with_checksum_length(), tlv_with_checksum_message()], + ) + + +@lru_cache +def null_message_in_tlv_message() -> Refinement: + return Refinement("In_TLV", tlv_message(), Field("Value"), null_message()) + + +@lru_cache +def null_message_in_tlv_message_model() -> Model: + return Model( + [tlv_tag(), tlv_length(), tlv_message(), null_message(), null_message_in_tlv_message()], + ) + + +@lru_cache +def ethernet_address() -> Integer: + return Integer( + "Ethernet::Address", + Number(0), + Sub(Pow(Number(2), Number(48)), Number(1)), + Number(48), + ) + + +@lru_cache +def ethernet_type_length() -> Integer: + return Integer( + "Ethernet::Type_Length", + Number(46), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def ethernet_tpid() -> Integer: + return Integer("Ethernet::TPID", Number(0x8100, 16), Number(0x8100, 16), Number(16)) + + +@lru_cache +def ethernet_tci() -> Integer: + return Integer( + "Ethernet::TCI", + Number(0), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def ethernet_frame() -> Message: + return Message( + "Ethernet::Frame", + [ + Link(INITIAL, Field("Destination")), + Link(Field("Destination"), Field("Source")), + Link(Field("Source"), Field("Type_Length_TPID")), + Link( + Field("Type_Length_TPID"), + Field("TPID"), + Equal(Variable("Type_Length_TPID"), Number(0x8100, 16)), + first=First("Type_Length_TPID"), ), - ), - ], - { - Field("Destination"): ETHERNET_ADDRESS, - Field("Source"): ETHERNET_ADDRESS, - Field("Type_Length_TPID"): ETHERNET_TYPE_LENGTH, - Field("TPID"): ETHERNET_TPID, - Field("TCI"): ETHERNET_TCI, - Field("Type_Length"): ETHERNET_TYPE_LENGTH, - Field("Payload"): OPAQUE, - }, - skip_proof=True, -) -ETHERNET_MODEL = Model( - [ETHERNET_ADDRESS, ETHERNET_TYPE_LENGTH, ETHERNET_TPID, ETHERNET_TCI, ETHERNET_FRAME], -) + Link( + Field("Type_Length_TPID"), + Field("Type_Length"), + NotEqual(Variable("Type_Length_TPID"), Number(0x8100, 16)), + first=First("Type_Length_TPID"), + ), + Link(Field("TPID"), Field("TCI")), + Link(Field("TCI"), Field("Type_Length")), + Link( + Field("Type_Length"), + Field("Payload"), + LessEqual(Variable("Type_Length"), Number(1500)), + Mul(Variable("Type_Length"), Number(8)), + ), + Link( + Field("Type_Length"), + Field("Payload"), + GreaterEqual(Variable("Type_Length"), Number(1536)), + ), + Link( + Field("Payload"), + FINAL, + And( + GreaterEqual(Div(Size("Payload"), Number(8)), Number(46)), + LessEqual(Div(Size("Payload"), Number(8)), Number(1500)), + ), + ), + ], + { + Field("Destination"): ethernet_address(), + Field("Source"): ethernet_address(), + Field("Type_Length_TPID"): ethernet_type_length(), + Field("TPID"): ethernet_tpid(), + Field("TCI"): ethernet_tci(), + Field("Type_Length"): ethernet_type_length(), + Field("Payload"): OPAQUE, + }, + skip_proof=True, + ) -ENUMERATION_PRIORITY = Enumeration( - "Enumeration::Priority", - [("Low", Number(1)), ("Medium", Number(4)), ("High", Number(7))], - Number(8), - always_valid=True, -) -ENUMERATION_MESSAGE = Message( - "Enumeration::Message", - [Link(INITIAL, Field("Priority")), Link(Field("Priority"), FINAL)], - {Field("Priority"): ENUMERATION_PRIORITY}, - skip_proof=True, -) -ENUMERATION_MODEL = Model([ENUMERATION_PRIORITY, ENUMERATION_MESSAGE]) -SEQUENCE_LENGTH = Integer( - "Sequence::Length", - Number(0), - Sub(Pow(Number(2), Number(8)), Number(1)), - Number(8), -) -SEQUENCE_INTEGER = Integer("Sequence::Integer", Number(1), Number(100), Number(16)) -SEQUENCE_INTEGER_VECTOR = Sequence("Sequence::Integer_Vector", SEQUENCE_INTEGER) -SEQUENCE_ENUMERATION = Enumeration( - "Sequence::Enumeration", - [("Zero", Number(0)), ("One", Number(1)), ("Two", Number(2))], - Number(8), - always_valid=False, -) -SEQUENCE_ENUMERATION_VECTOR = Sequence("Sequence::Enumeration_Vector", SEQUENCE_ENUMERATION) -SEQUENCE_AV_ENUMERATION = Enumeration( - "Sequence::AV_Enumeration", - [("AV_Zero", Number(0)), ("AV_One", Number(1)), ("AV_Two", Number(2))], - Number(8), - always_valid=True, -) -SEQUENCE_AV_ENUMERATION_VECTOR = Sequence( - "Sequence::AV_Enumeration_Vector", - SEQUENCE_AV_ENUMERATION, -) -SEQUENCE_MESSAGE = Message( - "Sequence::Message", - [ - Link(INITIAL, Field("Length")), - Link(Field("Length"), Field("Integer_Vector"), size=Mul(Variable("Length"), Number(8))), - Link(Field("Integer_Vector"), Field("Enumeration_Vector"), size=Number(16)), - Link(Field("Enumeration_Vector"), Field("AV_Enumeration_Vector"), size=Number(16)), - Link(Field("AV_Enumeration_Vector"), FINAL), - ], - { - Field("Length"): SEQUENCE_LENGTH, - Field("Integer_Vector"): SEQUENCE_INTEGER_VECTOR, - Field("Enumeration_Vector"): SEQUENCE_ENUMERATION_VECTOR, - Field("AV_Enumeration_Vector"): SEQUENCE_AV_ENUMERATION_VECTOR, - }, - skip_proof=True, -) -SEQUENCE_INNER_MESSAGE = Message( - "Sequence::Inner_Message", - [ - Link(INITIAL, Field("Length")), - Link(Field("Length"), Field("Payload"), size=Mul(Variable("Length"), Number(8))), - Link(Field("Payload"), FINAL), - ], - {Field("Length"): SEQUENCE_LENGTH, Field("Payload"): OPAQUE}, - skip_proof=True, -) -SEQUENCE_INNER_MESSAGES = Sequence("Sequence::Inner_Messages", SEQUENCE_INNER_MESSAGE) -SEQUENCE_MESSAGES_MESSAGE = Message( - "Sequence::Messages_Message", - [ - Link(INITIAL, Field("Length")), - Link(Field("Length"), Field("Messages"), size=Mul(Variable("Length"), Number(8))), - Link(Field("Messages"), FINAL), - ], - {Field("Length"): SEQUENCE_LENGTH, Field("Messages"): SEQUENCE_INNER_MESSAGES}, - skip_proof=True, -) -SEQUENCE_SEQUENCE_SIZE_DEFINED_BY_MESSAGE_SIZE = Message( - "Sequence::Sequence_Size_Defined_By_Message_Size", - [ - Link(INITIAL, Field("Header")), - Link(Field("Header"), Field("Vector")), - Link(Field("Vector"), FINAL), - ], - { - Field("Header"): SEQUENCE_ENUMERATION, - Field("Vector"): SEQUENCE_INTEGER_VECTOR, - }, - skip_proof=True, -) -SEQUENCE_MODEL = Model( - [ - SEQUENCE_LENGTH, - SEQUENCE_INTEGER, - SEQUENCE_INTEGER_VECTOR, - SEQUENCE_ENUMERATION, - SEQUENCE_ENUMERATION_VECTOR, - SEQUENCE_AV_ENUMERATION, - SEQUENCE_AV_ENUMERATION_VECTOR, - SEQUENCE_MESSAGE, - SEQUENCE_INNER_MESSAGE, - SEQUENCE_INNER_MESSAGES, - SEQUENCE_MESSAGES_MESSAGE, - SEQUENCE_SEQUENCE_SIZE_DEFINED_BY_MESSAGE_SIZE, - ], -) +@lru_cache +def ethernet_model() -> Model: + return Model( + [ + ethernet_address(), + ethernet_type_length(), + ethernet_tpid(), + ethernet_tci(), + ethernet_frame(), + ], + ) -EXPRESSION_MESSAGE = Message( - "Expression::Message", - [ - Link(INITIAL, Field("Payload"), size=Number(16)), - Link(Field("Payload"), FINAL, Equal(Variable("Payload"), Aggregate(Number(1), Number(2)))), - ], - {Field("Payload"): OPAQUE}, - skip_proof=True, -) -EXPRESSION_MODEL = Model([EXPRESSION_MESSAGE]) - -DERIVATION_MESSAGE = DerivedMessage("Derivation::Message", TLV_MESSAGE) -DERIVATION_MODEL = Model([DERIVATION_MESSAGE]) - -VALID_MESSAGE = UnprovenMessage( - "P::M", - [ - Link(INITIAL, Field("F"), size=Number(16)), - Link(Field("F"), FINAL), - ], - {Field("F"): OPAQUE}, -) -INTEGER = Integer("P::Integer", Number(1), Number(220), Number(8)) -ENUMERATION = Enumeration( - "P::Enumeration", - [("Zero", Number(0)), ("One", Number(1)), ("Two", Number(2))], - Number(8), - always_valid=False, - location=Location((10, 2)), -) +@lru_cache +def enumeration_priority() -> Enumeration: + return Enumeration( + "Enumeration::Priority", + [("Low", Number(1)), ("Medium", Number(4)), ("High", Number(7))], + Number(8), + always_valid=True, + ) -MESSAGE = Message( - "P::M", - [ - Link(INITIAL, Field("F"), size=Number(16)), - Link(Field("F"), FINAL), - ], - {Field("F"): OPAQUE}, -) -REFINEMENT = Refinement("In_Message", MESSAGE, Field("F"), MESSAGE) - -UNIVERSAL_MESSAGE_TYPE = Enumeration( - "Universal::Message_Type", - [ - ("MT_Null", Number(0)), - ("MT_Data", Number(1)), - ("MT_Value", Number(2)), - ("MT_Values", Number(3)), - ("MT_Option_Types", Number(4)), - ("MT_Options", Number(5)), - ("MT_Unconstrained_Data", Number(6)), - ("MT_Unconstrained_Options", Number(7)), - ], - size=Number(8), - always_valid=False, -) -UNIVERSAL_LENGTH = Integer( - "Universal::Length", - Number(0), - Sub(Pow(Number(2), Number(16)), Number(1)), - Number(16), -) -UNIVERSAL_VALUE = Integer( - "Universal::Value", - Number(0), - Sub(Pow(Number(2), Number(8)), Number(1)), - Number(8), -) -UNIVERSAL_VALUES = Sequence("Universal::Values", UNIVERSAL_VALUE) -UNIVERSAL_OPTION_TYPE = Enumeration( - "Universal::Option_Type", - [ - ("OT_Null", Number(0)), - ("OT_Data", Number(1)), - ], - size=Number(8), - always_valid=True, -) -UNIVERSAL_OPTION_TYPES = Sequence("Universal::Option_Types", UNIVERSAL_OPTION_TYPE) -UNIVERSAL_OPTION = Message( - "Universal::Option", - [ - Link(INITIAL, Field("Option_Type")), - Link( - Field("Option_Type"), - FINAL, - condition=Equal(Variable("Option_Type"), Variable("OT_Null")), - ), - Link( - Field("Option_Type"), - Field("Length"), - condition=Equal(Variable("Option_Type"), Variable("OT_Data")), - ), - Link( - Field("Length"), - Field("Data"), - size=Mul(Variable("Length"), Number(8)), - ), - Link(Field("Data"), FINAL), - ], - { - Field("Option_Type"): UNIVERSAL_OPTION_TYPE, - Field("Length"): UNIVERSAL_LENGTH, - Field("Data"): OPAQUE, - }, -) -UNIVERSAL_OPTIONS = Sequence("Universal::Options", UNIVERSAL_OPTION) -UNIVERSAL_MESSAGE = Message( - "Universal::Message", - [ - Link(INITIAL, Field("Message_Type")), - Link( - Field("Message_Type"), - FINAL, - condition=Equal(Variable("Message_Type"), Variable("MT_Null")), - ), - Link( - Field("Message_Type"), - Field("Data"), - condition=Equal(Variable("Message_Type"), Variable("MT_Unconstrained_Data")), - ), - Link( - Field("Message_Type"), - Field("Options"), - condition=Equal(Variable("Message_Type"), Variable("MT_Unconstrained_Options")), - ), - Link( - Field("Message_Type"), - Field("Length"), - condition=And( - NotEqual(Variable("Message_Type"), Variable("MT_Null")), - NotEqual(Variable("Message_Type"), Variable("MT_Unconstrained_Data")), - NotEqual(Variable("Message_Type"), Variable("MT_Unconstrained_Options")), + +@lru_cache +def enumeration_message() -> Message: + return Message( + "Enumeration::Message", + [Link(INITIAL, Field("Priority")), Link(Field("Priority"), FINAL)], + {Field("Priority"): enumeration_priority()}, + skip_proof=True, + ) + + +@lru_cache +def enumeration_model() -> Model: + return Model([enumeration_priority(), enumeration_message()]) + + +@lru_cache +def sequence_length() -> Integer: + return Integer( + "Sequence::Length", + Number(0), + Sub(Pow(Number(2), Number(8)), Number(1)), + Number(8), + ) + + +@lru_cache +def sequence_integer() -> Integer: + return Integer("Sequence::Integer", Number(1), Number(100), Number(16)) + + +@lru_cache +def sequence_integer_vector() -> Sequence: + return Sequence("Sequence::Integer_Vector", sequence_integer()) + + +@lru_cache +def sequence_enumeration() -> Enumeration: + return Enumeration( + "Sequence::Enumeration", + [("Zero", Number(0)), ("One", Number(1)), ("Two", Number(2))], + Number(8), + always_valid=False, + ) + + +@lru_cache +def sequence_enumeration_vector() -> Sequence: + return Sequence("Sequence::Enumeration_Vector", sequence_enumeration()) + + +@lru_cache +def sequence_av_enumeration() -> Enumeration: + return Enumeration( + "Sequence::AV_Enumeration", + [("AV_Zero", Number(0)), ("AV_One", Number(1)), ("AV_Two", Number(2))], + Number(8), + always_valid=True, + ) + + +@lru_cache +def sequence_av_enumeration_vector() -> Sequence: + return Sequence( + "Sequence::AV_Enumeration_Vector", + sequence_av_enumeration(), + ) + + +@lru_cache +def sequence_message() -> Message: + return Message( + "Sequence::Message", + [ + Link(INITIAL, Field("Length")), + Link(Field("Length"), Field("Integer_Vector"), size=Mul(Variable("Length"), Number(8))), + Link(Field("Integer_Vector"), Field("Enumeration_Vector"), size=Number(16)), + Link(Field("Enumeration_Vector"), Field("AV_Enumeration_Vector"), size=Number(16)), + Link(Field("AV_Enumeration_Vector"), FINAL), + ], + { + Field("Length"): sequence_length(), + Field("Integer_Vector"): sequence_integer_vector(), + Field("Enumeration_Vector"): sequence_enumeration_vector(), + Field("AV_Enumeration_Vector"): sequence_av_enumeration_vector(), + }, + skip_proof=True, + ) + + +@lru_cache +def sequence_inner_message() -> Message: + return Message( + "Sequence::Inner_Message", + [ + Link(INITIAL, Field("Length")), + Link(Field("Length"), Field("Payload"), size=Mul(Variable("Length"), Number(8))), + Link(Field("Payload"), FINAL), + ], + {Field("Length"): sequence_length(), Field("Payload"): OPAQUE}, + skip_proof=True, + ) + + +@lru_cache +def sequence_inner_messages() -> Sequence: + return Sequence("Sequence::Inner_Messages", sequence_inner_message()) + + +@lru_cache +def sequence_messages_message() -> Message: + return Message( + "Sequence::Messages_Message", + [ + Link(INITIAL, Field("Length")), + Link(Field("Length"), Field("Messages"), size=Mul(Variable("Length"), Number(8))), + Link(Field("Messages"), FINAL), + ], + {Field("Length"): sequence_length(), Field("Messages"): sequence_inner_messages()}, + skip_proof=True, + ) + + +@lru_cache +def sequence_sequence_size_defined_by_message_size() -> Message: + return Message( + "Sequence::Sequence_Size_Defined_By_Message_Size", + [ + Link(INITIAL, Field("Header")), + Link(Field("Header"), Field("Vector")), + Link(Field("Vector"), FINAL), + ], + { + Field("Header"): sequence_enumeration(), + Field("Vector"): sequence_integer_vector(), + }, + skip_proof=True, + ) + + +@lru_cache +def sequence_model() -> Model: + return Model( + [ + sequence_length(), + sequence_integer(), + sequence_integer_vector(), + sequence_enumeration(), + sequence_enumeration_vector(), + sequence_av_enumeration(), + sequence_av_enumeration_vector(), + sequence_message(), + sequence_inner_message(), + sequence_inner_messages(), + sequence_messages_message(), + sequence_sequence_size_defined_by_message_size(), + ], + ) + + +@lru_cache +def expression_message() -> Message: + return Message( + "Expression::Message", + [ + Link(INITIAL, Field("Payload"), size=Number(16)), + Link( + Field("Payload"), + FINAL, + Equal(Variable("Payload"), Aggregate(Number(1), Number(2))), + ), + ], + {Field("Payload"): OPAQUE}, + skip_proof=True, + ) + + +@lru_cache +def expression_model() -> Model: + return Model([expression_message()]) + + +@lru_cache +def derivation_message() -> DerivedMessage: + return DerivedMessage("Derivation::Message", tlv_message()) + + +@lru_cache +def derivation_model() -> Model: + return Model([derivation_message()]) + + +@lru_cache +def valid_message() -> UnprovenMessage: + return UnprovenMessage( + "P::M", + [ + Link(INITIAL, Field("F"), size=Number(16)), + Link(Field("F"), FINAL), + ], + {Field("F"): OPAQUE}, + ) + + +@lru_cache +def integer() -> Integer: + return Integer("P::Integer", Number(1), Number(220), Number(8)) + + +@lru_cache +def enumeration() -> Enumeration: + return Enumeration( + "P::Enumeration", + [("Zero", Number(0)), ("One", Number(1)), ("Two", Number(2))], + Number(8), + always_valid=False, + location=Location((10, 2)), + ) + + +@lru_cache +def message() -> Message: + return Message( + "P::M", + [ + Link(INITIAL, Field("F"), size=Number(16)), + Link(Field("F"), FINAL), + ], + {Field("F"): OPAQUE}, + ) + + +@lru_cache +def refinement() -> Refinement: + return Refinement("In_Message", message(), Field("F"), message()) + + +@lru_cache +def universal_message_type() -> Enumeration: + return Enumeration( + "Universal::Message_Type", + [ + ("MT_Null", Number(0)), + ("MT_Data", Number(1)), + ("MT_Value", Number(2)), + ("MT_Values", Number(3)), + ("MT_Option_Types", Number(4)), + ("MT_Options", Number(5)), + ("MT_Unconstrained_Data", Number(6)), + ("MT_Unconstrained_Options", Number(7)), + ], + size=Number(8), + always_valid=False, + ) + + +@lru_cache +def universal_length() -> Integer: + return Integer( + "Universal::Length", + Number(0), + Sub(Pow(Number(2), Number(16)), Number(1)), + Number(16), + ) + + +@lru_cache +def universal_value() -> Integer: + return Integer( + "Universal::Value", + Number(0), + Sub(Pow(Number(2), Number(8)), Number(1)), + Number(8), + ) + + +@lru_cache +def universal_values() -> Sequence: + return Sequence("Universal::Values", universal_value()) + + +@lru_cache +def universal_option_type() -> Enumeration: + return Enumeration( + "Universal::Option_Type", + [ + ("OT_Null", Number(0)), + ("OT_Data", Number(1)), + ], + size=Number(8), + always_valid=True, + ) + + +@lru_cache +def universal_option_types() -> Sequence: + return Sequence("Universal::Option_Types", universal_option_type()) + + +@lru_cache +def universal_option() -> Message: + return Message( + "Universal::Option", + [ + Link(INITIAL, Field("Option_Type")), + Link( + Field("Option_Type"), + FINAL, + condition=Equal(Variable("Option_Type"), Variable("OT_Null")), ), - ), - Link( - Field("Length"), - Field("Data"), - condition=Equal(Variable("Message_Type"), Variable("MT_Data")), - size=Mul(Variable("Length"), Number(8)), - ), - Link(Field("Data"), FINAL), - Link( - Field("Length"), - Field("Value"), - condition=And( - Equal(Variable("Message_Type"), Variable("MT_Value")), - Equal(Variable("Length"), Div(Size("Universal::Value"), Number(8))), + Link( + Field("Option_Type"), + Field("Length"), + condition=Equal(Variable("Option_Type"), Variable("OT_Data")), ), - ), - Link(Field("Value"), FINAL), - Link( - Field("Length"), - Field("Values"), - condition=Equal(Variable("Message_Type"), Variable("MT_Values")), - size=Mul(Variable("Length"), Number(8)), - ), - Link(Field("Values"), FINAL), - Link( - Field("Length"), - Field("Option_Types"), - condition=Equal(Variable("Message_Type"), Variable("MT_Option_Types")), - size=Mul(Variable("Length"), Number(8)), - ), - Link(Field("Option_Types"), FINAL), - Link( - Field("Length"), - Field("Options"), - condition=Equal(Variable("Message_Type"), Variable("MT_Options")), - size=Mul(Variable("Length"), Number(8)), - ), - Link(Field("Options"), FINAL), - ], - { - Field("Message_Type"): UNIVERSAL_MESSAGE_TYPE, - Field("Length"): UNIVERSAL_LENGTH, - Field("Data"): OPAQUE, - Field("Value"): UNIVERSAL_VALUE, - Field("Values"): UNIVERSAL_VALUES, - Field("Option_Types"): UNIVERSAL_OPTION_TYPES, - Field("Options"): UNIVERSAL_OPTIONS, - }, -) -UNIVERSAL_REFINEMENT = Refinement("Universal", UNIVERSAL_MESSAGE, Field("Data"), UNIVERSAL_OPTION) -UNIVERSAL_MODEL = Model( - [ - UNIVERSAL_MESSAGE_TYPE, - UNIVERSAL_LENGTH, - OPAQUE, - UNIVERSAL_OPTION_TYPE, - UNIVERSAL_OPTION_TYPES, - UNIVERSAL_OPTION, - UNIVERSAL_OPTIONS, - UNIVERSAL_VALUE, - UNIVERSAL_VALUES, - UNIVERSAL_MESSAGE, - UNIVERSAL_REFINEMENT, - ], -) + Link( + Field("Length"), + Field("Data"), + size=Mul(Variable("Length"), Number(8)), + ), + Link(Field("Data"), FINAL), + ], + { + Field("Option_Type"): universal_option_type(), + Field("Length"): universal_length(), + Field("Data"): OPAQUE, + }, + ) + + +@lru_cache +def universal_options() -> Sequence: + return Sequence("Universal::Options", universal_option()) -FIXED_SIZE_MESSAGE = Message( - "Fixed_Size::Message", - [ - Link(INITIAL, Field("Message_Type")), - Link( - Field("Message_Type"), - Field("Data"), - condition=Or( - Equal(Variable("Message_Type"), Variable("Universal::MT_Null")), - Equal(Variable("Message_Type"), Variable("Universal::MT_Data")), - Equal(Variable("Message_Type"), Variable("Universal::MT_Values")), - Equal(Variable("Message_Type"), Variable("Universal::MT_Options")), + +@lru_cache +def universal_message() -> Message: + return Message( + "Universal::Message", + [ + Link(INITIAL, Field("Message_Type")), + Link( + Field("Message_Type"), + FINAL, + condition=Equal(Variable("Message_Type"), Variable("MT_Null")), ), - size=Number(64), - ), - Link( - Field("Data"), - Field("Values"), - size=Mul(Number(8), Size("Universal::Value")), - ), - Link( - Field("Values"), - Field("Options"), - size=Number(64), - ), - Link( - Field("Options"), - FINAL, - ), - ], - { - Field("Message_Type"): UNIVERSAL_MESSAGE_TYPE, - Field("Data"): OPAQUE, - Field("Values"): UNIVERSAL_VALUES, - Field("Options"): UNIVERSAL_OPTIONS, - }, -) -FIXED_SIZE_SIMPLE_MESSAGE = Message( - "Fixed_Size::Simple_Message", - [ - Link(INITIAL, Field("Message_Type")), - Link( - Field("Message_Type"), - Field("Data"), - condition=Equal(Variable("Message_Type"), Variable("Universal::OT_Data")), - size=Number(24), - ), - Link( - Field("Data"), - FINAL, - ), - ], - { - Field("Message_Type"): UNIVERSAL_OPTION_TYPE, - Field("Data"): OPAQUE, - }, -) + Link( + Field("Message_Type"), + Field("Data"), + condition=Equal(Variable("Message_Type"), Variable("MT_Unconstrained_Data")), + ), + Link( + Field("Message_Type"), + Field("Options"), + condition=Equal(Variable("Message_Type"), Variable("MT_Unconstrained_Options")), + ), + Link( + Field("Message_Type"), + Field("Length"), + condition=And( + NotEqual(Variable("Message_Type"), Variable("MT_Null")), + NotEqual(Variable("Message_Type"), Variable("MT_Unconstrained_Data")), + NotEqual(Variable("Message_Type"), Variable("MT_Unconstrained_Options")), + ), + ), + Link( + Field("Length"), + Field("Data"), + condition=Equal(Variable("Message_Type"), Variable("MT_Data")), + size=Mul(Variable("Length"), Number(8)), + ), + Link(Field("Data"), FINAL), + Link( + Field("Length"), + Field("Value"), + condition=And( + Equal(Variable("Message_Type"), Variable("MT_Value")), + Equal(Variable("Length"), Div(Size("Universal::Value"), Number(8))), + ), + ), + Link(Field("Value"), FINAL), + Link( + Field("Length"), + Field("Values"), + condition=Equal(Variable("Message_Type"), Variable("MT_Values")), + size=Mul(Variable("Length"), Number(8)), + ), + Link(Field("Values"), FINAL), + Link( + Field("Length"), + Field("Option_Types"), + condition=Equal(Variable("Message_Type"), Variable("MT_Option_Types")), + size=Mul(Variable("Length"), Number(8)), + ), + Link(Field("Option_Types"), FINAL), + Link( + Field("Length"), + Field("Options"), + condition=Equal(Variable("Message_Type"), Variable("MT_Options")), + size=Mul(Variable("Length"), Number(8)), + ), + Link(Field("Options"), FINAL), + ], + { + Field("Message_Type"): universal_message_type(), + Field("Length"): universal_length(), + Field("Data"): OPAQUE, + Field("Value"): universal_value(), + Field("Values"): universal_values(), + Field("Option_Types"): universal_option_types(), + Field("Options"): universal_options(), + }, + ) -DEFINITE_MESSAGE = Message( - "Definite::Message", - [ - Link(INITIAL, Field("Length")), - Link( - Field("Length"), - Field("Data"), - size=Mul(Variable("Length"), Number(8)), - ), - Link( - Field("Data"), - FINAL, - ), - ], - { - Field("Length"): UNIVERSAL_LENGTH, - Field("Data"): OPAQUE, - }, -) -SESSION = Session( - identifier="P::S", - states=[ - State("A", transitions=[Transition(target="null")]), - ], - declarations=[], - parameters=[], - types=[], -) +@lru_cache +def universal_refinement() -> Refinement: + return Refinement("Universal", universal_message(), Field("Data"), universal_option()) + + +@lru_cache +def universal_model() -> Model: + return Model( + [ + universal_message_type(), + universal_length(), + OPAQUE, + universal_option_type(), + universal_option_types(), + universal_option(), + universal_options(), + universal_value(), + universal_values(), + universal_message(), + universal_refinement(), + ], + ) + + +@lru_cache +def fixed_size_message() -> Message: + return Message( + "Fixed_Size::Message", + [ + Link(INITIAL, Field("Message_Type")), + Link( + Field("Message_Type"), + Field("Data"), + condition=Or( + Equal(Variable("Message_Type"), Variable("Universal::MT_Null")), + Equal(Variable("Message_Type"), Variable("Universal::MT_Data")), + Equal(Variable("Message_Type"), Variable("Universal::MT_Values")), + Equal(Variable("Message_Type"), Variable("Universal::MT_Options")), + ), + size=Number(64), + ), + Link( + Field("Data"), + Field("Values"), + size=Mul(Number(8), Size("Universal::Value")), + ), + Link( + Field("Values"), + Field("Options"), + size=Number(64), + ), + Link( + Field("Options"), + FINAL, + ), + ], + { + Field("Message_Type"): universal_message_type(), + Field("Data"): OPAQUE, + Field("Values"): universal_values(), + Field("Options"): universal_options(), + }, + ) + + +@lru_cache +def fixed_size_simple_message() -> Message: + return Message( + "Fixed_Size::Simple_Message", + [ + Link(INITIAL, Field("Message_Type")), + Link( + Field("Message_Type"), + Field("Data"), + condition=Equal(Variable("Message_Type"), Variable("Universal::OT_Data")), + size=Number(24), + ), + Link( + Field("Data"), + FINAL, + ), + ], + { + Field("Message_Type"): universal_option_type(), + Field("Data"): OPAQUE, + }, + ) + + +@lru_cache +def definite_message() -> Message: + return Message( + "Definite::Message", + [ + Link(INITIAL, Field("Length")), + Link( + Field("Length"), + Field("Data"), + size=Mul(Variable("Length"), Number(8)), + ), + Link( + Field("Data"), + FINAL, + ), + ], + { + Field("Length"): universal_length(), + Field("Data"): OPAQUE, + }, + ) + + +@lru_cache +def session() -> Session: + return Session( + identifier="P::S", + states=[ + State("A", transitions=[Transition(target="null")]), + ], + declarations=[], + parameters=[], + types=[], + ) + + +def spark_test_models() -> list[Model]: + """Return models corresponding to generated code in tests/spark/generated.""" + return [ + derivation_model(), + enumeration_model(), + ethernet_model(), + expression_model(), + null_message_in_tlv_message_model(), + null_model(), + sequence_model(), + tlv_model(), + Model(fixed_size_simple_message().dependencies), + ] diff --git a/tests/integration/pyrflx_test.py b/tests/integration/pyrflx_test.py index c6cb6a0e1..60e06532a 100644 --- a/tests/integration/pyrflx_test.py +++ b/tests/integration/pyrflx_test.py @@ -31,7 +31,7 @@ from rflx.model.type_ import OPAQUE from rflx.pyrflx import MessageValue, Package, PyRFLX, PyRFLXError, TypeValue, utils from tests.const import CAPTURED_DIR, SPEC_DIR -from tests.data.models import TLV_LENGTH, TLV_TAG +from tests.data import models def test_ethernet_set_tltpid(ethernet_frame_value: MessageValue) -> None: @@ -441,10 +441,14 @@ def test_tlv_message_with_not_operator() -> None: Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), Link(Field("Value"), FINAL), ], - {Field("Tag"): TLV_TAG, Field("Length"): TLV_LENGTH, Field("Value"): OPAQUE}, + { + Field("Tag"): models.tlv_tag(), + Field("Length"): models.tlv_length(), + Field("Value"): OPAQUE, + }, ) - model = PyRFLX(model=Model([TLV_TAG, TLV_LENGTH, message])) + model = PyRFLX(model=Model([models.tlv_tag(), models.tlv_length(), message])) pkg = model.package("TLV") msg = pkg.new_message("Message_With_Not_Operator") test_bytes = b"\x01\x00\x04\x00\x00\x00\x00" @@ -480,7 +484,11 @@ def test_tlv_message_with_not_operator_exhausting() -> None: Link(Field("Length"), Field("Value"), size=Mul(Variable("Length"), Number(8))), Link(Field("Value"), FINAL), ], - {Field("Tag"): TLV_TAG, Field("Length"): TLV_LENGTH, Field("Value"): OPAQUE}, + { + Field("Tag"): models.tlv_tag(), + Field("Length"): models.tlv_length(), + Field("Value"): OPAQUE, + }, ) with pytest.raises( @@ -503,4 +511,4 @@ def test_tlv_message_with_not_operator_exhausting() -> None: + "$" ), ): - PyRFLX(model=Model([TLV_TAG, TLV_LENGTH, message])) + PyRFLX(model=Model([models.tlv_tag(), models.tlv_length(), message])) diff --git a/tests/unit/expression_test.py b/tests/unit/expression_test.py index 8f6f705f8..6ac34e072 100644 --- a/tests/unit/expression_test.py +++ b/tests/unit/expression_test.py @@ -71,7 +71,7 @@ ) from rflx.identifier import ID, StrID, id_generator from rflx.model import Integer -from tests.data.models import ENUMERATION +from tests.data import models from tests.utils import assert_equal, check_regex EXPR = Equal(Variable("UNDEFINED_1"), Variable("UNDEFINED_2")) @@ -3069,7 +3069,7 @@ def test_case_findall() -> None: def test_case_type() -> None: - c1 = Variable("C", type_=ENUMERATION.type_) + c1 = Variable("C", type_=models.enumeration().type_) assert_type(CaseExpr(c1, [([ID("Zero"), ID("One")], TRUE), ([ID("Two")], FALSE)]), rty.BOOLEAN) assert_type( CaseExpr(c1, [([ID("Zero"), ID("One")], Number(1)), ([ID("Two")], Number(2))]), @@ -3106,7 +3106,7 @@ def test_case_simplified() -> None: def test_case_invalid() -> None: assert_type_error( CaseExpr( - Variable("C", type_=ENUMERATION.type_), + Variable("C", type_=models.enumeration().type_), [([ID("Zero")], TRUE), ([ID("One")], FALSE)], location=Location((1, 2)), ), @@ -3115,7 +3115,7 @@ def test_case_invalid() -> None: ) assert_type_error( CaseExpr( - Variable("C", type_=ENUMERATION.type_), + Variable("C", type_=models.enumeration().type_), [([ID("Zero"), ID("One")], TRUE), ([ID("Two"), ID("Invalid")], FALSE)], location=Location((1, 2)), ), @@ -3124,7 +3124,7 @@ def test_case_invalid() -> None: ) assert_type_error( CaseExpr( - Variable("C", type_=ENUMERATION.type_), + Variable("C", type_=models.enumeration().type_), [ ([ID("Zero"), ID("One", location=Location((2, 2)))], TRUE), ([ID("Two"), ID("One", location=Location((3, 2)))], FALSE), diff --git a/tests/unit/generator_test.py b/tests/unit/generator_test.py index a49d6ec13..280d69f32 100644 --- a/tests/unit/generator_test.py +++ b/tests/unit/generator_test.py @@ -31,18 +31,6 @@ from tests.data import models from tests.utils import assert_equal, assert_equal_code -MODELS = [ - models.DERIVATION_MODEL, - models.ENUMERATION_MODEL, - models.ETHERNET_MODEL, - models.EXPRESSION_MODEL, - models.NULL_MESSAGE_IN_TLV_MESSAGE_MODEL, - models.NULL_MODEL, - models.SEQUENCE_MODEL, - models.TLV_MODEL, - Model(models.FIXED_SIZE_SIMPLE_MESSAGE.dependencies), -] - MSG_TY = rty.Message("M") SEQ_TY = rty.Sequence("S", rty.Message("M")) @@ -60,12 +48,12 @@ def test_unsupported_checksum(tmp_path: Path) -> None: r" \(consider --ignore-unsupported-checksum option\)$" ), ): - Generator().generate(models.TLV_WITH_CHECKSUM_MODEL, Integration(), tmp_path) + Generator().generate(models.tlv_with_checksum_model(), Integration(), tmp_path) def test_ignore_unsupported_checksum(capsys: pytest.CaptureFixture[str], tmp_path: Path) -> None: Generator(ignore_unsupported_checksum=True).generate( - models.TLV_WITH_CHECKSUM_MODEL, + models.tlv_with_checksum_model(), Integration(), tmp_path, ) @@ -149,8 +137,8 @@ def test_generate_missing_template_files(monkeypatch: pytest.MonkeyPatch, tmp_pa def test_generate_partial_update(tmp_path: Path) -> None: - Generator().generate(models.TLV_MODEL, Integration(), tmp_path) - Generator().generate(models.TLV_MODEL, Integration(), tmp_path) + Generator().generate(models.tlv_model(), Integration(), tmp_path) + Generator().generate(models.tlv_model(), Integration(), tmp_path) with pytest.raises( RecordFluxError, match=( @@ -162,10 +150,10 @@ def test_generate_partial_update(tmp_path: Path) -> None: "$" ), ): - Generator().generate(models.ETHERNET_MODEL, Integration(), tmp_path) + Generator().generate(models.ethernet_model(), Integration(), tmp_path) -@pytest.mark.parametrize("model", MODELS) +@pytest.mark.parametrize("model", models.spark_test_models()) def test_equality(model: Model, tmp_path: Path) -> None: assert_equal_code(model, Integration(), GENERATED_DIR, tmp_path, accept_extra_files=True) @@ -235,7 +223,7 @@ def test_substitution_relation_aggregate( expected = equal_call if relation == expr.Equal else expr.Not(equal_call) assert ( - relation(left, right).substituted(common.substitution(models.TLV_MESSAGE, "", embedded)) + relation(left, right).substituted(common.substitution(models.tlv_message(), "", embedded)) == expected ) @@ -266,7 +254,7 @@ def test_substitution_relation_boolean_literal( expected_right: expr.Expr, ) -> None: assert relation(left, right).substituted( - common.substitution(models.TLV_MESSAGE, ""), + common.substitution(models.tlv_message(), ""), ) == relation(expected_left, expected_right) @@ -295,7 +283,7 @@ def test_substitution_relation_scalar( ) -> None: assert_equal( relation(*expressions).substituted( - common.substitution(models.TLV_MESSAGE, "", public=True), + common.substitution(models.tlv_message(), "", public=True), ), relation(*expected), ) @@ -321,7 +309,7 @@ def test_prefixed_type_identifier() -> None: ], declarations=[], parameters=[], - types={t.identifier: t for t in models.UNIVERSAL_MODEL.types}, + types={t.identifier: t for t in models.universal_model().types}, location=None, variable_id=id_generator(), ) @@ -2162,7 +2150,7 @@ def test_generate_field_size_optimization() -> None: ), ], { - Field("Length"): models.UNIVERSAL_LENGTH, + Field("Length"): models.universal_length(), Field("Data"): mty.OPAQUE, }, ) @@ -2183,7 +2171,7 @@ def test_generate_field_size_optimization() -> None: def test_generate_string_substitution() -> None: - subst = common.substitution(models.DEFINITE_MESSAGE, "") + subst = common.substitution(models.definite_message(), "") assert subst(expr.String("abc")) == expr.Aggregate( expr.Number(97), expr.Number(98), diff --git a/tests/unit/model/cache_test.py b/tests/unit/model/cache_test.py index fbdb1298e..ba45efdd3 100644 --- a/tests/unit/model/cache_test.py +++ b/tests/unit/model/cache_test.py @@ -4,7 +4,7 @@ from rflx import expression as expr, model from rflx.model import cache -from tests.data.models import TLV_MESSAGE +from tests.data import models def test_init(tmp_path: Path) -> None: @@ -98,6 +98,6 @@ def test_verified(tmp_path: Path) -> None: def test_verified_disabled(tmp_path: Path) -> None: c = cache.Cache(tmp_path / "test.json", enabled=False) - assert not c.is_verified(TLV_MESSAGE) - c.add_verified(TLV_MESSAGE) - assert not c.is_verified(TLV_MESSAGE) + assert not c.is_verified(models.tlv_message()) + c.add_verified(models.tlv_message()) + assert not c.is_verified(models.tlv_message()) diff --git a/tests/unit/model/message_test.py b/tests/unit/model/message_test.py index ba205aeb8..cd8f42132 100644 --- a/tests/unit/model/message_test.py +++ b/tests/unit/model/message_test.py @@ -63,36 +63,7 @@ UnprovenMessage, ) from rflx.model.message import ByteOrder -from tests.data.models import ( - DEFINITE_MESSAGE, - ENUMERATION, - ETHERNET_FRAME, - FIXED_SIZE_MESSAGE, - FIXED_SIZE_SIMPLE_MESSAGE, - INTEGER, - MESSAGE, - NULL_MESSAGE, - REFINEMENT, - SEQUENCE_INNER_MESSAGE, - SEQUENCE_INNER_MESSAGES, - SEQUENCE_INTEGER_VECTOR, - SEQUENCE_LENGTH, - SEQUENCE_MESSAGE, - SEQUENCE_MESSAGES_MESSAGE, - TLV_LENGTH, - TLV_MESSAGE, - TLV_TAG, - UNIVERSAL_LENGTH, - UNIVERSAL_MESSAGE, - UNIVERSAL_MESSAGE_TYPE, - UNIVERSAL_OPTION, - UNIVERSAL_OPTION_TYPE, - UNIVERSAL_OPTION_TYPES, - UNIVERSAL_OPTIONS, - UNIVERSAL_REFINEMENT, - UNIVERSAL_VALUE, - UNIVERSAL_VALUES, -) +from tests.data import models from tests.utils import assert_equal, assert_message_model_error M_NO_REF = UnprovenMessage( @@ -112,9 +83,9 @@ ], { Field("F1"): OPAQUE, - Field("F2"): INTEGER, - Field("F3"): ENUMERATION, - Field("F4"): INTEGER, + Field("F2"): models.integer(), + Field("F3"): models.enumeration(), + Field("F4"): models.integer(), }, ) @@ -139,12 +110,12 @@ Link(Field("F6"), FINAL), ], { - Field("F1"): deepcopy(INTEGER), - Field("F2"): deepcopy(INTEGER), - Field("F3"): deepcopy(INTEGER), + Field("F1"): deepcopy(models.integer()), + Field("F2"): deepcopy(models.integer()), + Field("F3"): deepcopy(models.integer()), Field("NR"): deepcopy(M_NO_REF), - Field("F5"): deepcopy(INTEGER), - Field("F6"): deepcopy(INTEGER), + Field("F5"): deepcopy(models.integer()), + Field("F6"): deepcopy(models.integer()), }, ) @@ -174,9 +145,9 @@ ], { Field("F1"): OPAQUE, - Field("F2"): INTEGER, - Field("F3"): ENUMERATION, - Field("F4"): INTEGER, + Field("F2"): models.integer(), + Field("F3"): models.enumeration(), + Field("F4"): models.integer(), }, ) @@ -212,10 +183,10 @@ ), ], { - Field("P1"): INTEGER, - Field("P2"): ENUMERATION, + Field("P1"): models.integer(), + Field("P2"): models.enumeration(), Field("F1"): OPAQUE, - Field("F2"): INTEGER, + Field("F2"): models.integer(), }, ) @@ -228,7 +199,7 @@ Link(Field("F1"), FINAL, condition=Equal(Variable("P1"), Number(2))), Link(Field("F2"), FINAL), ], - {Field("P1"): INTEGER, Field("F1"): INTEGER, Field("F2"): INTEGER}, + {Field("P1"): models.integer(), Field("F1"): models.integer(), Field("F2"): models.integer()}, ) @@ -238,7 +209,7 @@ Link(INITIAL, Field("PNR")), Link(Field("PNR"), FINAL), ], - {Field("P2"): INTEGER, Field("PNR"): deepcopy(M_PARAM_NO_REF)}, + {Field("P2"): models.integer(), Field("PNR"): deepcopy(M_PARAM_NO_REF)}, ) @@ -261,11 +232,17 @@ def test_invalid_identifier() -> None: @pytest.mark.parametrize( "parameter_type", - [NULL_MESSAGE, TLV_MESSAGE, SEQUENCE_INTEGER_VECTOR, SEQUENCE_INNER_MESSAGES, OPAQUE], + [ + models.null_message(), + models.tlv_message(), + models.sequence_integer_vector(), + models.sequence_inner_messages(), + OPAQUE, + ], ) def test_invalid_parameter_type_composite(parameter_type: Type) -> None: structure = [Link(INITIAL, Field("X")), Link(Field("X"), FINAL)] - types = {Field(ID("P", Location((1, 2)))): parameter_type, Field("X"): INTEGER} + types = {Field(ID("P", Location((1, 2)))): parameter_type, Field("X"): models.integer()} assert_message_model_error( structure, @@ -282,7 +259,7 @@ def test_invalid_parameter_type_always_valid_enum() -> None: always_valid=True, ) structure = [Link(INITIAL, Field("X")), Link(Field("X"), FINAL)] - types = {Field(ID("P", Location((1, 2)))): always_valid_enum, Field("X"): INTEGER} + types = {Field(ID("P", Location((1, 2)))): always_valid_enum, Field("X"): models.integer()} assert_message_model_error( structure, @@ -436,7 +413,7 @@ def test_unsupported_expression() -> None: ), ] - types = {x: INTEGER} + types = {x: models.integer()} assert_message_model_error( structure, @@ -490,7 +467,7 @@ def test_cycle() -> None: def test_parameters() -> None: - assert not ETHERNET_FRAME.parameters + assert not models.ethernet_frame().parameters assert PARAMETERIZED_MESSAGE.parameters == ( Field("P1"), Field("P2"), @@ -498,7 +475,7 @@ def test_parameters() -> None: def test_fields() -> None: - assert ETHERNET_FRAME.fields == ( + assert models.ethernet_frame().fields == ( Field("Destination"), Field("Source"), Field("Type_Length_TPID"), @@ -515,30 +492,30 @@ def test_fields() -> None: def test_parameter_types() -> None: assert PARAMETERIZED_MESSAGE.parameter_types == { - Field("P1"): INTEGER, - Field("P2"): ENUMERATION, + Field("P1"): models.integer(), + Field("P2"): models.enumeration(), } def test_field_types() -> None: assert PARAMETERIZED_MESSAGE.field_types == { Field("F1"): OPAQUE, - Field("F2"): INTEGER, + Field("F2"): models.integer(), } def test_path_condition() -> None: - assert_equal(ETHERNET_FRAME.path_condition(INITIAL), TRUE) + assert_equal(models.ethernet_frame().path_condition(INITIAL), TRUE) assert_equal( - ETHERNET_FRAME.path_condition(Field("TPID")), + models.ethernet_frame().path_condition(Field("TPID")), Equal(Variable("Type_Length_TPID"), Number(33024, 16)), ) assert_equal( - ETHERNET_FRAME.path_condition(Field("Type_Length")), + models.ethernet_frame().path_condition(Field("Type_Length")), TRUE, ) assert_equal( - ETHERNET_FRAME.path_condition(Field("Payload")), + models.ethernet_frame().path_condition(Field("Payload")), Or( LessEqual(Variable("Type_Length"), Number(1500)), GreaterEqual(Variable("Type_Length"), Number(1536)), @@ -547,9 +524,9 @@ def test_path_condition() -> None: def test_incoming() -> None: - assert_equal(ETHERNET_FRAME.incoming(INITIAL), []) + assert_equal(models.ethernet_frame().incoming(INITIAL), []) assert_equal( - ETHERNET_FRAME.incoming(Field("Type_Length")), + models.ethernet_frame().incoming(Field("Type_Length")), [ Link(Field("TCI"), Field("Type_Length")), Link( @@ -561,7 +538,7 @@ def test_incoming() -> None: ], ) assert_equal( - ETHERNET_FRAME.incoming(FINAL), + models.ethernet_frame().incoming(FINAL), [ Link( Field("Payload"), @@ -576,9 +553,9 @@ def test_incoming() -> None: def test_outgoing() -> None: - assert_equal(ETHERNET_FRAME.outgoing(INITIAL), [Link(INITIAL, Field("Destination"))]) + assert_equal(models.ethernet_frame().outgoing(INITIAL), [Link(INITIAL, Field("Destination"))]) assert_equal( - ETHERNET_FRAME.outgoing(Field("Type_Length")), + models.ethernet_frame().outgoing(Field("Type_Length")), [ Link( Field("Type_Length"), @@ -594,27 +571,30 @@ def test_outgoing() -> None: ), ], ) - assert_equal(ETHERNET_FRAME.outgoing(FINAL), []) + assert_equal(models.ethernet_frame().outgoing(FINAL), []) def test_direct_predecessors() -> None: - assert_equal(ETHERNET_FRAME.direct_predecessors(INITIAL), []) + assert_equal(models.ethernet_frame().direct_predecessors(INITIAL), []) assert_equal( - ETHERNET_FRAME.direct_predecessors(Field("Type_Length")), + models.ethernet_frame().direct_predecessors(Field("Type_Length")), [Field("TCI"), Field("Type_Length_TPID")], ) - assert_equal(ETHERNET_FRAME.direct_predecessors(FINAL), [Field("Payload")]) + assert_equal(models.ethernet_frame().direct_predecessors(FINAL), [Field("Payload")]) def test_direct_successors() -> None: - assert_equal(ETHERNET_FRAME.direct_successors(INITIAL), [Field("Destination")]) - assert_equal(ETHERNET_FRAME.direct_successors(Field("Type_Length")), [Field("Payload")]) - assert_equal(ETHERNET_FRAME.direct_successors(FINAL), []) + assert_equal(models.ethernet_frame().direct_successors(INITIAL), [Field("Destination")]) + assert_equal( + models.ethernet_frame().direct_successors(Field("Type_Length")), + [Field("Payload")], + ) + assert_equal(models.ethernet_frame().direct_successors(FINAL), []) def test_definite_predecessors() -> None: assert_equal( - ETHERNET_FRAME.definite_predecessors(FINAL), + models.ethernet_frame().definite_predecessors(FINAL), ( Field("Destination"), Field("Source"), @@ -624,14 +604,14 @@ def test_definite_predecessors() -> None: ), ) assert_equal( - ETHERNET_FRAME.definite_predecessors(Field("TCI")), + models.ethernet_frame().definite_predecessors(Field("TCI")), (Field("Destination"), Field("Source"), Field("Type_Length_TPID"), Field("TPID")), ) def test_predecessors() -> None: assert_equal( - ETHERNET_FRAME.predecessors(FINAL), + models.ethernet_frame().predecessors(FINAL), ( Field("Destination"), Field("Source"), @@ -643,16 +623,16 @@ def test_predecessors() -> None: ), ) assert_equal( - ETHERNET_FRAME.predecessors(Field("TCI")), + models.ethernet_frame().predecessors(Field("TCI")), (Field("Destination"), Field("Source"), Field("Type_Length_TPID"), Field("TPID")), ) - assert_equal(ETHERNET_FRAME.predecessors(Field("Destination")), ()) - assert_equal(ETHERNET_FRAME.predecessors(INITIAL), ()) + assert_equal(models.ethernet_frame().predecessors(Field("Destination")), ()) + assert_equal(models.ethernet_frame().predecessors(INITIAL), ()) def test_successors() -> None: assert_equal( - ETHERNET_FRAME.successors(INITIAL), + models.ethernet_frame().successors(INITIAL), ( Field("Destination"), Field("Source"), @@ -664,7 +644,7 @@ def test_successors() -> None: ), ) assert_equal( - ETHERNET_FRAME.successors(Field("Source")), + models.ethernet_frame().successors(Field("Source")), ( Field("Type_Length_TPID"), Field("TPID"), @@ -674,11 +654,11 @@ def test_successors() -> None: ), ) assert_equal( - ETHERNET_FRAME.successors(Field("TPID")), + models.ethernet_frame().successors(Field("TPID")), (Field("TCI"), Field("Type_Length"), Field("Payload")), ) - assert_equal(ETHERNET_FRAME.successors(Field("Payload")), ()) - assert_equal(ETHERNET_FRAME.successors(FINAL), ()) + assert_equal(models.ethernet_frame().successors(Field("Payload")), ()) + assert_equal(models.ethernet_frame().successors(FINAL), ()) def test_field_identifier_locations() -> None: @@ -689,7 +669,7 @@ def test_field_identifier_locations() -> None: message = UnprovenMessage( "P::M", [Link(INITIAL, Field("A")), Link(Field("A"), Field("B")), Link(Field("B"), FINAL)], - {p: INTEGER, a: INTEGER, b: INTEGER}, + {p: models.integer(), a: models.integer(), b: models.integer()}, ) assert message.parameters == (p,) @@ -702,18 +682,18 @@ def test_field_identifier_locations() -> None: @pytest.mark.parametrize( "expression", [ - Variable(INTEGER.identifier, location=Location((1, 2))), + Variable(models.integer().identifier, location=Location((1, 2))), And( TRUE, - Variable(INTEGER.identifier, location=Location((1, 2))), + Variable(models.integer().identifier, location=Location((1, 2))), location=Location((3, 4)), ), Equal( - Add(Variable(INTEGER.identifier, location=Location((1, 2))), Number(1)), + Add(Variable(models.integer().identifier, location=Location((1, 2))), Number(1)), Number(1), ), Equal( - Variable(INTEGER.identifier, location=Location((1, 2))), + Variable(models.integer().identifier, location=Location((1, 2))), Variable("X"), ), ], @@ -723,7 +703,7 @@ def test_invalid_use_of_type_name(expression: Expr) -> None: Link(INITIAL, Field("X")), Link(Field("X"), FINAL, condition=expression), ] - types = {Field("X"): INTEGER} + types = {Field("X"): models.integer()} assert_message_model_error( structure, @@ -745,7 +725,7 @@ def test_invalid_use_of_enum_literal(expression: Expr) -> None: Link(INITIAL, Field("X")), Link(Field("X"), FINAL, condition=expression), ] - types = {Field("X"): ENUMERATION} + types = {Field("X"): models.enumeration()} assert_message_model_error( structure, @@ -770,7 +750,7 @@ def test_invalid_message_field_type() -> None: def test_unused_parameter() -> None: structure = [Link(INITIAL, Field("X")), Link(Field("X"), FINAL)] - types = {Field(ID("P", Location((1, 2)))): INTEGER, Field("X"): INTEGER} + types = {Field(ID("P", Location((1, 2)))): models.integer(), Field("X"): models.integer()} assert_message_model_error( structure, @@ -886,7 +866,7 @@ def test_undefined_variables() -> None: Link(Field("F2"), FINAL), ] - types = {Field("F1"): INTEGER, Field("F2"): INTEGER} + types = {Field("F1"): models.integer(), Field("F2"): models.integer()} assert_message_model_error( structure, @@ -958,9 +938,9 @@ def test_reference_to_optional_field_2() -> None: Link(Field("Data"), FINAL), ] types = { - Field("Flag"): INTEGER, - Field("Opt"): INTEGER, - Field("Any"): INTEGER, + Field("Flag"): models.integer(), + Field("Opt"): models.integer(), + Field("Any"): models.integer(), Field("Data"): OPAQUE, } assert_message_model_error( @@ -978,7 +958,7 @@ def test_invalid_use_of_size_attribute() -> None: Link(INITIAL, Field("F1")), Link(Field("F1"), FINAL, Equal(Size(Number(1)), Number(32), Location((400, 17)))), ] - types = {Field("F1"): INTEGER} + types = {Field("F1"): models.integer()} assert_message_model_error( structure, types, @@ -996,7 +976,7 @@ def test_invalid_relation_to_opaque() -> None: condition=Equal(Variable("Data"), Number(42, location=Location((10, 20)))), ), ] - types = {Field("Length"): INTEGER, Field("Data"): OPAQUE} + types = {Field("Length"): models.integer(), Field("Data"): OPAQUE} assert_message_model_error( structure, types, @@ -1122,7 +1102,7 @@ def test_sequence_aggregate_invalid_element_type() -> None: inner = Message( "P::I", [Link(INITIAL, Field("F")), Link(Field("F"), FINAL)], - {Field("F"): INTEGER}, + {Field("F"): models.integer()}, ) sequence_type = Sequence("P::Sequence", inner) f = Field("F") @@ -1190,7 +1170,7 @@ def test_opaque_not_byte_aligned_dynamic() -> None: Link(o2, FINAL), ], { - Field("L1"): INTEGER, + Field("L1"): models.integer(), Field("L2"): Integer("P::T", Number(0), Number(3), Number(2)), Field("O1"): OPAQUE, o2: OPAQUE, @@ -1206,7 +1186,7 @@ def test_opaque_valid_byte_aligned_dynamic_mul() -> None: Link(Field("L"), Field("O1"), size=Mul(Number(8), Variable("L"))), Link(Field("O1"), FINAL), ], - {Field("L"): INTEGER, Field("O1"): OPAQUE}, + {Field("L"): models.integer(), Field("O1"): OPAQUE}, ) @@ -1224,7 +1204,7 @@ def test_opaque_valid_byte_aligned_dynamic_cond() -> None: Link(Field("O1"), Field("O2"), size=Number(128)), Link(Field("O2"), FINAL), ], - {Field("L"): INTEGER, Field("O1"): OPAQUE, Field("O2"): OPAQUE}, + {Field("L"): models.integer(), Field("O1"): OPAQUE, Field("O2"): OPAQUE}, ) @@ -1258,7 +1238,7 @@ def test_opaque_size_not_multiple_of_8_dynamic() -> None: Link(Field("L"), o, size=Variable("L", location=Location((44, 3)))), Link(o, FINAL), ], - {Field("L"): INTEGER, o: OPAQUE}, + {Field("L"): models.integer(), o: OPAQUE}, ) @@ -1275,7 +1255,7 @@ def test_opaque_size_valid_multiple_of_8_dynamic_cond() -> None: ), Link(Field("O"), FINAL), ], - {Field("L"): INTEGER, Field("O"): OPAQUE}, + {Field("L"): models.integer(), Field("O"): OPAQUE}, ) @@ -1301,9 +1281,9 @@ def test_prefixed_message_attribute() -> None: Link(Field("F4"), FINAL), ], { - Field("F1"): deepcopy(INTEGER), - Field("F2"): deepcopy(INTEGER), - Field("F3"): deepcopy(INTEGER), + Field("F1"): deepcopy(models.integer()), + Field("F2"): deepcopy(models.integer()), + Field("F3"): deepcopy(models.integer()), Field("F4"): OPAQUE, }, ).prefixed("X_") @@ -1329,9 +1309,9 @@ def test_prefixed_message_attribute() -> None: Link(Field("X_F4"), FINAL), ], { - Field("X_F1"): deepcopy(INTEGER), - Field("X_F2"): deepcopy(INTEGER), - Field("X_F3"): deepcopy(INTEGER), + Field("X_F1"): deepcopy(models.integer()), + Field("X_F2"): deepcopy(models.integer()), + Field("X_F3"): deepcopy(models.integer()), Field("X_F4"): OPAQUE, }, ) @@ -1347,8 +1327,8 @@ def test_exclusive_valid() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } Message("P::M", structure, types) @@ -1361,8 +1341,8 @@ def test_exclusive_enum_valid() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): ENUMERATION, - Field("F2"): INTEGER, + Field("F1"): models.enumeration(), + Field("F2"): models.integer(), } Message("P::M", structure, types) @@ -1375,7 +1355,7 @@ def test_exclusive_prefixed_enum_valid() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): ENUMERATION, + Field("F1"): models.enumeration(), Field("F2"): Enumeration( "P2::Enumeration", [("One", Number(2)), ("Two", Number(1))], @@ -1398,8 +1378,8 @@ def test_exclusive_conflict() -> None: Link(Field("F2"), FINAL), ] types = { - Field(ID("F1", Location((8, 4)))): INTEGER, - Field("F2"): INTEGER, + Field(ID("F1", Location((8, 4)))): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1429,9 +1409,9 @@ def test_exclusive_with_size_valid() -> None: Link(Field("F3"), FINAL), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, - Field("F3"): INTEGER, + Field("F3"): models.integer(), } Message("P::M", structure, types) @@ -1453,9 +1433,9 @@ def test_exclusive_with_size_valid_and_not() -> None: Link(Field("F3"), FINAL), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, - Field("F3"): INTEGER, + Field("F3"): models.integer(), } Message("P::M", structure, types) @@ -1469,7 +1449,7 @@ def test_exclusive_with_size_invalid() -> None: ] types = { Field(ID("F1", Location((98, 10)))): OPAQUE, - Field("F2"): INTEGER, + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1506,9 +1486,9 @@ def test_no_valid_path() -> None: Link(f3, FINAL, condition=LessEqual(Variable("F1"), Number(80), Location((23, 5)))), ] types = { - f1: INTEGER, - f2: INTEGER, - f3: INTEGER, + f1: models.integer(), + f2: models.integer(), + f3: models.integer(), } assert_message_model_error( structure, @@ -1559,7 +1539,7 @@ def test_invalid_path_1(monkeypatch: pytest.MonkeyPatch) -> None: Link(f1, FINAL, condition=Equal(Number(1), Number(2), Location((5, 10)))), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), } monkeypatch.setattr(Message, "_prove_reachability", lambda _: None) assert_message_model_error( @@ -1580,8 +1560,8 @@ def test_invalid_path_2(monkeypatch: pytest.MonkeyPatch) -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } monkeypatch.setattr(Message, "_prove_reachability", lambda _: None) assert_message_model_error( @@ -1634,8 +1614,8 @@ def test_invalid_type_condition_range_low() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1739,8 +1719,8 @@ def test_tlv_valid_enum() -> None: Link(Field("V"), FINAL), ] types = { - Field("L"): INTEGER, - Field("T"): ENUMERATION, + Field("L"): models.integer(), + Field("T"): models.enumeration(), Field("V"): OPAQUE, } Message("P::M", structure, types) @@ -1753,8 +1733,8 @@ def test_invalid_fixed_size_field_with_size() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1770,8 +1750,8 @@ def test_valid_first() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } Message("P::M", structure, types) @@ -1784,9 +1764,9 @@ def test_invalid_first_is_not_previous_field() -> None: Link(Field("F3"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), } assert_message_model_error( structure, @@ -1806,8 +1786,8 @@ def test_invalid_first_is_expression() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1823,8 +1803,8 @@ def test_invalid_first_is_last() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -1841,9 +1821,9 @@ def test_invalid_first_forward_reference() -> None: Link(Field("F3"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), } assert_message_model_error( structure, @@ -1863,7 +1843,7 @@ def test_valid_size_reference() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, } Message("P::M", structure, types) @@ -1876,7 +1856,7 @@ def test_invalid_size_forward_reference() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, } assert_message_model_error( @@ -1894,7 +1874,7 @@ def test_invalid_negative_field_size_1() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, } assert_message_model_error( @@ -1919,7 +1899,7 @@ def test_invalid_negative_field_size_2() -> None: Link(o, FINAL), ] types = { - Field("L"): INTEGER, + Field("L"): models.integer(), o: OPAQUE, } assert_message_model_error( @@ -1944,7 +1924,7 @@ def test_invalid_negative_field_size_3() -> None: ), ] types = { - Field("F1"): INTEGER, + Field("F1"): models.integer(), Field("F2"): OPAQUE, } assert_message_model_error( @@ -1978,8 +1958,8 @@ def test_sequence_no_size() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): SEQUENCE_INTEGER_VECTOR, - Field("F2"): SEQUENCE_INTEGER_VECTOR, + Field("F1"): models.sequence_integer_vector(), + Field("F2"): models.sequence_integer_vector(), } assert_message_model_error( structure, @@ -2023,7 +2003,7 @@ def test_field_coverage_1(monkeypatch: pytest.MonkeyPatch) -> None: Link(Field("F2"), FINAL), ] - types = {Field("F1"): INTEGER, Field("F2"): INTEGER} + types = {Field("F1"): models.integer(), Field("F2"): models.integer()} monkeypatch.setattr(Message, "_verify_expressions", lambda _: None) assert_message_model_error( structure, @@ -2052,10 +2032,10 @@ def test_field_coverage_2(monkeypatch: pytest.MonkeyPatch) -> None: ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, - Field("F4"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), + Field("F4"): models.integer(), } monkeypatch.setattr(Message, "_verify_expressions", lambda _: None) assert_message_model_error( @@ -2078,7 +2058,7 @@ def test_field_after_message_start(monkeypatch: pytest.MonkeyPatch) -> None: Link(Field("F2"), FINAL), ] - types = {Field("F1"): INTEGER, Field("F2"): INTEGER} + types = {Field("F1"): models.integer(), Field("F2"): models.integer()} monkeypatch.setattr(Message, "_verify_expressions", lambda _: None) assert_message_model_error( structure, @@ -2101,7 +2081,7 @@ def test_field_after_message_start(monkeypatch: pytest.MonkeyPatch) -> None: Equal(Add(Sub(Last("Message"), First("Message")), Number(1)), Number(64)), ], ) -@pytest.mark.parametrize("type_", [OPAQUE, SEQUENCE_INTEGER_VECTOR]) +@pytest.mark.parametrize("type_", [OPAQUE, models.sequence_integer_vector()]) def test_message_with_implicit_size_single_field(size: Expr, condition: Expr, type_: Type) -> None: x = Field("X") @@ -2125,7 +2105,7 @@ def test_message_with_implicit_size_single_field(size: Expr, condition: Expr, ty Equal(Add(Sub(Last("Message"), First("Message")), Number(1)), Number(64)), ], ) -@pytest.mark.parametrize("type_", [OPAQUE, SEQUENCE_INTEGER_VECTOR]) +@pytest.mark.parametrize("type_", [OPAQUE, models.sequence_integer_vector()]) def test_message_with_implicit_size_multiple_fields( size: Expr, condition: Expr, @@ -2189,10 +2169,10 @@ def test_no_path_to_final() -> None: ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, - Field("F4"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), + Field("F4"): models.integer(), } assert_message_model_error( structure, @@ -2213,12 +2193,12 @@ def test_no_path_to_final_transitive() -> None: ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, - Field("F4"): INTEGER, - Field("F5"): INTEGER, - Field("F6"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), + Field("F4"): models.integer(), + Field("F5"): models.integer(), + Field("F6"): models.integer(), } assert_message_model_error( structure, @@ -2238,8 +2218,8 @@ def test_conditionally_unreachable_field_mod_first() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2277,8 +2257,8 @@ def test_conditionally_unreachable_field_mod_last() -> None: Link(Field("F2"), FINAL, Equal(Last("F1"), Last("Message"))), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2311,8 +2291,8 @@ def test_conditionally_unreachable_field_range_first() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2350,8 +2330,8 @@ def test_conditionally_unreachable_field_range_last() -> None: Link(Field("F2"), FINAL, Equal(Last("F1"), Last("Message"))), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2384,8 +2364,8 @@ def test_conditionally_unreachable_field_enum_first() -> None: Link(Field("F2"), FINAL), ] types = { - Field("F1"): ENUMERATION, - Field("F2"): ENUMERATION, + Field("F1"): models.enumeration(), + Field("F2"): models.enumeration(), } assert_message_model_error( structure, @@ -2423,8 +2403,8 @@ def test_conditionally_unreachable_field_enum_last() -> None: Link(Field("F2"), FINAL, Equal(Last("F1"), Last("Message"))), ] types = { - Field("F1"): ENUMERATION, - Field("F2"): ENUMERATION, + Field("F1"): models.enumeration(), + Field("F2"): models.enumeration(), } assert_message_model_error( structure, @@ -2458,8 +2438,8 @@ def test_conditionally_unreachable_field_outgoing() -> None: Link(Field("F2"), FINAL, Greater(Variable("F1"), Number(32))), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2500,9 +2480,9 @@ def test_conditionally_unreachable_field_outgoing_multi() -> None: Link(Field("F3"), FINAL), ] types = { - Field("F1"): INTEGER, - Field(ID("F2", Location((90, 12)))): INTEGER, - Field("F3"): INTEGER, + Field("F1"): models.integer(), + Field(ID("F2", Location((90, 12)))): models.integer(), + Field("F3"): models.integer(), } assert_message_model_error( structure, @@ -2533,8 +2513,8 @@ def test_size_aspect_final() -> None: Link(Field("F2"), FINAL, size=Number(100, location=Location((4, 12)))), ] types = { - Field("F1"): INTEGER, - Field("F2"): INTEGER, + Field("F1"): models.integer(), + Field("F2"): models.integer(), } assert_message_model_error( structure, @@ -2756,12 +2736,12 @@ def test_no_contradiction_multi() -> None: Link(Field("F5"), FINAL), ] types = { - Field("F0"): INTEGER, - Field("F1"): INTEGER, - Field("F2"): INTEGER, - Field("F3"): INTEGER, - Field("F4"): INTEGER, - Field("F5"): INTEGER, + Field("F0"): models.integer(), + Field("F1"): models.integer(), + Field("F2"): models.integer(), + Field("F3"): models.integer(), + Field("F4"): models.integer(), + Field("F5"): models.integer(), } Message("P::M", structure, types) @@ -2797,9 +2777,9 @@ def test_discontiguous_optional_fields() -> None: ), ] types = { - Field("Flag"): INTEGER, - Field("Opt1"): INTEGER, - Field("Data"): INTEGER, + Field("Flag"): models.integer(), + Field("Opt1"): models.integer(), + Field("Data"): models.integer(), Field("Opt2"): OPAQUE, } Message("P::M", structure, types) @@ -2844,7 +2824,7 @@ def test_checksum(checksums: abc.Mapping[ID, abc.Sequence[Expr]], condition: Exp Link(f2, f3), Link(f3, FINAL, condition), ] - types = {f1: INTEGER, f2: INTEGER, f3: INTEGER} + types = {f1: models.integer(), f2: models.integer(), f3: models.integer()} message = Message("P::M", structure, types, checksums=checksums) assert message.checksums == checksums @@ -2902,7 +2882,7 @@ def test_checksum_error( Link(f2, f3), Link(f3, FINAL, condition), ] - types = {f1: INTEGER, f2: INTEGER, f3: INTEGER} + types = {f1: models.integer(), f2: models.integer(), f3: models.integer()} assert_message_model_error(structure, types, error, checksums=checksums) @@ -2916,7 +2896,7 @@ def test_field_size() -> None: Link(Field("B"), Field("C")), Link(Field("C"), FINAL), ], - {Field("A"): INTEGER, Field("B"): OPAQUE, Field("C"): OPAQUE}, + {Field("A"): models.integer(), Field("B"): OPAQUE, Field("C"): OPAQUE}, location=Location((30, 10)), ) @@ -2941,38 +2921,38 @@ def test_copy() -> None: message = Message( "P::M", [Link(INITIAL, Field("F")), Link(Field("F"), FINAL)], - {Field("F"): INTEGER}, + {Field("F"): models.integer()}, ) assert_equal( message.copy(identifier="A::B"), Message( "A::B", [Link(INITIAL, Field("F")), Link(Field("F"), FINAL)], - {Field("F"): INTEGER}, + {Field("F"): models.integer()}, ), ) assert_equal( message.copy( structure=[Link(INITIAL, Field("C")), Link(Field("C"), FINAL)], - types={Field("C"): INTEGER}, + types={Field("C"): models.integer()}, byte_order={Field("C"): ByteOrder.HIGH_ORDER_FIRST}, ), Message( "P::M", [Link(INITIAL, Field("C")), Link(Field("C"), FINAL)], - {Field("C"): INTEGER}, + {Field("C"): models.integer()}, ), ) assert_equal( message.copy( structure=[Link(INITIAL, Field("C")), Link(Field("C"), FINAL)], - types={Field("C"): INTEGER}, + types={Field("C"): models.integer()}, byte_order={Field("C"): ByteOrder.LOW_ORDER_FIRST}, ), Message( "P::M", [Link(INITIAL, Field("C")), Link(Field("C"), FINAL)], - {Field("C"): INTEGER}, + {Field("C"): models.integer()}, byte_order=ByteOrder.LOW_ORDER_FIRST, ), ) @@ -2982,7 +2962,7 @@ def test_proven() -> None: message = Message( "P::M", [Link(INITIAL, Field("F")), Link(Field("F"), FINAL)], - {Field("F"): INTEGER}, + {Field("F"): models.integer()}, ) assert message.proven() == message @@ -3013,71 +2993,73 @@ def test_is_possibly_empty() -> None: def test_has_fixed_size() -> None: - assert NULL_MESSAGE.has_fixed_size - assert FIXED_SIZE_MESSAGE.has_fixed_size - assert not TLV_MESSAGE.has_fixed_size - assert not ETHERNET_FRAME.has_fixed_size - assert not SEQUENCE_MESSAGE.has_fixed_size + assert models.null_message().has_fixed_size + assert models.fixed_size_message().has_fixed_size + assert not models.tlv_message().has_fixed_size + assert not models.ethernet_frame().has_fixed_size + assert not models.sequence_message().has_fixed_size def test_has_implicit_size() -> None: - assert not NULL_MESSAGE.has_implicit_size - assert not FIXED_SIZE_MESSAGE.has_implicit_size - assert not TLV_MESSAGE.has_implicit_size - assert ETHERNET_FRAME.has_implicit_size - assert not SEQUENCE_MESSAGE.has_implicit_size + assert not models.null_message().has_implicit_size + assert not models.fixed_size_message().has_implicit_size + assert not models.tlv_message().has_implicit_size + assert models.ethernet_frame().has_implicit_size + assert not models.sequence_message().has_implicit_size def test_is_definite() -> None: - assert NULL_MESSAGE.is_definite - assert FIXED_SIZE_SIMPLE_MESSAGE.is_definite - assert DEFINITE_MESSAGE.is_definite - assert not FIXED_SIZE_MESSAGE.is_definite - assert not TLV_MESSAGE.is_definite - assert not ETHERNET_FRAME.is_definite - assert not SEQUENCE_MESSAGE.is_definite + assert models.null_message().is_definite + assert models.fixed_size_simple_message().is_definite + assert models.definite_message().is_definite + assert not models.fixed_size_message().is_definite + assert not models.tlv_message().is_definite + assert not models.ethernet_frame().is_definite + assert not models.sequence_message().is_definite def test_size() -> None: - assert NULL_MESSAGE.size() == Number(0) - assert FIXED_SIZE_MESSAGE.size() == Number(200) - assert DEFINITE_MESSAGE.size() == Add(Mul(Variable("Length"), Number(8)), Number(16)) - assert DEFINITE_MESSAGE.size({Field("Length"): Number(8)}) == Number(80) - assert TLV_MESSAGE.size({Field("Tag"): Literal("TLV::Msg_Error")}) == Number(8) - assert TLV_MESSAGE.size( + assert models.null_message().size() == Number(0) + assert models.fixed_size_message().size() == Number(200) + assert models.definite_message().size() == Add(Mul(Variable("Length"), Number(8)), Number(16)) + assert models.definite_message().size({Field("Length"): Number(8)}) == Number(80) + assert models.tlv_message().size({Field("Tag"): Literal("TLV::Msg_Error")}) == Number(8) + assert models.tlv_message().size( { Field("Tag"): Literal("TLV::Msg_Data"), Field("Length"): Number(4), Field("Value"): Aggregate(*[Number(0)] * 4), }, ) == Number(56) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Literal("TLV::Msg_Data"), Field("Length"): Div(Add(Size("Tag"), Size("TLV::Length")), Number(8)), Field("Value"): Aggregate(*[Number(0)] * 3), }, ) == Number(48) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Literal("TLV::Msg_Data"), Field("Length"): Add(Div(Size("X"), Number(8)), Variable("Y")), Field("Value"): Variable("Z"), }, ) == Add(Size("Z"), Number(24)) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Literal("TLV::Msg_Data"), Field("Length"): Div(Size("Msg_Data"), Number(8)), Field("Value"): Opaque("Msg_Data"), }, ) == Add(Mul(Div(Size("Msg_Data"), Number(8)), Number(8)), Number(24)) - assert TLV_MESSAGE.size({Field("Tag"): Variable("X"), Field("Length"): Variable("Y")}) == Add( + assert models.tlv_message().size( + {Field("Tag"): Variable("X"), Field("Length"): Variable("Y")}, + ) == Add( Mul(Variable("Y"), Number(8)), Number(24), ) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3086,7 +3068,7 @@ def test_size() -> None: Field("Payload"): Aggregate(*[Number(0)] * 46), }, ) == Number(480) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3097,7 +3079,7 @@ def test_size() -> None: Field("Payload"): Aggregate(*[Number(0)] * 46), }, ) == Number(512) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3106,7 +3088,7 @@ def test_size() -> None: Field("Payload"): Aggregate(*[Number(0)] * 46), }, ) == Number(480) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3139,7 +3121,7 @@ def test_size() -> None: ], { Field("Has_Data"): BOOLEAN, - Field("Length"): TLV_LENGTH, + Field("Length"): models.tlv_length(), Field("Data"): OPAQUE, }, ) @@ -3196,8 +3178,8 @@ def test_size() -> None: ), ], { - Field("A"): TLV_LENGTH, - Field("B"): TLV_LENGTH, + Field("A"): models.tlv_length(), + Field("B"): models.tlv_length(), }, ) assert optional_overlayed_field.size( @@ -3249,9 +3231,9 @@ def test_size() -> None: ), ], { - Field("A"): TLV_LENGTH, - Field("B"): TLV_LENGTH, - Field("C"): TLV_LENGTH, + Field("A"): models.tlv_length(), + Field("B"): models.tlv_length(), + Field("C"): models.tlv_length(), }, ) assert path_dependent_fields.size({Field("A"): Variable("X"), Field("B"): Number(0)}) == Number( @@ -3267,21 +3249,21 @@ def test_size() -> None: def test_size_subpath() -> None: - assert NULL_MESSAGE.size({}, subpath=True) == Number(0) + assert models.null_message().size({}, subpath=True) == Number(0) - assert FIXED_SIZE_MESSAGE.size( + assert models.fixed_size_message().size( { Field("Message_Type"): Literal("Universal::MT_Data"), }, subpath=True, ) == Number(8) - assert FIXED_SIZE_MESSAGE.size( + assert models.fixed_size_message().size( { Field("Data"): Aggregate(*[Number(0)] * 4), }, subpath=True, ) == Number(32) - assert FIXED_SIZE_MESSAGE.size( + assert models.fixed_size_message().size( { Field("Message_Type"): Literal("Universal::MT_Data"), Field("Data"): Aggregate(*[Number(0)] * 4), @@ -3289,19 +3271,19 @@ def test_size_subpath() -> None: subpath=True, ) == Number(40) - assert DEFINITE_MESSAGE.size( + assert models.definite_message().size( { Field("Length"): Number(8), }, subpath=True, ) == Number(16) - assert DEFINITE_MESSAGE.size( + assert models.definite_message().size( { Field("Data"): Variable("D"), }, subpath=True, ) == Size("D") - assert DEFINITE_MESSAGE.size( + assert models.definite_message().size( { Field("Length"): Variable("L"), Field("Data"): Variable("D"), @@ -3309,13 +3291,13 @@ def test_size_subpath() -> None: subpath=True, ) == Add(Size("D"), Number(16)) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Variable("T"), }, subpath=True, ) == Number(8) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Variable("T"), Field("Length"): Variable("L"), @@ -3323,28 +3305,28 @@ def test_size_subpath() -> None: }, subpath=True, ) == Number(56) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Tag"): Variable("T"), Field("Length"): Variable("L"), }, subpath=True, ) == Number(24) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Length"): Div(Add(Size("Tag"), Size("TLV::Length")), Number(8)), Field("Value"): Aggregate(*[Number(0)] * 3), }, subpath=True, ) == Number(40) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Length"): Add(Div(Size("X"), Number(8)), Variable("Y")), Field("Value"): Variable("Z"), }, subpath=True, ) == Add(Size("Z"), Number(16)) - assert TLV_MESSAGE.size( + assert models.tlv_message().size( { Field("Length"): Div(Size("Msg_Data"), Number(8)), Field("Value"): Opaque("Msg_Data"), @@ -3352,7 +3334,7 @@ def test_size_subpath() -> None: subpath=True, ) == Add(Mul(Div(Size("Msg_Data"), Number(8)), Number(8)), Number(16)) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3362,7 +3344,7 @@ def test_size_subpath() -> None: }, subpath=True, ) == Number(480) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3374,7 +3356,7 @@ def test_size_subpath() -> None: }, subpath=True, ) == Number(512) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3384,7 +3366,7 @@ def test_size_subpath() -> None: }, subpath=True, ) == Number(480) - assert ETHERNET_FRAME.size( + assert models.ethernet_frame().size( { Field("Destination"): Number(0), Field("Source"): Number(0), @@ -3418,7 +3400,7 @@ def test_size_subpath() -> None: ], { Field("Has_Data"): BOOLEAN, - Field("Length"): TLV_LENGTH, + Field("Length"): models.tlv_length(), Field("Data"): OPAQUE, }, ) @@ -3488,8 +3470,8 @@ def test_size_subpath() -> None: ), ], { - Field("A"): TLV_LENGTH, - Field("B"): TLV_LENGTH, + Field("A"): models.tlv_length(), + Field("B"): models.tlv_length(), }, ) assert optional_overlayed_field.size( @@ -3546,9 +3528,9 @@ def test_size_subpath() -> None: ), ], { - Field("A"): TLV_LENGTH, - Field("B"): TLV_LENGTH, - Field("C"): TLV_LENGTH, + Field("A"): models.tlv_length(), + Field("B"): models.tlv_length(), + Field("C"): models.tlv_length(), }, ) assert path_dependent_fields.size( @@ -3579,7 +3561,7 @@ def test_size_subpath_error() -> None: r"$" ), ): - TLV_MESSAGE.size( + models.tlv_message().size( { Field("Tag"): Variable("T"), Field("Value"): Variable("V"), @@ -3589,10 +3571,10 @@ def test_size_subpath_error() -> None: def test_max_size() -> None: - assert NULL_MESSAGE.max_size() == Number(0) - assert FIXED_SIZE_MESSAGE.max_size() == Number(8 + 3 * 64) - assert TLV_MESSAGE.max_size() == Number(8 + 16 + (2**16 - 1) * 8) - assert SEQUENCE_MESSAGE.max_size() == Number(8 + (2**8 - 1) * 8 + 2 * 16) + assert models.null_message().max_size() == Number(0) + assert models.fixed_size_message().max_size() == Number(8 + 3 * 64) + assert models.tlv_message().max_size() == Number(8 + 16 + (2**16 - 1) * 8) + assert models.sequence_message().max_size() == Number(8 + (2**8 - 1) * 8 + 2 * 16) def test_max_size_error() -> None: @@ -3600,18 +3582,18 @@ def test_max_size_error() -> None: RecordFluxError, match=r"^model: error: unable to calculate maximum size of message with implicit size$", ): - ETHERNET_FRAME.max_size() + models.ethernet_frame().max_size() def test_max_field_sizes() -> None: - assert NULL_MESSAGE.max_field_sizes() == {} - assert FIXED_SIZE_MESSAGE.max_field_sizes() == { + assert models.null_message().max_field_sizes() == {} + assert models.fixed_size_message().max_field_sizes() == { Field("Message_Type"): Number(8), Field("Data"): Number(64), Field("Values"): Number(64), Field("Options"): Number(64), } - assert TLV_MESSAGE.max_field_sizes() == { + assert models.tlv_message().max_field_sizes() == { Field("Tag"): Number(8), Field("Length"): Number(16), Field("Value"): Number((2**16 - 1) * 8), @@ -3625,7 +3607,7 @@ def test_max_field_sizes_error() -> None: r"^model: error: unable to calculate maximum field sizes of message with implicit size$" ), ): - ETHERNET_FRAME.max_field_sizes() + models.ethernet_frame().max_field_sizes() def test_derived_message_incorrect_base_name() -> None: @@ -3642,7 +3624,7 @@ def test_derived_message_proven() -> None: Message( "X::M", [Link(INITIAL, Field("F")), Link(Field("F"), FINAL)], - {Field("F"): INTEGER}, + {Field("F"): models.integer()}, ), ) assert message.proven() == message @@ -3671,9 +3653,9 @@ def test_prefixed_message() -> None: Link(Field("F4"), FINAL), ], { - Field("F1"): deepcopy(INTEGER), + Field("F1"): deepcopy(models.integer()), Field("F2"): deepcopy(BOOLEAN), - Field("F3"): deepcopy(INTEGER), + Field("F3"): deepcopy(models.integer()), Field("F4"): OPAQUE, }, ).prefixed("X_"), @@ -3698,9 +3680,9 @@ def test_prefixed_message() -> None: Link(Field("X_F4"), FINAL), ], { - Field("X_F1"): deepcopy(INTEGER), + Field("X_F1"): deepcopy(models.integer()), Field("X_F2"): deepcopy(BOOLEAN), - Field("X_F3"): deepcopy(INTEGER), + Field("X_F3"): deepcopy(models.integer()), Field("X_F4"): OPAQUE, }, ), @@ -3730,9 +3712,9 @@ def test_merge_message_simple() -> None: ], { Field("NR_F1"): OPAQUE, - Field("NR_F2"): deepcopy(INTEGER), - Field("NR_F3"): deepcopy(ENUMERATION), - Field("NR_F4"): deepcopy(INTEGER), + Field("NR_F2"): deepcopy(models.integer()), + Field("NR_F3"): deepcopy(models.enumeration()), + Field("NR_F4"): deepcopy(models.integer()), }, ) @@ -3793,15 +3775,15 @@ def test_merge_message_complex() -> None: ), ], { - Field("F1"): deepcopy(INTEGER), - Field("F2"): deepcopy(INTEGER), - Field("F3"): deepcopy(INTEGER), + Field("F1"): deepcopy(models.integer()), + Field("F2"): deepcopy(models.integer()), + Field("F3"): deepcopy(models.integer()), Field("NR_F1"): OPAQUE, - Field("NR_F2"): deepcopy(INTEGER), - Field("NR_F3"): deepcopy(ENUMERATION), - Field("NR_F4"): deepcopy(INTEGER), - Field("F5"): deepcopy(INTEGER), - Field("F6"): deepcopy(INTEGER), + Field("NR_F2"): deepcopy(models.integer()), + Field("NR_F3"): deepcopy(models.enumeration()), + Field("NR_F4"): deepcopy(models.integer()), + Field("F5"): deepcopy(models.integer()), + Field("F6"): deepcopy(models.integer()), }, ), ) @@ -3852,13 +3834,13 @@ def test_merge_message_recursive() -> None: ], { Field("SR_NR_F1"): OPAQUE, - Field("SR_NR_F2"): deepcopy(INTEGER), - Field("SR_NR_F3"): deepcopy(ENUMERATION), - Field("SR_NR_F4"): deepcopy(INTEGER), + Field("SR_NR_F2"): deepcopy(models.integer()), + Field("SR_NR_F3"): deepcopy(models.enumeration()), + Field("SR_NR_F4"): deepcopy(models.integer()), Field("NR_F1"): OPAQUE, - Field("NR_F2"): deepcopy(INTEGER), - Field("NR_F3"): deepcopy(ENUMERATION), - Field("NR_F4"): deepcopy(INTEGER), + Field("NR_F2"): deepcopy(models.integer()), + Field("NR_F3"): deepcopy(models.enumeration()), + Field("NR_F4"): deepcopy(models.integer()), }, ), ) @@ -3890,9 +3872,9 @@ def test_merge_message_simple_derived() -> None: ], { Field("NR_F1"): OPAQUE, - Field("NR_F2"): deepcopy(INTEGER), - Field("NR_F3"): deepcopy(ENUMERATION), - Field("NR_F4"): deepcopy(INTEGER), + Field("NR_F2"): deepcopy(models.integer()), + Field("NR_F3"): deepcopy(models.enumeration()), + Field("NR_F4"): deepcopy(models.integer()), }, byte_order=ByteOrder.HIGH_ORDER_FIRST, ), @@ -3903,7 +3885,7 @@ def test_merge_byte_order() -> None: inner_msg = UnprovenMessage( "P::Merge_Test_Byte_Order", [Link(INITIAL, Field("F1")), Link(Field("F1"), Field("F2")), Link(Field("F2"), FINAL)], - {Field("F1"): INTEGER, Field("F2"): ENUMERATION}, + {Field("F1"): models.integer(), Field("F2"): models.enumeration()}, byte_order=ByteOrder.LOW_ORDER_FIRST, ) outer_msg = UnprovenMessage( @@ -3923,7 +3905,7 @@ def test_merge_byte_order() -> None: Link(Field("NR_F1"), Field("NR_F2")), Link(Field("NR_F2"), FINAL), ], - {Field("NR_F1"): INTEGER, Field("NR_F2"): ENUMERATION}, + {Field("NR_F1"): models.integer(), Field("NR_F2"): models.enumeration()}, byte_order={ Field("NR_F1"): ByteOrder.LOW_ORDER_FIRST, Field("NR_F2"): ByteOrder.LOW_ORDER_FIRST, @@ -4021,7 +4003,7 @@ def test_merge_message_error_name_conflict() -> None: m2 = UnprovenMessage( "P::M2", [Link(INITIAL, m2_f2), Link(m2_f2, FINAL)], - {m2_f2: INTEGER}, + {m2_f2: models.integer()}, location=Location((15, 3)), ) @@ -4031,7 +4013,7 @@ def test_merge_message_error_name_conflict() -> None: m1 = UnprovenMessage( "P::M1", [Link(INITIAL, m1_f1), Link(m1_f1, m1_f1_f2), Link(m1_f1_f2, FINAL)], - {m1_f1: m2, m1_f1_f2: INTEGER}, + {m1_f1: m2, m1_f1_f2: models.integer()}, location=Location((2, 9)), ) @@ -4061,9 +4043,9 @@ def test_merge_message_parameterized() -> None: Link(Field("PNR_F2"), FINAL), ], { - Field("P2"): INTEGER, - Field("PNR_F1"): INTEGER, - Field("PNR_F2"): INTEGER, + Field("P2"): models.integer(), + Field("PNR_F1"): models.integer(), + Field("PNR_F2"): models.integer(), }, ).proven(), ) @@ -4089,7 +4071,7 @@ def test_merge_message_with_message_last_attribute() -> None: ), Link(Field("I2"), FINAL), ], - {Field("I1"): INTEGER, Field("I2"): OPAQUE}, + {Field("I1"): models.integer(), Field("I2"): OPAQUE}, ) inner.error.propagate() @@ -4102,7 +4084,7 @@ def test_merge_message_with_message_last_attribute() -> None: Link(Field("O1"), Field("O2")), Link(Field("O2"), FINAL), ], - {Field("O1"): INTEGER, Field("O2"): inner}, + {Field("O1"): models.integer(), Field("O2"): inner}, ) .merged() .proven() @@ -4130,7 +4112,11 @@ def test_merge_message_with_message_last_attribute() -> None: ), Link(Field("O2_I2"), FINAL), ], - {Field("O1"): INTEGER, Field("O2_I1"): INTEGER, Field("O2_I2"): OPAQUE}, + { + Field("O1"): models.integer(), + Field("O2_I1"): models.integer(), + Field("O2_I2"): OPAQUE, + }, ), ) o1 = Field(ID("O1", location=Location((2, 10)))) @@ -4153,7 +4139,7 @@ def test_merge_message_with_message_last_attribute() -> None: Link(o1, Field("O2")), Link(Field("O2"), FINAL), ], - {o1: inner, Field("O2"): INTEGER}, + {o1: inner, Field("O2"): models.integer()}, ) .merged() .proven() @@ -4196,9 +4182,9 @@ def test_merge_message_with_message_size_attribute() -> None: Link(Field("B"), FINAL), ], { - Field("O1"): INTEGER, - Field("O2"): INTEGER, - Field("O3"): INTEGER, + Field("O1"): models.integer(), + Field("O2"): models.integer(), + Field("O3"): models.integer(), Field("A"): inner, Field("B"): inner, }, @@ -4236,9 +4222,9 @@ def test_merge_message_with_message_size_attribute() -> None: ), ], { - Field("O1"): INTEGER, - Field("O2"): INTEGER, - Field("O3"): INTEGER, + Field("O1"): models.integer(), + Field("O2"): models.integer(), + Field("O3"): models.integer(), Field("A_I"): OPAQUE, Field("B_I"): OPAQUE, }, @@ -4312,7 +4298,7 @@ def test_paths() -> None: Link(Field("L"), Field("O"), condition=LessEqual(Variable("L"), Number(100))), Link(Field("O"), FINAL), ], - {Field("L"): INTEGER, Field("O"): INTEGER}, + {Field("L"): models.integer(), Field("O"): models.integer()}, ) assert message.paths(Field("O")) == { ( @@ -4327,7 +4313,7 @@ def test_paths() -> None: def test_normalization() -> None: - assert TLV_MESSAGE.structure == sorted( + assert models.tlv_message().structure == sorted( [ Link(INITIAL, Field("Tag")), Link(Field("Tag"), Field("Length"), Equal(Variable("Tag"), Variable("TLV::Msg_Data"))), @@ -4339,11 +4325,11 @@ def test_normalization() -> None: def test_set_refinements() -> None: - message = MESSAGE.copy() + message = models.message().copy() assert message.type_.refinements == [] - message.set_refinements([REFINEMENT]) + message.set_refinements([models.refinement()]) assert message.type_.refinements == [ rty.Refinement( @@ -4362,29 +4348,32 @@ def test_set_refinements() -> None: def test_set_refinements_error() -> None: - message = MESSAGE.copy() + message = models.message().copy() with pytest.raises( FatalError, match=r"^model: error: setting refinements for different message$", ): message.set_refinements( - [REFINEMENT, Refinement("In_Message", TLV_MESSAGE, Field("Value"), MESSAGE)], + [ + models.refinement(), + Refinement("In_Message", models.tlv_message(), Field("Value"), models.message()), + ], ) def test_message_dependencies() -> None: - assert TLV_MESSAGE.dependencies == [ - TLV_TAG, - TLV_LENGTH, + assert models.tlv_message().dependencies == [ + models.tlv_tag(), + models.tlv_length(), OPAQUE, - TLV_MESSAGE, + models.tlv_message(), ] - assert SEQUENCE_MESSAGES_MESSAGE.dependencies == [ - SEQUENCE_LENGTH, + assert models.sequence_messages_message().dependencies == [ + models.sequence_length(), OPAQUE, - SEQUENCE_INNER_MESSAGE, - SEQUENCE_INNER_MESSAGES, - SEQUENCE_MESSAGES_MESSAGE, + models.sequence_inner_message(), + models.sequence_inner_messages(), + models.sequence_messages_message(), ] @@ -4404,9 +4393,9 @@ def test_message_str() -> None: ], { Field("A"): BOOLEAN, - Field("L"): INTEGER, - Field("O"): INTEGER, - Field("P"): INTEGER, + Field("L"): models.integer(), + Field("O"): models.integer(), + Field("P"): models.integer(), }, ) assert_equal( @@ -4430,23 +4419,23 @@ def test_message_str() -> None: def test_refinement_dependencies() -> None: - assert UNIVERSAL_REFINEMENT.direct_dependencies == [ - UNIVERSAL_MESSAGE, - UNIVERSAL_OPTION, - UNIVERSAL_REFINEMENT, + assert models.universal_refinement().direct_dependencies == [ + models.universal_message(), + models.universal_option(), + models.universal_refinement(), ] - assert UNIVERSAL_REFINEMENT.dependencies == [ - UNIVERSAL_MESSAGE_TYPE, - UNIVERSAL_LENGTH, + assert models.universal_refinement().dependencies == [ + models.universal_message_type(), + models.universal_length(), OPAQUE, - UNIVERSAL_OPTION_TYPE, - UNIVERSAL_OPTION_TYPES, - UNIVERSAL_OPTION, - UNIVERSAL_OPTIONS, - UNIVERSAL_VALUE, - UNIVERSAL_VALUES, - UNIVERSAL_MESSAGE, - UNIVERSAL_REFINEMENT, + models.universal_option_type(), + models.universal_option_types(), + models.universal_option(), + models.universal_options(), + models.universal_value(), + models.universal_values(), + models.universal_message(), + models.universal_refinement(), ] @@ -4455,13 +4444,18 @@ def test_refinement_invalid_package() -> None: RecordFluxError, match=r'^:22:10: model: error: unexpected format of package name "A::B"$', ): - Refinement(ID("A::B", Location((22, 10))), ETHERNET_FRAME, Field("Payload"), ETHERNET_FRAME) + Refinement( + ID("A::B", Location((22, 10))), + models.ethernet_frame(), + Field("Payload"), + models.ethernet_frame(), + ) def test_refinement_invalid_field_type() -> None: x = Field(ID("X", Location((20, 10)))) - message = Message("P::M", [Link(INITIAL, x), Link(x, FINAL)], {x: INTEGER}) + message = Message("P::M", [Link(INITIAL, x), Link(x, FINAL)], {x: models.integer()}) with pytest.raises( RecordFluxError, @@ -4572,8 +4566,8 @@ def test_boolean_variable_as_condition() -> None: Link(Field("Tag_2"), FINAL), ], { - Field("Tag_1"): INTEGER, - Field("Tag_2"): INTEGER, + Field("Tag_1"): models.integer(), + Field("Tag_2"): models.integer(), Field("Has_Tag"): BOOLEAN, }, ) @@ -4591,7 +4585,7 @@ def test_boolean_variable_as_condition() -> None: Link(Field("Value"), FINAL), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), Field("Value"): OPAQUE, }, ), @@ -4613,7 +4607,7 @@ def test_boolean_variable_as_condition() -> None: Link(Field("Value"), FINAL), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), Field("Value"): OPAQUE, }, ), @@ -4633,7 +4627,7 @@ def test_always_true_refinement(message: Message, condition: Expr) -> None: "In_Message", message, Field(ID("Value", location=Location((10, 20)))), - MESSAGE, + models.message(), condition, ) @@ -4650,7 +4644,7 @@ def test_always_true_refinement(message: Message, condition: Expr) -> None: Link(Field("Value"), FINAL), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), Field("Value"): OPAQUE, }, ), @@ -4672,7 +4666,7 @@ def test_always_true_refinement(message: Message, condition: Expr) -> None: Link(Field("Value"), FINAL), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), Field("Value"): OPAQUE, }, ), @@ -4692,7 +4686,7 @@ def test_always_false_refinement(message: Message, condition: Expr) -> None: "In_Message", message, Field(ID("Value", location=Location((10, 20)))), - MESSAGE, + models.message(), condition, ) @@ -4713,7 +4707,7 @@ def test_always_false_refinement(message: Message, condition: Expr) -> None: ), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), }, ), ( @@ -4731,8 +4725,8 @@ def test_always_false_refinement(message: Message, condition: Expr) -> None: ), ], { - Field("Tag_1"): TLV_TAG, - Field("Tag_2"): TLV_TAG, + Field("Tag_1"): models.tlv_tag(), + Field("Tag_2"): models.tlv_tag(), }, ), ], @@ -4778,7 +4772,7 @@ def test_possibly_always_true_refinement( Link(Field("Value"), FINAL), ], { - Field("Tag"): TLV_TAG, + Field("Tag"): models.tlv_tag(), Field("Value"): OPAQUE, }, ) @@ -4786,12 +4780,13 @@ def test_possibly_always_true_refinement( Equal(Variable("Tag"), Variable("TLV::Msg_Data")), Equal(Variable("Tag"), Variable("TLV::Msg_Error")), ) + inner_message = models.message() monkeypatch.setattr(Proof, "result", ProofResult.UNKNOWN) Refinement( "In_Message", message, Field(ID("Value", location=Location((10, 20)))), - MESSAGE, + inner_message, condition, ).error.propagate() captured = capsys.readouterr() @@ -4828,9 +4823,9 @@ def test_possibly_always_true_refinement( [], [ (Field("F1"), OPAQUE.identifier, []), - (Field("F2"), INTEGER.identifier, []), - (Field("F3"), ENUMERATION.identifier, []), - (Field("F4"), INTEGER.identifier, []), + (Field("F2"), models.integer().identifier, []), + (Field("F3"), models.enumeration().identifier, []), + (Field("F4"), models.integer().identifier, []), ], None, None, @@ -4858,16 +4853,16 @@ def test_possibly_always_true_refinement( ], { Field("F1"): OPAQUE, - Field("F2"): INTEGER, - Field("F3"): ENUMERATION, - Field("F4"): INTEGER, + Field("F2"): models.integer(), + Field("F3"): models.enumeration(), + Field("F4"): models.integer(), }, ), ), ], ) def test_unchecked_message_checked(unchecked: UncheckedMessage, expected: Message) -> None: - assert unchecked.checked([OPAQUE, ENUMERATION, INTEGER]) == expected + assert unchecked.checked([OPAQUE, models.enumeration(), models.integer()]) == expected @pytest.mark.parametrize( @@ -4897,9 +4892,9 @@ def test_unchecked_message_checked(unchecked: UncheckedMessage, expected: Messag [], [ (Field("F1"), OPAQUE.identifier, []), - (Field("F2"), INTEGER.identifier, []), - (Field("F3"), ENUMERATION.identifier, []), - (Field("F4"), INTEGER.identifier, []), + (Field("F2"), models.integer().identifier, []), + (Field("F3"), models.enumeration().identifier, []), + (Field("F4"), models.integer().identifier, []), ], None, None, @@ -4911,4 +4906,4 @@ def test_unchecked_message_checked(unchecked: UncheckedMessage, expected: Messag ) def test_unchecked_message_checked_error(unchecked: UncheckedMessage, expected: str) -> None: with pytest.raises(RecordFluxError, match=expected): - unchecked.checked([OPAQUE, ENUMERATION, INTEGER]) + unchecked.checked([OPAQUE, models.enumeration(), models.integer()]) diff --git a/tests/unit/model/model_test.py b/tests/unit/model/model_test.py index 627bfc377..abbf76bab 100644 --- a/tests/unit/model/model_test.py +++ b/tests/unit/model/model_test.py @@ -55,9 +55,9 @@ def test_name_conflict_types() -> None: def test_conflicting_refinements() -> None: - r1 = copy(models.REFINEMENT) + r1 = copy(models.refinement()) r1.location = Location((10, 20)) - r2 = copy(models.REFINEMENT) + r2 = copy(models.refinement()) r2.location = Location((10, 30)) with pytest.raises( @@ -69,13 +69,13 @@ def test_conflicting_refinements() -> None: r"$" ), ): - Model([models.MESSAGE, r1, r2]) + Model([models.message(), r1, r2]) def test_name_conflict_sessions() -> None: - s1 = copy(models.SESSION) + s1 = copy(models.session()) s1.location = Location((10, 20)) - s2 = copy(models.SESSION) + s2 = copy(models.session()) s2.location = Location((10, 30)) with pytest.raises( @@ -173,18 +173,18 @@ def test_invalid_enumeration_type_builtin_literals() -> None: @pytest.mark.parametrize( ("types", "model"), [ - ([models.TLV_MESSAGE], models.TLV_MODEL), - ([models.TLV_WITH_CHECKSUM_MESSAGE], models.TLV_WITH_CHECKSUM_MODEL), - ([models.ETHERNET_FRAME], models.ETHERNET_MODEL), - ([models.ENUMERATION_MESSAGE], models.ENUMERATION_MODEL), - ([models.UNIVERSAL_REFINEMENT], models.UNIVERSAL_MODEL), + ([models.tlv_message()], models.tlv_model()), + ([models.tlv_with_checksum_message()], models.tlv_with_checksum_model()), + ([models.ethernet_frame()], models.ethernet_model()), + ([models.enumeration_message()], models.enumeration_model()), + ([models.universal_refinement()], models.universal_model()), ( [ - models.SEQUENCE_MESSAGE, - models.SEQUENCE_MESSAGES_MESSAGE, - models.SEQUENCE_SEQUENCE_SIZE_DEFINED_BY_MESSAGE_SIZE, + models.sequence_message(), + models.sequence_messages_message(), + models.sequence_sequence_size_defined_by_message_size(), ], - models.SEQUENCE_MODEL, + models.sequence_model(), ), ], ) diff --git a/tests/unit/model/session_test.py b/tests/unit/model/session_test.py index 88cf4a9d8..fc53d24ed 100644 --- a/tests/unit/model/session_test.py +++ b/tests/unit/model/session_test.py @@ -21,15 +21,7 @@ declaration as decl, statement as stmt, ) -from tests.data.models import ( - NULL_MESSAGE, - NULL_MESSAGE_IN_TLV_MESSAGE, - TLV_MESSAGE, - TLV_MESSAGES, - TLV_TAG, - TLV_TAGS, - UNIVERSAL_MESSAGE, -) +from tests.data import models from tests.utils import assert_equal, assert_session_model_error, get_test_model @@ -80,7 +72,7 @@ def test_str() -> None: BOOLEAN.identifier, ), ], - [BOOLEAN, TLV_MESSAGE], + [BOOLEAN, models.tlv_message()], ), ), textwrap.dedent( @@ -407,7 +399,7 @@ def test_declared_variable() -> None: ], declarations=[decl.VariableDeclaration("Defined", "TLV::Tag")], parameters=[], - types=[TLV_TAG], + types=[models.tlv_tag()], ) @@ -492,7 +484,7 @@ def test_declared_local_variable_valid() -> None: ], declarations=[decl.VariableDeclaration("Global", "TLV::Message")], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], ) @@ -522,7 +514,7 @@ def test_declared_local_variable_message_field() -> None: ], declarations=[decl.VariableDeclaration("Global", "TLV::Message")], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], ) @@ -801,7 +793,7 @@ def test_channel_read() -> None: parameters=[ decl.ChannelDeclaration("Some_Channel", readable=True, writable=False), ], - types=[TLV_MESSAGE], + types=[models.tlv_message()], ) @@ -822,7 +814,7 @@ def test_channel_write() -> None: parameters=[ decl.ChannelDeclaration("Some_Channel", readable=False, writable=True), ], - types=[TLV_MESSAGE], + types=[models.tlv_message()], ) @@ -840,7 +832,7 @@ def test_channel_read_undeclared() -> None: ], declarations=[decl.VariableDeclaration("Result", "TLV::Message")], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=r'^:10:20: model: error: undefined channel "Undeclared"$', ) @@ -863,7 +855,7 @@ def test_channel_read_invalid_type() -> None: ], declarations=[decl.VariableDeclaration("Result", "TLV::Message")], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=( r"^" r":10:20: model: error: channel parameter must be a variable\n" @@ -894,7 +886,7 @@ def test_channel_read_invalid_mode() -> None: parameters=[ decl.ChannelDeclaration("Channel", readable=False, writable=True), ], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=( r"^" r":10:20: model: error: expected readable channel\n" @@ -922,7 +914,7 @@ def test_channel_write_invalid_mode() -> None: parameters=[ decl.ChannelDeclaration("Out_Channel", readable=True, writable=False), ], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=( r"^" r":10:20: model: error: expected writable channel\n" @@ -948,7 +940,7 @@ def test_channel_attribute_has_data() -> None: decl.VariableDeclaration("Result", "Boolean"), ], parameters=[], - types=[BOOLEAN, TLV_MESSAGE], + types=[BOOLEAN, models.tlv_message()], ) @@ -1113,7 +1105,7 @@ def test_renaming() -> None: ), ], parameters=[], - types=[NULL_MESSAGE, TLV_MESSAGE, NULL_MESSAGE_IN_TLV_MESSAGE], + types=[models.null_message(), models.tlv_message(), models.null_message_in_tlv_message()], regex=r"^:10:20: model: error: renaming declarations not yet supported$", ) @@ -1148,7 +1140,11 @@ def test_renaming_invalid() -> None: ), ], parameters=[], - types=[UNIVERSAL_MESSAGE, TLV_MESSAGE, NULL_MESSAGE_IN_TLV_MESSAGE], + types=[ + models.universal_message(), + models.tlv_message(), + models.null_message_in_tlv_message(), + ], regex=( r"^" r':10:20: model: error: invalid renaming to "Universal_Message"\n' @@ -1215,7 +1211,7 @@ def test_for_all() -> None: ], declarations=[decl.VariableDeclaration("List", "TLV::Messages")], parameters=[], - types=[BOOLEAN, TLV_MESSAGES], + types=[BOOLEAN, models.tlv_messages()], regex=r"^:10:20: model: error: quantified expressions not yet supported$", ) @@ -1242,7 +1238,7 @@ def test_append() -> None: ], declarations=[decl.VariableDeclaration("List", "TLV::Messages")], parameters=[], - types=[TLV_TAG, TLV_MESSAGE, TLV_MESSAGES], + types=[models.tlv_tag(), models.tlv_message(), models.tlv_messages()], ) @@ -1287,7 +1283,7 @@ def test_append_message_unsupported() -> None: decl.VariableDeclaration("Element", "TLV::Message"), ], parameters=[], - types=[TLV_MESSAGE, TLV_MESSAGES], + types=[models.tlv_message(), models.tlv_messages()], regex=( r"^:10:20: model: error: appending independently created message not supported\n" r":10:20: model: info: message aggregate should be used instead$" @@ -1312,7 +1308,7 @@ def test_extend() -> None: decl.VariableDeclaration("Element", "TLV::Messages"), ], parameters=[], - types=[BOOLEAN, TLV_MESSAGES], + types=[BOOLEAN, models.tlv_messages()], ) @@ -1360,7 +1356,7 @@ def test_message_aggregate_with_undefined_parameter() -> None: ], declarations=[decl.VariableDeclaration("Data", "TLV::Message")], parameters=[], - types=[BOOLEAN, TLV_MESSAGE], + types=[BOOLEAN, models.tlv_message()], regex=r'^:10:20: model: error: undefined variable "Undef"$', ) @@ -1426,7 +1422,7 @@ def test_comprehension() -> None: decl.VariableDeclaration("Result", "TLV::Tags"), ], parameters=[], - types=[BOOLEAN, TLV_MESSAGES, TLV_TAGS], + types=[BOOLEAN, models.tlv_messages(), models.tlv_tags()], ) @@ -1456,7 +1452,7 @@ def test_assignment_opaque_function_undef_parameter() -> None: parameters=[ decl.FunctionDeclaration("Sub", [decl.Argument("Param", "Opaque")], "TLV::Message"), ], - types=[BOOLEAN, OPAQUE, TLV_MESSAGE], + types=[BOOLEAN, OPAQUE, models.tlv_message()], regex=r'^:10:20: model: error: undefined variable "UndefData"$', ) @@ -1485,7 +1481,7 @@ def test_assignment_opaque_function_result() -> None: parameters=[ decl.FunctionDeclaration("Sub", [decl.Argument("Param", "Opaque")], "TLV::Message"), ], - types=[BOOLEAN, OPAQUE, TLV_MESSAGE], + types=[BOOLEAN, OPAQUE, models.tlv_message()], ) @@ -1509,7 +1505,7 @@ def test_message_field_assignment_with_invalid_field_name() -> None: decl.VariableDeclaration("Message", "TLV::Message"), ], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=r'^:1:2: model: error: invalid message field "Invalid"$', ) @@ -1566,7 +1562,7 @@ def test_message_field_assignment_with_incompatible_field_type() -> None: decl.VariableDeclaration("Message", "TLV::Message"), ], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=( r"^" r':1:2: model: error: expected enumeration type "TLV::Tag"\n' @@ -1597,7 +1593,7 @@ def test_message_field_assignment_with_incompatible_variable_type() -> None: decl.VariableDeclaration("Message", "TLV::Tag"), ], parameters=[], - types=[TLV_TAG], + types=[models.tlv_tag()], regex=( r"^" r":1:2: model: error: expected message type\n" @@ -1631,7 +1627,7 @@ def test_conversion() -> None: decl.VariableDeclaration("Converted", "Null::Message"), ], parameters=[], - types=[NULL_MESSAGE, TLV_MESSAGE, NULL_MESSAGE_IN_TLV_MESSAGE], + types=[models.null_message(), models.tlv_message(), models.null_message_in_tlv_message()], ) @@ -1659,7 +1655,7 @@ def test_conversion_undefined() -> None: decl.VariableDeclaration("Converted", "P::Undef", location=Location((10, 20))), ], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=( r'^:10:20: model: error: undefined type "P::Undef"\n' r':10:30: model: error: invalid conversion to "P::Undef"\n' @@ -1693,7 +1689,7 @@ def test_conversion_invalid_argument() -> None: decl.VariableDeclaration("Converted", "TLV::Message"), ], parameters=[], - types=[OPAQUE, TLV_MESSAGE], + types=[OPAQUE, models.tlv_message()], regex=( r"^:10:20: model: error: invalid argument for conversion," r" expected message field$" @@ -1725,7 +1721,7 @@ def test_conversion_invalid() -> None: decl.VariableDeclaration("Converted", "Null::Message"), ], parameters=[], - types=[NULL_MESSAGE, TLV_MESSAGE], + types=[models.null_message(), models.tlv_message()], regex=( r"^" r':10:20: model: error: invalid conversion to "Null::Message"\n' @@ -1813,7 +1809,7 @@ def test_undefined_type_in_declarations(declarations: abc.Sequence[decl.BasicDec ], declarations=declarations, parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=r'^:10:20: model: error: undefined type "Undefined"$', ) @@ -1854,7 +1850,7 @@ def test_undefined_type_in_local_declarations( ], declarations=[], parameters=[], - types=[TLV_MESSAGE], + types=[models.tlv_message()], regex=r'^:10:20: model: error: undefined type "Undefined"$', ) @@ -1932,7 +1928,7 @@ def test_type_error_in_renaming_declaration() -> None: [ decl.VariableDeclaration( "M", - UNIVERSAL_MESSAGE.identifier, + models.universal_message().identifier, location=Location((1, 2)), ), ], @@ -1948,7 +1944,7 @@ def test_type_error_in_renaming_declaration() -> None: stmt.Read( "C1", expr.MessageAggregate( - UNIVERSAL_MESSAGE.identifier, + models.universal_message().identifier, {"Message_Type": expr.Variable("Universal::MT_Null")}, ), location=Location((1, 2)), @@ -2033,16 +2029,16 @@ def test_conflicting_actions( ), ], declarations=[ - decl.VariableDeclaration("M1", UNIVERSAL_MESSAGE.identifier), - decl.VariableDeclaration("M2", UNIVERSAL_MESSAGE.identifier), - decl.VariableDeclaration("M3", UNIVERSAL_MESSAGE.identifier), + decl.VariableDeclaration("M1", models.universal_message().identifier), + decl.VariableDeclaration("M2", models.universal_message().identifier), + decl.VariableDeclaration("M3", models.universal_message().identifier), decl.VariableDeclaration("X", BOOLEAN.identifier), ], parameters=[ decl.ChannelDeclaration("C1", readable=True, writable=True), decl.ChannelDeclaration("C2", readable=True, writable=True), ], - types=[BOOLEAN, UNIVERSAL_MESSAGE, *UNIVERSAL_MESSAGE.types.values()], + types=[BOOLEAN, models.universal_message(), *models.universal_message().types.values()], regex=rf"^{errors}$", ) @@ -2068,7 +2064,7 @@ def test_missing_exception_transition() -> None: ], declarations=[decl.VariableDeclaration("List", "TLV::Messages")], parameters=[], - types=[TLV_TAG, TLV_MESSAGE, TLV_MESSAGES], + types=[models.tlv_tag(), models.tlv_message(), models.tlv_messages()], regex=r'^:10:20: model: error: missing exception transition in state "Start"$', ) @@ -2100,7 +2096,7 @@ def test_missing_exception_transition() -> None: ), ], ), - [TLV_TAG], + [models.tlv_tag()], [decl.FunctionDeclaration("SubProg", [], "TLV::Tag")], ), ], @@ -2148,7 +2144,7 @@ def test_resolving_of_function_calls() -> None: parameters=[ decl.FunctionDeclaration("Func", [], "Boolean"), ], - types=[BOOLEAN, OPAQUE, TLV_MESSAGE], + types=[BOOLEAN, OPAQUE, models.tlv_message()], ) global_decl = session.declarations[ID("Global")] @@ -2835,7 +2831,7 @@ def test_message_assignment_from_function() -> None: ], declarations=[], parameters=[decl.FunctionDeclaration("SubProg", [], "Null::Message")], - types=[NULL_MESSAGE], + types=[models.null_message()], location=Location((1, 1)), ) @@ -2934,7 +2930,7 @@ def test_message_assignment_from_function() -> None: BOOLEAN.identifier, ), ], - [BOOLEAN, TLV_MESSAGE], + [BOOLEAN, models.tlv_message()], Location((1, 2)), ), ), @@ -2943,7 +2939,7 @@ def test_message_assignment_from_function() -> None: def test_unchecked_session_checked(unchecked: UncheckedSession, expected: Session) -> None: assert ( unchecked.checked( - [BOOLEAN, TLV_MESSAGE], + [BOOLEAN, models.tlv_message()], ) == expected ) diff --git a/tests/unit/model/type_test.py b/tests/unit/model/type_test.py index 6e48972ab..425931d62 100644 --- a/tests/unit/model/type_test.py +++ b/tests/unit/model/type_test.py @@ -387,15 +387,15 @@ def test_enumeration_str() -> None: def test_sequence_dependencies() -> None: - assert models.SEQUENCE_INTEGER_VECTOR.dependencies == [ - models.SEQUENCE_INTEGER, - models.SEQUENCE_INTEGER_VECTOR, + assert models.sequence_integer_vector().dependencies == [ + models.sequence_integer(), + models.sequence_integer_vector(), ] - assert models.SEQUENCE_INNER_MESSAGES.dependencies == [ - models.SEQUENCE_LENGTH, + assert models.sequence_inner_messages().dependencies == [ + models.sequence_length(), OPAQUE, - models.SEQUENCE_INNER_MESSAGE, - models.SEQUENCE_INNER_MESSAGES, + models.sequence_inner_message(), + models.sequence_inner_messages(), ] @@ -403,7 +403,7 @@ def test_sequence_dependencies() -> None: ("element_type", "error"), [ ( - Sequence("P::B", models.INTEGER, Location((3, 4))), + Sequence("P::B", models.integer(), Location((3, 4))), r':1:2: model: error: invalid element type of sequence "A"\n' r':3:4: model: info: type "B" must be scalar or message', ), @@ -435,7 +435,7 @@ def test_sequence_dependencies() -> None: Link(INITIAL, Field("A"), condition=Equal(Size("Message"), Number(8))), Link(Field("A"), FINAL), ], - {Field("A"): models.INTEGER}, + {Field("A"): models.integer()}, location=Location((3, 4)), ), r':1:2: model: error: invalid element type of sequence "A"\n' diff --git a/tests/unit/specification/parser_test.py b/tests/unit/specification/parser_test.py index 1e243f22b..440171583 100644 --- a/tests/unit/specification/parser_test.py +++ b/tests/unit/specification/parser_test.py @@ -2334,7 +2334,7 @@ def test_create_model_message_in_message() -> None: def test_create_model_ethernet_frame() -> None: - assert_messages_files([f"{SPEC_DIR}/ethernet.rflx"], [models.ETHERNET_FRAME]) + assert_messages_files([f"{SPEC_DIR}/ethernet.rflx"], [models.ethernet_frame()]) def test_create_model_type_derivation_message() -> None: diff --git a/tools/generate_spark_test_code.py b/tools/generate_spark_test_code.py index abcc7a208..32c1fe499 100755 --- a/tools/generate_spark_test_code.py +++ b/tools/generate_spark_test_code.py @@ -14,7 +14,7 @@ from rflx.model import Model from rflx.specification import Parser from tests.const import FEATURE_DIR, SPEC_DIR -from tests.unit.generator_test import MODELS +from tests.data import models logging.basicConfig(level=logging.INFO, format="%(message)s") logging.disable(logging.NOTSET) @@ -49,7 +49,7 @@ def generate_spark_tests() -> None: parser = Parser() parser.parse(*SPECIFICATION_FILES) - model = merge_models([parser.create_model(), *MODELS]) + model = merge_models([parser.create_model(), *models.spark_test_models()]) Generator( "RFLX", reproducible=True,