Skip to content

Commit

Permalink
Implement Message and Interchange containers
Browse files Browse the repository at this point in the history
  • Loading branch information
JocelynDelalande committed Oct 1, 2020
1 parent e031ae1 commit dd61345
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 13 deletions.
161 changes: 149 additions & 12 deletions pydifact/segmentcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
# THE SOFTWARE.

import collections
from typing import List, Optional
from typing import List, Optional, Tuple
import datetime

from pydifact.parser import Parser
from pydifact.segments import Segment
Expand All @@ -30,7 +31,7 @@
import codecs


class SegmentCollection:
class AbstractSegmentsContainer:
"""Represent a collection of EDI Segments for both reading and writing."""

def __init__(self):
Expand All @@ -43,8 +44,8 @@ def __init__(self):
self.has_una_segment = False

@classmethod
def from_file(cls, file: str, encoding: str = "iso8859-1") -> "SegmentCollection":
"""Create a SegmentCollection instance from a file.
def from_file(cls, file: str, encoding: str = "iso8859-1") -> "AstractSegmentsContainer":
"""Create a AstractSegmentsContainer instance from a file.
Raises FileNotFoundError if filename is not found.
:param encoding: an optional string which specifies the encoding. Default is "iso8859-1".
Expand All @@ -63,7 +64,6 @@ def from_file(cls, file: str, encoding: str = "iso8859-1") -> "SegmentCollection
def from_str(cls, string: str) -> "SegmentCollection":
"""Create a SegmentCollection instance from a string.
:param string: The EDI content
:rtype: SegmentCollection
"""
segments = Parser().parse(string)

