Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed TCP/UART message handling #75

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ class FixpositionDriver {
int client_fd_ = -1; //!< TCP or Serial file descriptor
int connection_status_ = -1;
struct termios options_save_;
//uint8_t readBuf[8192];
//int buf_size = 0;
uint8_t readBuf[8192];
int buf_size = 0;
};
} // namespace fixposition
#endif //__FIXPOSITION_DRIVER_LIB_FIXPOSITION_DRIVER__
124 changes: 74 additions & 50 deletions fixposition_driver_lib/src/fixposition_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,66 +257,90 @@ bool FixpositionDriver::RunOnce() {
}

bool FixpositionDriver::ReadAndPublish() {
char readBuf[8192];

ssize_t rv;
if (params_.fp_output.type == INPUT_TYPE::TCP) {
rv = recv(client_fd_, (void*)&readBuf, sizeof(readBuf), MSG_DONTWAIT);
} else if (params_.fp_output.type == INPUT_TYPE::SERIAL) {
rv = read(client_fd_, (void*)&readBuf, sizeof(readBuf));
} else {
rv = 0;
}

if (rv == 0) {
std::cerr << "Connection closed.\n";
return false;
}

if (rv < 0 && errno == EAGAIN) {
/* no data for now, call back when the socket is readable */
return true;
}
if (rv < 0) {
std::cerr << "Connection error.\n";
return false;
}

ssize_t start_id = 0;
while (start_id < rv) {
int msg_size = 0;
// Nov B
msg_size = IsNovMessage((uint8_t*)&readBuf[start_id], rv - start_id);
int msg_size = 0;

// Extract messages until the buffer is empty
while (buf_size > 0) {
// Check whether the message is NOV_B
msg_size = IsNovMessage(readBuf, buf_size);

// a) Message is NOV_B --> Process message and remove it from the buffer
if (msg_size > 0) {
NovConvertAndPublish((uint8_t*)&readBuf[start_id]);
start_id += msg_size;
continue;
NovConvertAndPublish(readBuf);
buf_size -= msg_size;
if (buf_size > 0) {
memmove(readBuf, &readBuf[msg_size], buf_size);
}
// Look for new messages in the buffer
}
if (msg_size == 0) {
// do nothing

// b) Message is not NOV_B. Check whether it is NMEA/FP_A
else if (msg_size == 0) {
msg_size = IsNmeaMessage((char*)readBuf, buf_size);

// Message is NMEA/FP_A --> Process message and remove it from the buffer
if (msg_size > 0) {
NmeaConvertAndPublish({(const char*)readBuf, (const char*)readBuf + msg_size});
buf_size -= msg_size;
if (buf_size > 0) {
memmove(readBuf, &readBuf[msg_size], buf_size);
}
// Look for new messages in the buffer
}

// Message is neither NMEA/FP_A or NOV_B --> Remove one byte from the buffer and try again until it is empty
else if (msg_size == 0) {
if (buf_size > 0) {
buf_size -= 1;
memmove(readBuf, &readBuf[1], buf_size);
}

// Buffer is empty --> Wait for more data
else /* buf_size == 0 */ {
break;
}
}

// NMEA message might be incomplete --> Wait for more data
else /* msg_size < 0 */ {
break;
}
}
if (msg_size < 0) {

// c) NOV_B message might be incomplete --> Wait for more data
else /* msg_size < 0 */ {
break;
}
}

// Nmea (incl. FP_A)
msg_size = IsNmeaMessage(&readBuf[start_id], rv - start_id);
if (msg_size > 0) {
// NovConvertAndPublish(start, msg_size);
std::string msg(&readBuf[start_id], msg_size);
NmeaConvertAndPublish(msg);
start_id += msg_size;
continue;
// Read more data from the TCP/Serial port
int rem_size = sizeof(readBuf) - buf_size;
if (rem_size > 0) {
ssize_t rv;
if (params_.fp_output.type == INPUT_TYPE::TCP) {
rv = recv(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size, MSG_DONTWAIT);
} else if (params_.fp_output.type == INPUT_TYPE::SERIAL) {
rv = read(client_fd_, (void*)&readBuf[buf_size], sizeof(readBuf) - buf_size);
} else {
rv = 0;
}
if (msg_size == 0) {
// do nothing

if (rv == 0) {
std::cerr << "Connection closed.\n";
return false;
}
if (msg_size < 0) {
break;

if (rv < 0 && errno == EAGAIN) {
/* no data for now, call back when the socket is readable */
return true;
}

if (rv < 0) {
std::cerr << "Connection error.\n";
return false;
}

// No Match, increment by 1
++start_id;
buf_size += rv;
}

return true;
Expand Down
2 changes: 1 addition & 1 deletion fixposition_driver_ros1/launch/serial.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fp_output:
]
type: serial
port: "/dev/ttyUSB0"
serial_baudrate: 115200
baudrate: 115200
rate: 200
reconnect_delay: 5.0 # wait time in [s] until retry connection
cov_warning: false
Expand Down