diff --git a/pydifact/segmentcollection.py b/pydifact/segmentcollection.py index 955d575..ba986ce 100644 --- a/pydifact/segmentcollection.py +++ b/pydifact/segmentcollection.py @@ -21,7 +21,8 @@ # THE SOFTWARE. import collections -from typing import List, Optional +from typing import List, Optional, Tuple +import datetime from pydifact.parser import Parser from pydifact.segments import Segment @@ -30,7 +31,7 @@ import codecs -class SegmentCollection: +class AbstractSegmentsContainer: """Represent a collection of EDI Segments for both reading and writing.""" def __init__(self): @@ -135,14 +136,148 @@ def add_segment(self, segment: Segment) -> "SegmentCollection": self.segments.append(segment) return self + def get_header_segment(self) -> Optional[Segment]: + """Craft and return this container header segment (if any) + + :returns: None if there is no header for that container + """ + return None + + def get_footer_segment(self) -> Optional[Segment]: + """Craft and return this container footer segment (if any) + :returns: None if there is no footer for that container + """ + return None + def serialize(self, break_lines: bool = False) -> str: """Serialize all the segments added to this object. :param break_lines: if True, insert line break after each segment terminator. """ + header = self.get_header_segment() + footer = self.get_footer_segment() + out = [] + + if header: + out.append(header) + out += self.segments + if footer: + out.append(footer) + return Serializer(self.characters).serialize( - self.segments, self.has_una_segment, break_lines + out, self.has_una_segment, break_lines, ) def __str__(self) -> str: """Allow the object to be serialized by casting to a string.""" return self.serialize() + +# For backward compatibility +SegmentCollection = AbstractSegmentsContainer + + +class Message(AbstractSegmentsContainer): + """ + A message (started by UNH segment, ended by UNT segment) + + Optional features of UNH are not yet supported. + + https://www.stylusstudio.com/edifact/40100/UNH_.htm + https://www.stylusstudio.com/edifact/40100/UNT_.htm + """ + def __init__( + self, + reference_number: str, + identifier: Tuple, + ): + super().__init__() + self.reference_number = reference_number + self.identifier = identifier + + def get_header_segment(self) -> Segment: + return Segment( + "UNH", + self.reference_number, + [str(i) for i in self.identifier], + ) + def get_footer_segment(self) -> Segment: + return Segment( + "UNT", + self.reference_number, + str(len(self.segments)), + ) + + +class Interchange(AbstractSegmentsContainer): + """ + An interchange (started by UNB segment, ended by UNZ segment) + + Optional features of UNB are not yet supported. + + Functional groups are not yet supported + + https://www.stylusstudio.com/edifact/40100/UNB_.htm + https://www.stylusstudio.com/edifact/40100/UNZ_.htm + """ + def __init__( + self, + sender: str, + recipient: str, + control_reference: str, + syntax_identifier: Tuple[str, int], + delimiters: Characters = Characters(), + timestamp: datetime.datetime = None, + ): + super().__init__() + self.sender = sender + self.recipient = recipient + self.control_reference = control_reference + self.syntax_identifier = syntax_identifier + self.delimiters = delimiters + self.timestamp = timestamp or datetime.datetime.now() + + def get_header_segment(self) -> Segment: + return Segment( + "UNB", + [str(i) for i in self.syntax_identifier], + self.sender, + self.recipient, + ['{:%y%m%d}'.format(self.timestamp), '{:%H%M}'.format(self.timestamp)], + self.control_reference, + ) + + def get_footer_segment(self) -> Segment: + return Segment( + "UNZ", + str(len(self.segments)), + self.control_reference, + ) + + @classmethod + def from_segments( + cls, segments: list or collections.Iterable + ) -> "Interchange": + segments = iter(segments) + + first_segment = next(segments) + if first_segment.tag == 'UNA': + self.has_una_segment = True + self.characters = Characters.from_str(segment.elements[0]) + unb = next(segments) + elif first_segment.tag == 'UNB': + unb = first_segment + else: + raise SyntaxError('An interchange must start with UNB or UNA and UNB') + + datetime_str = '-'.join(unb.elements[3]) + timestamp = datetime.datetime.strptime(datetime_str, '%y%m%d-%H%M') + interchange = Interchange( + syntax_identifier=unb.elements[0], + sender=unb.elements[1], + recipient=unb.elements[2], + timestamp=timestamp, + control_reference=unb.elements[4], + ) + + return interchange.add_segments( + segment for segment in segments if segment.tag != 'UNZ' + ) diff --git a/tests/test_segmentcollection.py b/tests/test_segmentcollection.py index 1374915..b100853 100644 --- a/tests/test_segmentcollection.py +++ b/tests/test_segmentcollection.py @@ -13,9 +13,11 @@ # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . +import datetime + import pytest -from pydifact.segmentcollection import SegmentCollection +from pydifact.segmentcollection import Interchange, Message, SegmentCollection from pydifact.segments import Segment from pydifact.api import EDISyntaxError @@ -95,3 +97,41 @@ def test_malformed_tag4(): def test_malformed_tag5(): with pytest.raises(EDISyntaxError): SegmentCollection.from_str("IMD+F++:::This is '-:malformed string'") + + +def test_empty_interchange(): + i = Interchange( + sender='1234', + recipient='3333', + timestamp=datetime.datetime(2020,1,2,22,12), + control_reference='42', + syntax_identifier=('UNOC', 1), + ) + + assert str(i) == ( + "UNB+UNOC:1+1234+3333+200102:2212+42'" + "UNZ+0+42'" + ) + + +def test_empty_interchange_from_str(): + i = Interchange.from_str( + "UNB+UNOC:1+1234+3333+200102:2212+42'" + "UNZ+0+42'" + ) + assert str(i) == ( + "UNB+UNOC:1+1234+3333+200102:2212+42'" + "UNZ+0+42'" + ) + + +def test_empty_message(): + m = Message( + reference_number='42z42', + identifier=('PAORES', 93, 1, 'IA'), + ) + + assert str(m) == ( + "UNH+42z42+PAORES:93:1:IA'" + "UNT+42z42+0'" + )