Skip to content

Commit

Permalink
refactor: use a result handler callback
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Ege <[email protected]>
  • Loading branch information
graugans committed Dec 23, 2023
1 parent f3d319a commit 882127c
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 23 deletions.
86 changes: 75 additions & 11 deletions pkg/pcic/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,47 +33,109 @@ const (
endMarker string = "stop"
)

func (p *PCIC) Receive(reader io.Reader) (Frame, error) {
frame := Frame{}
var (
resultTicket []byte = []byte{'0', '0', '0', '0'}
errorTicket []byte = []byte{'0', '0', '0', '1'}
notificationTicket []byte = []byte{'0', '0', '1', '0'}
)

type Async interface {
Result(Frame)
Error(ErrorMessage)
Notification(NotificationMessage)
}

type NotificationMessage struct {
ID int
Message string
}

type ErrorMessage struct {
ID int
Message string
}

func (p *PCIC) Receive(reader io.Reader, handler Async) error {
header := make([]byte, headerSize)
n, err := io.ReadFull(reader, header)
if err != nil {
return frame, err
return err
}
if n < headerSize {
return frame, fmt.Errorf("not enough data received: %d", n)
return fmt.Errorf("not enough data received: %d", n)
}
firstTicket := header[:ticketFieldLength]
secondTicket := header[secondTicketOffset:dataOffset]
if !bytes.Equal(firstTicket, secondTicket) {
return frame, fmt.Errorf("mismatch in the tickets %s != %s ",
return fmt.Errorf("mismatch in the tickets %s != %s ",
string(firstTicket),
string(secondTicket),
)
}
lengthBuffer := string(header[lengthOffset:secondTicketOffset])
if lengthBuffer[0] != 'L' {
return frame, fmt.Errorf("the length field does not start with 'L': %v", lengthBuffer)
return fmt.Errorf("the length field does not start with 'L': %v", lengthBuffer)
}
length := 0
n, err = fmt.Sscanf(lengthBuffer, "L%09d\r\n", &length)
if err != nil {
return frame, err
return err
}
if n != 1 {
return frame, errors.New("no length in the length field detected")
return errors.New("no length in the length field detected")
}
if length < minimumContentLength {
return frame, errors.New("the length information is too short")
return errors.New("the length information is too short")
}
data := make([]byte, length-ticketFieldLength)
if _, err = io.ReadFull(reader, data); err != nil {
return frame, err
return err
}
trailer := data[len(data)-delimiterFieldLength:]
if !bytes.Equal(trailer, []byte{'\r', '\n'}) {
return frame, errors.New("invalid trailer detected")
return errors.New("invalid trailer detected")
}
if bytes.Equal(resultTicket, firstTicket) {
frame, err := asyncResultParser(data)
handler.Result(frame)
return err
} else if bytes.Equal(errorTicket, firstTicket) {
errorStatus, err := errorParser(data)
handler.Error(errorStatus)
return err
} else if bytes.Equal(notificationTicket, firstTicket) {
notification, err := notificationParser(data)
handler.Notification(notification)
return err
}
return fmt.Errorf("unknown ticket received: %s", string(firstTicket))
}

func errorParser(data []byte) (ErrorMessage, error) {
var err error
errorStatus := ErrorMessage{}
n, err := fmt.Sscanf(
string(data),
"%09d:%s",
&errorStatus.ID,
&errorStatus.Message,
)
if n != 2 {
return ErrorMessage{}, errors.New("unable to parse the error message")
}
return errorStatus, err
}

func notificationParser(data []byte) (NotificationMessage, error) {
var err error
notification := NotificationMessage{}
return notification, err
}

func asyncResultParser(data []byte) (Frame, error) {
fmt.Printf("Async Data received\n")
frame := Frame{}
var err error
contentDecorated := data[:len(data)-delimiterFieldLength]
if len(startMarker)+len(endMarker) > len(contentDecorated) {
return frame, fmt.Errorf("missing start (%s) and end markers (%s) buffer length: %d",
Expand All @@ -97,6 +159,8 @@ func (p *PCIC) Receive(reader io.Reader) (Frame, error) {
frame.Chunks = append(frame.Chunks, c)
offset += c.Size()
remainingBytes -= c.Size()

}
return frame, err

}
84 changes: 72 additions & 12 deletions pkg/pcic/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,35 @@ const miniMalContentLength int = 14
//go:embed testdata/*.bz2
var tfs embed.FS

type PCICAsyncReceiver struct {
frame pcic.Frame
notificationMsg pcic.NotificationMessage
errorMsg pcic.ErrorMessage
}

func (r *PCICAsyncReceiver) Result(frame pcic.Frame) {
r.frame = frame
}

func (r *PCICAsyncReceiver) Error(msg pcic.ErrorMessage) {
r.errorMsg = msg
}

func (r *PCICAsyncReceiver) Notification(msg pcic.NotificationMessage) {
r.notificationMsg = msg
}

var testHandler *PCICAsyncReceiver = &PCICAsyncReceiver{}

func TestMinimalReceive(t *testing.T) {
r := strings.NewReader("Hello, Reader!")
p := pcic.PCIC{}
_, err := p.Receive(r)
err := p.Receive(r, testHandler)
assert.Error(t, err, "We expect an error while receiving malformed data")

// Test the minimal possible PCIC message
r = strings.NewReader("0001L000000014\r\n0001starstop\r\n")
_, err = p.Receive(r)
r = strings.NewReader("0000L000000014\r\n0000starstop\r\n")
err = p.Receive(r, testHandler)
assert.NoError(t, err, "We expect no error while receiving data")
}

Expand All @@ -40,7 +60,7 @@ func TestReceiveWithChunk(t *testing.T) {
0x30, 0x00, 0x00, 0x00, /* HEADER_SIZE */
0x02, 0x00, 0x00, 0x00, /* HEADER_VERSION */
0x04, 0x00, 0x00, 0x00, /* IMAGE_WIDTH */
0x01, 0x00, 0x00, 0x00, /* IMAGE_HEIGTH */
0x01, 0x00, 0x00, 0x00, /* IMAGE_HEIGHT */
0x00, 0x00, 0x00, 0x00, /* DATA_FORMAT */
0x00, 0x00, 0x00, 0x00, /* TIME_STAMP */
0x00, 0x00, 0x00, 0x00, /* FRAME_COUNT */
Expand All @@ -55,48 +75,88 @@ func TestReceiveWithChunk(t *testing.T) {
)
p := pcic.PCIC{}
buffer := fmt.Sprintf(
"0001L%09d\r\n0001star%sstop\r\n",
"0000L%09d\r\n0000star%sstop\r\n",
miniMalContentLength+len(chunkData),
string(chunkData),
)
// Test the PCIC message with single chunk
r := strings.NewReader(buffer)
f, err := p.Receive(r)
err := p.Receive(r, testHandler)
assert.NoError(t, err, "We expect no error while receiving data")

assert.Equal(t, chunk.RADIAL_DISTANCE_NOISE, f.Chunks[0].Type())
assert.Equal(t,
chunk.RADIAL_DISTANCE_NOISE,
testHandler.frame.Chunks[0].Type(),
)

// test with trailing XX after the chunk
buffer = fmt.Sprintf(
"0001L%09d\r\n0001star%sXXstop\r\n",
"0000L%09d\r\n0000star%sXXstop\r\n",
miniMalContentLength+len(chunkData)+2,
string(chunkData),
)
// Test the PCIC message with single chunk
r = strings.NewReader(buffer)
_, err = p.Receive(r)
err = p.Receive(r, testHandler)
assert.Error(t, err, "We expect an error while receiving malformed data")

// test with invalid ticket after the chunk
buffer = fmt.Sprintf(
"0002L%09d\r\n0002star%sstop\r\n",
miniMalContentLength+len(chunkData),
string(chunkData),
)
r = strings.NewReader(buffer)
err = p.Receive(r, testHandler)
assert.Error(
t,
err,
"We expect an error while receiving data with an invalid ticket",
)

}

func TestWithRealData(t *testing.T) {
func TestWithRealChunkData(t *testing.T) {
file, err := tfs.Open("testdata/pcic-test-data.blob.bz2")
assert.NoError(t, err, "No error expected while reading the input")
defer file.Close()
buf := bufio.NewReader(file)
cr := bzip2.NewReader(buf)
p := pcic.PCIC{}
for {
f, err := p.Receive(cr)
err := p.Receive(cr, testHandler)
if errors.Is(err, io.EOF) {
break
}
assert.NoError(t, err, "No error expected while reading the compressed input")
fmt.Print("Chunks: [ ")
for _, c := range f.Chunks {
for _, c := range testHandler.frame.Chunks {
fmt.Printf("%d, ", c.Type())
}
fmt.Println("]")
}

}

func TestWithRealErrorData(t *testing.T) {
file, err := tfs.Open("testdata/pcic-diagnostic.blob.bz2")
assert.NoError(t, err, "No error expected while reading the input")
defer file.Close()
buf := bufio.NewReader(file)
cr := bzip2.NewReader(buf)
p := pcic.PCIC{}
for {
err := p.Receive(cr, testHandler)
if errors.Is(err, io.EOF) {
break
}
assert.NoError(t, err, "No error expected while reading the compressed input")
assert.NotEqual(
t,
0, /* 0 means no Error, what does not make sense */
testHandler.errorMsg.ID,
"An invalid error ID received",
)
}

}

0 comments on commit 882127c

Please sign in to comment.