Skip to content

Commit

Permalink
Implement Message and Interchange containers
Browse files Browse the repository at this point in the history
  • Loading branch information
JocelynDelalande committed Sep 8, 2020
1 parent e031ae1 commit ddc1dd7
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 4 deletions.
141 changes: 138 additions & 3 deletions pydifact/segmentcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
# THE SOFTWARE.

import collections
from typing import List, Optional
from typing import List, Optional, Tuple
import datetime

from pydifact.parser import Parser
from pydifact.segments import Segment
Expand All @@ -30,7 +31,7 @@
import codecs


class SegmentCollection:
class AbstractSegmentsContainer:
"""Represent a collection of EDI Segments for both reading and writing."""

def __init__(self):
Expand Down Expand Up @@ -135,14 +136,148 @@ def add_segment(self, segment: Segment) -> "SegmentCollection":
self.segments.append(segment)
return self

def get_header_segment(self) -> Optional[Segment]:
"""Craft and return this container header segment (if any)
:returns: None if there is no header for that container
"""
return None

def get_footer_segment(self) -> Optional[Segment]:
"""Craft and return this container footer segment (if any)
:returns: None if there is no footer for that container
"""
return None

def serialize(self, break_lines: bool = False) -> str:
"""Serialize all the segments added to this object.
:param break_lines: if True, insert line break after each segment terminator.
"""
header = self.get_header_segment()
footer = self.get_footer_segment()
out = []

if header:
out.append(header)
out += self.segments
if footer:
out.append(footer)

return Serializer(self.characters).serialize(
self.segments, self.has_una_segment, break_lines
out, self.has_una_segment, break_lines,
)

def __str__(self) -> str:
"""Allow the object to be serialized by casting to a string."""
return self.serialize()

# For backward compatibility
SegmentCollection = AbstractSegmentsContainer


class Message(AbstractSegmentsContainer):
"""
A message (started by UNH segment, ended by UNT segment)
Optional features of UNH are not yet supported.
https://www.stylusstudio.com/edifact/40100/UNH_.htm
https://www.stylusstudio.com/edifact/40100/UNT_.htm
"""
def __init__(
self,
reference_number: str,
identifier: Tuple,
):
super().__init__()
self.reference_number = reference_number
self.identifier = identifier

def get_header_segment(self) -> Segment:
return Segment(
"UNH",
self.reference_number,
[str(i) for i in self.identifier],
)
def get_footer_segment(self) -> Segment:
return Segment(
"UNT",
self.reference_number,
str(len(self.segments)),
)


class Interchange(AbstractSegmentsContainer):
"""
An interchange (started by UNB segment, ended by UNZ segment)
Optional features of UNB are not yet supported.
Functional groups are not yet supported
https://www.stylusstudio.com/edifact/40100/UNB_.htm
https://www.stylusstudio.com/edifact/40100/UNZ_.htm
"""
def __init__(
self,
sender: str,
recipient: str,
control_reference: str,
syntax_identifier: Tuple[str, int],
delimiters: Characters = Characters(),
timestamp: datetime.datetime = None,
):
super().__init__()
self.sender = sender
self.recipient = recipient
self.control_reference = control_reference
self.syntax_identifier = syntax_identifier
self.delimiters = delimiters
self.timestamp = timestamp or datetime.datetime.now()

def get_header_segment(self) -> Segment:
return Segment(
"UNB",
[str(i) for i in self.syntax_identifier],
self.sender,
self.recipient,
['{:%y%m%d}'.format(self.timestamp), '{:%H%M}'.format(self.timestamp)],
self.control_reference,
)

def get_footer_segment(self) -> Segment:
return Segment(
"UNZ",
str(len(self.segments)),
self.control_reference,
)

@classmethod
def from_segments(
cls, segments: list or collections.Iterable
) -> "Interchange":
segments = iter(segments)

first_segment = next(segments)
if first_segment.tag == 'UNA':
self.has_una_segment = True
self.characters = Characters.from_str(segment.elements[0])
unb = next(segments)
elif first_segment.tag == 'UNB':
unb = first_segment
else:
raise SyntaxError('An interchange must start with UNB or UNA and UNB')

datetime_str = '-'.join(unb.elements[3])
timestamp = datetime.datetime.strptime(datetime_str, '%y%m%d-%H%M')
interchange = Interchange(
syntax_identifier=unb.elements[0],
sender=unb.elements[1],
recipient=unb.elements[2],
timestamp=timestamp,
control_reference=unb.elements[4],
)

return interchange.add_segments(
segment for segment in segments if segment.tag != 'UNZ'
)
42 changes: 41 additions & 1 deletion tests/test_segmentcollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime

import pytest

from pydifact.segmentcollection import SegmentCollection
from pydifact.segmentcollection import Interchange, Message, SegmentCollection
from pydifact.segments import Segment
from pydifact.api import EDISyntaxError

Expand Down Expand Up @@ -95,3 +97,41 @@ def test_malformed_tag4():
def test_malformed_tag5():
with pytest.raises(EDISyntaxError):
SegmentCollection.from_str("IMD+F++:::This is '-:malformed string'")


def test_empty_interchange():
i = Interchange(
sender='1234',
recipient='3333',
timestamp=datetime.datetime(2020,1,2,22,12),
control_reference='42',
syntax_identifier=('UNOC', 1),
)

assert str(i) == (
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)


def test_empty_interchange_from_str():
i = Interchange.from_str(
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)
assert str(i) == (
"UNB+UNOC:1+1234+3333+200102:2212+42'"
"UNZ+0+42'"
)


def test_empty_message():
m = Message(
reference_number='42z42',
identifier=('PAORES', 93, 1, 'IA'),
)

assert str(m) == (
"UNH+42z42+PAORES:93:1:IA'"
"UNT+42z42+0'"
)

0 comments on commit ddc1dd7

Please sign in to comment.