From 882127cdc638fc5f0bb7181ee5edbf5affbc3b5f Mon Sep 17 00:00:00 2001 From: Christian Ege Date: Sat, 23 Dec 2023 15:22:05 +0100 Subject: [PATCH] refactor: use a result handler callback Signed-off-by: Christian Ege --- pkg/pcic/protocol.go | 86 ++++++++++++++++++++++++++++++++++----- pkg/pcic/protocol_test.go | 84 ++++++++++++++++++++++++++++++++------ 2 files changed, 147 insertions(+), 23 deletions(-) diff --git a/pkg/pcic/protocol.go b/pkg/pcic/protocol.go index f2afde0..795fe7c 100644 --- a/pkg/pcic/protocol.go +++ b/pkg/pcic/protocol.go @@ -33,47 +33,109 @@ const ( endMarker string = "stop" ) -func (p *PCIC) Receive(reader io.Reader) (Frame, error) { - frame := Frame{} +var ( + resultTicket []byte = []byte{'0', '0', '0', '0'} + errorTicket []byte = []byte{'0', '0', '0', '1'} + notificationTicket []byte = []byte{'0', '0', '1', '0'} +) + +type Async interface { + Result(Frame) + Error(ErrorMessage) + Notification(NotificationMessage) +} + +type NotificationMessage struct { + ID int + Message string +} + +type ErrorMessage struct { + ID int + Message string +} + +func (p *PCIC) Receive(reader io.Reader, handler Async) error { header := make([]byte, headerSize) n, err := io.ReadFull(reader, header) if err != nil { - return frame, err + return err } if n < headerSize { - return frame, fmt.Errorf("not enough data received: %d", n) + return fmt.Errorf("not enough data received: %d", n) } firstTicket := header[:ticketFieldLength] secondTicket := header[secondTicketOffset:dataOffset] if !bytes.Equal(firstTicket, secondTicket) { - return frame, fmt.Errorf("mismatch in the tickets %s != %s ", + return fmt.Errorf("mismatch in the tickets %s != %s ", string(firstTicket), string(secondTicket), ) } lengthBuffer := string(header[lengthOffset:secondTicketOffset]) if lengthBuffer[0] != 'L' { - return frame, fmt.Errorf("the length field does not start with 'L': %v", lengthBuffer) + return fmt.Errorf("the length field does not start with 'L': %v", lengthBuffer) } length := 0 n, err = fmt.Sscanf(lengthBuffer, "L%09d\r\n", &length) if err != nil { - return frame, err + return err } if n != 1 { - return frame, errors.New("no length in the length field detected") + return errors.New("no length in the length field detected") } if length < minimumContentLength { - return frame, errors.New("the length information is too short") + return errors.New("the length information is too short") } data := make([]byte, length-ticketFieldLength) if _, err = io.ReadFull(reader, data); err != nil { - return frame, err + return err } trailer := data[len(data)-delimiterFieldLength:] if !bytes.Equal(trailer, []byte{'\r', '\n'}) { - return frame, errors.New("invalid trailer detected") + return errors.New("invalid trailer detected") + } + if bytes.Equal(resultTicket, firstTicket) { + frame, err := asyncResultParser(data) + handler.Result(frame) + return err + } else if bytes.Equal(errorTicket, firstTicket) { + errorStatus, err := errorParser(data) + handler.Error(errorStatus) + return err + } else if bytes.Equal(notificationTicket, firstTicket) { + notification, err := notificationParser(data) + handler.Notification(notification) + return err } + return fmt.Errorf("unknown ticket received: %s", string(firstTicket)) +} + +func errorParser(data []byte) (ErrorMessage, error) { + var err error + errorStatus := ErrorMessage{} + n, err := fmt.Sscanf( + string(data), + "%09d:%s", + &errorStatus.ID, + &errorStatus.Message, + ) + if n != 2 { + return ErrorMessage{}, errors.New("unable to parse the error message") + } + return errorStatus, err +} + +func notificationParser(data []byte) (NotificationMessage, error) { + var err error + notification := NotificationMessage{} + return notification, err +} + +func asyncResultParser(data []byte) (Frame, error) { + fmt.Printf("Async Data received\n") + frame := Frame{} + var err error contentDecorated := data[:len(data)-delimiterFieldLength] if len(startMarker)+len(endMarker) > len(contentDecorated) { return frame, fmt.Errorf("missing start (%s) and end markers (%s) buffer length: %d", @@ -97,6 +159,8 @@ func (p *PCIC) Receive(reader io.Reader) (Frame, error) { frame.Chunks = append(frame.Chunks, c) offset += c.Size() remainingBytes -= c.Size() + } return frame, err + } diff --git a/pkg/pcic/protocol_test.go b/pkg/pcic/protocol_test.go index 32ec0d8..3429bec 100644 --- a/pkg/pcic/protocol_test.go +++ b/pkg/pcic/protocol_test.go @@ -20,15 +20,35 @@ const miniMalContentLength int = 14 //go:embed testdata/*.bz2 var tfs embed.FS +type PCICAsyncReceiver struct { + frame pcic.Frame + notificationMsg pcic.NotificationMessage + errorMsg pcic.ErrorMessage +} + +func (r *PCICAsyncReceiver) Result(frame pcic.Frame) { + r.frame = frame +} + +func (r *PCICAsyncReceiver) Error(msg pcic.ErrorMessage) { + r.errorMsg = msg +} + +func (r *PCICAsyncReceiver) Notification(msg pcic.NotificationMessage) { + r.notificationMsg = msg +} + +var testHandler *PCICAsyncReceiver = &PCICAsyncReceiver{} + func TestMinimalReceive(t *testing.T) { r := strings.NewReader("Hello, Reader!") p := pcic.PCIC{} - _, err := p.Receive(r) + err := p.Receive(r, testHandler) assert.Error(t, err, "We expect an error while receiving malformed data") // Test the minimal possible PCIC message - r = strings.NewReader("0001L000000014\r\n0001starstop\r\n") - _, err = p.Receive(r) + r = strings.NewReader("0000L000000014\r\n0000starstop\r\n") + err = p.Receive(r, testHandler) assert.NoError(t, err, "We expect no error while receiving data") } @@ -40,7 +60,7 @@ func TestReceiveWithChunk(t *testing.T) { 0x30, 0x00, 0x00, 0x00, /* HEADER_SIZE */ 0x02, 0x00, 0x00, 0x00, /* HEADER_VERSION */ 0x04, 0x00, 0x00, 0x00, /* IMAGE_WIDTH */ - 0x01, 0x00, 0x00, 0x00, /* IMAGE_HEIGTH */ + 0x01, 0x00, 0x00, 0x00, /* IMAGE_HEIGHT */ 0x00, 0x00, 0x00, 0x00, /* DATA_FORMAT */ 0x00, 0x00, 0x00, 0x00, /* TIME_STAMP */ 0x00, 0x00, 0x00, 0x00, /* FRAME_COUNT */ @@ -55,31 +75,48 @@ func TestReceiveWithChunk(t *testing.T) { ) p := pcic.PCIC{} buffer := fmt.Sprintf( - "0001L%09d\r\n0001star%sstop\r\n", + "0000L%09d\r\n0000star%sstop\r\n", miniMalContentLength+len(chunkData), string(chunkData), ) // Test the PCIC message with single chunk r := strings.NewReader(buffer) - f, err := p.Receive(r) + err := p.Receive(r, testHandler) assert.NoError(t, err, "We expect no error while receiving data") - assert.Equal(t, chunk.RADIAL_DISTANCE_NOISE, f.Chunks[0].Type()) + assert.Equal(t, + chunk.RADIAL_DISTANCE_NOISE, + testHandler.frame.Chunks[0].Type(), + ) // test with trailing XX after the chunk buffer = fmt.Sprintf( - "0001L%09d\r\n0001star%sXXstop\r\n", + "0000L%09d\r\n0000star%sXXstop\r\n", miniMalContentLength+len(chunkData)+2, string(chunkData), ) // Test the PCIC message with single chunk r = strings.NewReader(buffer) - _, err = p.Receive(r) + err = p.Receive(r, testHandler) assert.Error(t, err, "We expect an error while receiving malformed data") + // test with invalid ticket after the chunk + buffer = fmt.Sprintf( + "0002L%09d\r\n0002star%sstop\r\n", + miniMalContentLength+len(chunkData), + string(chunkData), + ) + r = strings.NewReader(buffer) + err = p.Receive(r, testHandler) + assert.Error( + t, + err, + "We expect an error while receiving data with an invalid ticket", + ) + } -func TestWithRealData(t *testing.T) { +func TestWithRealChunkData(t *testing.T) { file, err := tfs.Open("testdata/pcic-test-data.blob.bz2") assert.NoError(t, err, "No error expected while reading the input") defer file.Close() @@ -87,16 +124,39 @@ func TestWithRealData(t *testing.T) { cr := bzip2.NewReader(buf) p := pcic.PCIC{} for { - f, err := p.Receive(cr) + err := p.Receive(cr, testHandler) if errors.Is(err, io.EOF) { break } assert.NoError(t, err, "No error expected while reading the compressed input") fmt.Print("Chunks: [ ") - for _, c := range f.Chunks { + for _, c := range testHandler.frame.Chunks { fmt.Printf("%d, ", c.Type()) } fmt.Println("]") } } + +func TestWithRealErrorData(t *testing.T) { + file, err := tfs.Open("testdata/pcic-diagnostic.blob.bz2") + assert.NoError(t, err, "No error expected while reading the input") + defer file.Close() + buf := bufio.NewReader(file) + cr := bzip2.NewReader(buf) + p := pcic.PCIC{} + for { + err := p.Receive(cr, testHandler) + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err, "No error expected while reading the compressed input") + assert.NotEqual( + t, + 0, /* 0 means no Error, what does not make sense */ + testHandler.errorMsg.ID, + "An invalid error ID received", + ) + } + +}