Expand All @@ -72,15 +72,14 @@ def from_str(cls, string: str) -> "SegmentCollection":
@classmethod
def from_segments(
cls, segments: list or collections.Iterable
) -> "SegmentCollection":
"""Create a new SegmentCollection instance from a iterable list of segments.
) -> "AbstractSegmentsContainer":
"""Create a new AbstractSegmentsContainer instance from a iterable list of segments.
:param segments: The segments of the EDI interchange
:type segments: list/iterable of Segment
:rtype: SegmentCollection
"""

# create a new instance of SegmentCollection and return it
# create a new instance of AbstractSegmentsContainer and return it
# with the added segments
return cls().add_segments(segments)

Expand All @@ -106,7 +105,7 @@ def get_segment(self, name: str) -> Optional[Segment]:

def add_segments(
self, segments: List[Segment] or collections.Iterable
) -> "SegmentCollection":
) -> "AbstractSegmentsContainer":
"""Add multiple segments to the collection. Passing a UNA segment means setting/overriding the control
characters and setting the serializer to output the Service String Advice. If you wish to change the control
characters from the default and not output the Service String Advice, change self.characters instead,
Expand All @@ -120,7 +119,7 @@ def add_segments(

return self

def add_segment(self, segment: Segment) -> "SegmentCollection":
def add_segment(self, segment: Segment) -> "AbstractSegmentsContainer":
"""Append a segment to the collection. Passing a UNA segment means setting/overriding the control
characters and setting the serializer to output the Service String Advice. If you wish to change the control
characters from the default and not output the Service String Advice, change self.characters instead,
Expand All @@ -135,14 +134,152 @@ def add_segment(self, segment: Segment) -> "SegmentCollection":
self.segments.append(segment)
return self

def get_header_segment(self) -> Optional[Segment]:
"""Craft and return this container header segment (if any)
:returns: None if there is no header for that container
"""
return None

def get_footer_segment(self) -> Optional[Segment]:
"""Craft and return this container footer segment (if any)
:returns: None if there is no footer for that container
"""
return None

def serialize(self, break_lines: bool = False) -> str:
"""Serialize all the segments added to this object.
:param break_lines: if True, insert line break after each segment terminator.
"""
header = self.get_header_segment()
footer = self.get_footer_segment()
out = []

if header:
out.append(header)
out += self.segments
if footer:
out.append(footer)

return Serializer(self.characters).serialize(
self.segments, self.has_una_segment, break_lines
out, self.has_una_segment, break_lines,
)

def __str__(self) -> str:
"""Allow the object to be serialized by casting to a string."""
return self.serialize()


# For backward compatibility
SegmentCollection = AbstractSegmentsContainer


class Message(AbstractSegmentsContainer):
"""
A message (started by UNH segment, ended by UNT segment)
Optional features of UNH are not yet supported.
https://www.stylusstudio.com/edifact/40100/UNH_.htm
https://www.stylusstudio.com/edifact/40100/UNT_.htm
"""
def __init__(
self,
reference_number: str,
identifier: Tuple,
):
super().__init__()
self.reference_number = reference_number
self.identifier = identifier

def get_header_segment(self) -> Segment:
return Segment(
"UNH",
self.reference_number,
[str(i) for i in self.identifier],
)

def get_footer_segment(self) -> Segment:
return Segment(
"UNT",
self.reference_number,
str(len(self.segments)),
)


class Interchange(AbstractSegmentsContainer):
"""
An interchange (started by UNB segment, ended by UNZ segment)
Optional features of UNB are not yet supported.
Functional groups are not yet supported
https://www.stylusstudio.com/edifact/40100/UNB_.htm
https://www.stylusstudio.com/edifact/40100/UNZ_.htm
"""
def __init__(
self,
sender: str,
recipient: str,
control_reference: str,
syntax_identifier: Tuple[str, int],
delimiters: Characters = Characters(),
timestamp: datetime.datetime = None,
):
super().__init__()
self.sender = sender
self.recipient = recipient
self.control_reference = control_reference
self.syntax_identifier = syntax_identifier
self.delimiters = delimiters
self.timestamp = timestamp or datetime.datetime.now()

def get_header_segment(self) -> Segment:
return Segment(
"UNB",
[str(i) for i in self.syntax_identifier],
self.sender,
self.recipient,
[f'{self.timestamp:%y%m%d}', f'{self.timestamp:%H%M}'],
self.control_reference,
)

def get_footer_segment(self) -> Segment:
return Segment(
"UNZ",
str(len(self.segments)),
self.control_reference,
)

@classmethod
def from_segments(
cls, segments: list or collections.Iterable
) -> "Interchange":
segments = iter(segments)

first_segment = next(segments)
if first_segment.tag == 'UNA':
unb = next(segments)
elif first_segment.tag == 'UNB':
unb = first_segment
else:
raise SyntaxError('An interchange must start with UNB or UNA and UNB')

datetime_str = '-'.join(unb.elements[3])
timestamp = datetime.datetime.strptime(datetime_str, '%y%m%d-%H%M')
interchange = Interchange(
syntax_identifier=unb.elements[0],
sender=unb.elements[1],
recipient=unb.elements[2],
timestamp=timestamp,
control_reference=unb.elements[4],
)

if first_segment.tag == 'UNA':
interchange.has_una_segment = True
interchange.characters = Characters.from_str(unb.elements[0])

return interchange.add_segments(
segment for segment in segments if segment.tag != 'UNZ'
)
42 changes: 41 additions & 1 deletion tests/test_segmentcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime

import pytest

from pydifact.segmentcollection import SegmentCollection
from pydifact.segmentcollection import Interchange, Message, SegmentCollection
from pydifact.segments import Segment
from pydifact.api import EDISyntaxError

Expand Down Expand Up @@ -95,3 +97,41 @@ def test_malformed_tag4():
def test_malformed_tag5():
with pytest.raises(EDISyntaxError):
SegmentCollection.from_str("IMD+F++:::This is '-:malformed string'")


def test_empty_interchange():
i = Interchange(
sender='1234',
recipient='3333',
timestamp=datetime.datetime(2020,1,2,22,12),
control_reference='42',
syntax_identifier=('UNOC', 1),
)

assert str(i) == (
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)


def test_empty_interchange_from_str():
i = Interchange.from_str(
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)
assert str(i) == (
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)


def test_empty_message():
m = Message(
reference_number='42z42',
identifier=('PAORES', 93, 1, 'IA'),
)

assert str(m) == (
"UNH+42z42+PAORES:93:1:IA'"
"UNT+42z42+0'"
)

0 comments on commit dd61345

Please sign in to comment.