Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of socket related functions to a separate socket_common.c #505

Merged
merged 56 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
37bf612
Rename netcommon.h
Jakio815 Dec 12, 2024
7fb3c97
Rename
Jakio815 Dec 12, 2024
81d0e57
Merge commit '37bf61294c8385e806ff007abbd49c9b79306f74' into refactor…
Jakio815 Dec 12, 2024
5ba5a08
Rename
Jakio815 Dec 12, 2024
6418a39
Rename net_util.c
Jakio815 Dec 12, 2024
d209fe3
Temp copy
Jakio815 Dec 12, 2024
9299a86
Merge commit '6418a39e90f9351b21bf2331927c43965692ae82' into refactor…
Jakio815 Dec 12, 2024
1afab20
Rename to net_util.c
Jakio815 Dec 12, 2024
a66ff84
Test
Jakio815 Dec 16, 2024
f6a4943
Remove from net_common.h
Jakio815 Dec 16, 2024
5882917
Move read_write socket functions to socket_common.c
Jakio815 Dec 16, 2024
a4313a1
Add socket_common.h header to clock-sync.c
Jakio815 Dec 16, 2024
ed30514
Add socket_common.c to CMakeLists.txt
Jakio815 Dec 16, 2024
5bed39e
Move read_write socket functions to socket_common.c
Jakio815 Dec 16, 2024
ba6c35b
Move socket_type to socket_common.h
Jakio815 Dec 16, 2024
e4363f2
Move create_rti_server to socket_common.c
Jakio815 Dec 16, 2024
f1aaa85
Fix create_rti_server
Jakio815 Dec 16, 2024
c6f313f
Add accept_socket
Jakio815 Dec 16, 2024
671e299
Fix minor errors
Jakio815 Dec 16, 2024
b82c3f0
Minor fix
Jakio815 Dec 16, 2024
a3c81de
Add connect_to socket
Jakio815 Dec 17, 2024
eddc990
Finished common create_TCP_server for RTI and federates.
Jakio815 Dec 17, 2024
ec84cb5
Fix connect_to_socket
Jakio815 Dec 17, 2024
1fc5e82
Fix formatting
Jakio815 Dec 17, 2024
287ee17
Add comments.
Jakio815 Dec 17, 2024
6e5dc7b
Fix receive_and_check_fed_id_message not to get client_fd
Jakio815 Dec 17, 2024
8ead884
Remove sockaddr from accpet_socket as input
Jakio815 Dec 17, 2024
ef44cdd
Fix accept_socket. Now federate.c's lf_handle_p2p_connections_from_fe…
Jakio815 Dec 17, 2024
efbce31
Minor formatting.
Jakio815 Dec 17, 2024
bf0573b
Split accept to two functions.
Jakio815 Dec 17, 2024
4860623
Remove code duplication on create_TCP/UDP_sever
Jakio815 Dec 17, 2024
2e643ac
Merge branch 'main' into refactor-only-comm-type
Jakio815 Dec 18, 2024
d87ddd6
Rollback the socket_type_t to be in rti_remote.h.
Jakio815 Dec 18, 2024
206d0f4
Add empty line in end and rollback socket_type.
Jakio815 Dec 18, 2024
a28e8b7
Merge branch 'refactor-only-comm-type' of github.com:lf-lang/reactor-…
Jakio815 Dec 18, 2024
33cca90
Remove commented out code.
Jakio815 Dec 18, 2024
e9642b5
Add commnets on user_specified_port.
Jakio815 Dec 18, 2024
07119ef
Apply suggestions from code review
Jakio815 Dec 20, 2024
e14d07f
Move socket_common.h header to the top with the FEDERATED guards.
Jakio815 Dec 20, 2024
4cbf83e
Change create_server() function to return 0 for success, -1 for fail.
Jakio815 Dec 20, 2024
a381b7e
Update include/core/federated/network/socket_common.h
Jakio815 Dec 20, 2024
9d161a9
Add 'static' for private functions only used in the same C file.
Jakio815 Dec 20, 2024
5f2e48b
Add comments to create_UDP_server.
Jakio815 Dec 20, 2024
57f337c
Revert rti's initial port to 0, and add a bool increment_port_on_retry.
Jakio815 Dec 20, 2024
f564383
Minor fix.
Jakio815 Dec 20, 2024
6042fbc
Minor update on comments.
Jakio815 Dec 20, 2024
4f183a4
Fix typo, and add debug messages.
Jakio815 Dec 20, 2024
fbbb1fb
Fix formatting
Jakio815 Dec 20, 2024
5a26cf5
Make a single API function accept_socket()
Jakio815 Dec 20, 2024
7eb4d55
Add more logs.
Jakio815 Dec 20, 2024
86f8395
Merge branch 'main' of github.com:lf-lang/reactor-c into refactor-onl…
Jakio815 Dec 21, 2024
9b8b7d6
Apply suggestions from code review
Jakio815 Dec 23, 2024
b2a5af8
Make create_server to one integrated function of TCP and UDP.
Jakio815 Dec 26, 2024
92fd156
Merge branch 'refactor-only-comm-type' of github.com:lf-lang/reactor-…
Jakio815 Dec 26, 2024
817d2f9
Fix formatting.
Jakio815 Dec 26, 2024
106c827
Update include/core/federated/network/socket_common.h
Jakio815 Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_library(${RTI_LIB} STATIC
${CoreLib}/tag.c
${CoreLib}/clock.c
${CoreLib}/federated/network/net_util.c
${CoreLib}/federated/network/socket_common.c
${CoreLib}/utils/pqueue_base.c
${CoreLib}/utils/pqueue_tag.c
${CoreLib}/utils/pqueue.c
Expand Down
182 changes: 21 additions & 161 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,132 +52,6 @@ extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(

extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); }

/**
* Create a server and enable listening for socket connections.
* If the specified port if it is non-zero, it will attempt to acquire that port.
* If it fails, it will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times with
* a delay of PORT_BIND_RETRY_INTERVAL in between. If the specified port is
* zero, then it will attempt to acquire DEFAULT_PORT first. If this fails, then it
* will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times, incrementing the port
* number between attempts, with no delay between attempts. Once it has incremented
* the port number MAX_NUM_PORT_ADDRESSES times, it will cycle around and begin again
* with DEFAULT_PORT.
*
* @param port The port number to use or 0 to start trying at DEFAULT_PORT.
* @param socket_type The type of the socket for the server (TCP or UDP).
* @return The socket descriptor on which to accept connections.
*/
static int create_rti_server(uint16_t port, socket_type_t socket_type) {
// Timeout time for the communications of the server
struct timeval timeout_time = {.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000};
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int socket_descriptor = -1;
if (socket_type == TCP) {
socket_descriptor = create_real_time_tcp_socket_errexit();
} else if (socket_type == UDP) {
socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Set the appropriate timeout time
timeout_time =
(struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
if (socket_descriptor < 0) {
lf_print_error_system_failure("Failed to create RTI socket.");
}

// Set the option for this socket to reuse the same address
int true_variable = 1; // setsockopt() requires a reference to the value assigned to an option
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(int32_t)) < 0) {
lf_print_error("RTI failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno));
}
// Set the timeout on the socket so that read and write operations don't block for too long
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno));
}
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
}

/*
* The following used to permit reuse of a port that an RTI has previously
* used that has not been released. We no longer do this, and instead retry
* some number of times after waiting.

// SO_REUSEPORT (since Linux 3.9)
// Permits multiple AF_INET or AF_INET6 sockets to be bound to an
// identical socket address. This option must be set on each
// socket (including the first socket) prior to calling bind(2)
// on the socket. To prevent port hijacking, all of the
// processes binding to the same address must have the same
// effective UID. This option can be employed with both TCP and
// UDP sockets.

int reuse = 1;
#ifdef SO_REUSEPORT
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT,
(const char*)&reuse, sizeof(reuse)) < 0) {
perror("setsockopt(SO_REUSEPORT) failed");
}
#endif
*/

// Server file descriptor.
struct sockaddr_in server_fd;
// Zero out the server address structure.
bzero((char*)&server_fd, sizeof(server_fd));

uint16_t specified_port = port;
if (specified_port == 0)
port = DEFAULT_PORT;

server_fd.sin_family = AF_INET; // IPv4
server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0.
// Convert the port number from host byte order to network byte order.
server_fd.sin_port = htons(port);

int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));

// Try repeatedly to bind to a port. If no specific port is specified, then
// increment the port number each time.

int count = 1;
while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
if (specified_port == 0) {
lf_print_warning("RTI failed to get port %d.", port);
port++;
if (port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES)
port = DEFAULT_PORT;
lf_print_warning("RTI will try again with port %d.", port);
server_fd.sin_port = htons(port);
// Do not sleep.
} else {
lf_print("RTI failed to get port %d. Will try again.", port);
lf_sleep(PORT_BIND_RETRY_INTERVAL);
}
result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));
}
if (result != 0) {
lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
}
char* type = "TCP";
if (socket_type == UDP) {
type = "UDP";
}
lf_print("RTI using %s port %d for federation %s.", type, port, rti_remote->federation_id);

if (socket_type == TCP) {
rti_remote->final_port_TCP = port;
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
// which according to the Mac man page is limited to 128.
listen(socket_descriptor, 128);
} else if (socket_type == UDP) {
rti_remote->final_port_UDP = port;
// No need to listen on the UDP socket
}

return socket_descriptor;
}

void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 ||
lf_tag_compare(tag, e->last_provisionally_granted) < 0) {
Expand Down Expand Up @@ -1171,7 +1045,7 @@ void send_reject(int* socket_id, unsigned char error_code) {
* @param client_fd The socket address.
* @return The federate ID for success or -1 for failure.
*/
static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_in* client_fd) {
static int32_t receive_and_check_fed_id_message(int* socket_id) {
// Buffer for message ID, federate ID, and federation ID length.
size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID.
unsigned char buffer[length];
Expand Down Expand Up @@ -1261,13 +1135,14 @@ static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_
}
federate_info_t* fed = GET_FED_INFO(fed_id);
// The MSG_TYPE_FED_IDS message has the right federation ID.
// Assign the address information for federate.
// The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful
// to create sockets and can be efficiently sent over the network.
// First, convert the sockaddr structure into a sockaddr_in that contains an internet address.
struct sockaddr_in* pV4_addr = client_fd;
// Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server
fed->server_ip_addr = pV4_addr->sin_addr;

// Get the peer address from the connected socket_id. Then assign it as the federate's socket server.
struct sockaddr_in peer_addr;
socklen_t addr_len = sizeof(peer_addr);
if (getpeername(*socket_id, (struct sockaddr*)&peer_addr, &addr_len) != 0) {
lf_print_error("RTI failed to get peer address.");
}
fed->server_ip_addr = peer_addr.sin_addr;

#if LOG_LEVEL >= LOG_LEVEL_DEBUG
// Create the human readable format and copy that into
Expand Down Expand Up @@ -1538,25 +1413,7 @@ static bool authenticate_federate(int* socket) {

void lf_connect_to_federates(int socket_descriptor) {
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
// The following blocks until a federate connects.
int socket_id = -1;
while (1) {
socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
if (socket_id >= 0) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) {
lf_print_error_system_failure("RTI failed to accept the socket.");
} else {
// Try again
lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno));
continue;
}
}

int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
// Wait for the first message from the federate when RTI -a option is on.
#ifdef __RTI_AUTH__
if (rti_remote->authentication_enabled) {
Expand All @@ -1574,7 +1431,7 @@ void lf_connect_to_federates(int socket_descriptor) {
#endif

// The first message from the federate should contain its ID and the federation ID.
int32_t fed_id = receive_and_check_fed_id_message(&socket_id, (struct sockaddr_in*)&client_fd);
int32_t fed_id = receive_and_check_fed_id_message(&socket_id);
if (fed_id >= 0 && socket_id >= 0 && receive_connection_information(&socket_id, (uint16_t)fed_id) &&
receive_udp_message_and_set_up_clock_sync(&socket_id, (uint16_t)fed_id)) {

Expand Down Expand Up @@ -1614,14 +1471,12 @@ void* respond_to_erroneous_connections(void* nothing) {
initialize_lf_thread_id();
while (true) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
// The following will block until either a federate attempts to connect
// or close(rti->socket_descriptor_TCP) is called.
int socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
if (socket_id < 0)
int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
if (socket_id < 0) {
return NULL;

}
if (rti_remote->all_federates_exited) {
return NULL;
}
Expand Down Expand Up @@ -1655,12 +1510,17 @@ void initialize_federate(federate_info_t* fed, uint16_t id) {
int32_t start_rti_server(uint16_t port) {
_lf_initialize_clock();
// Create the TCP socket server
rti_remote->socket_descriptor_TCP = create_rti_server(port, TCP);
if (create_TCP_server(port, &rti_remote->socket_descriptor_TCP, &rti_remote->final_port_TCP, true)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
lf_print("RTI: Listening for federates.");
// Create the UDP socket server
// Try to get the rti_remote->final_port_TCP + 1 port
if (rti_remote->clock_sync_global_status >= clock_sync_on) {
rti_remote->socket_descriptor_UDP = create_rti_server(rti_remote->final_port_TCP + 1, UDP);
if (create_UDP_server(rti_remote->final_port_TCP + 1, &rti_remote->socket_descriptor_UDP,
&rti_remote->final_port_UDP, true)) {
lf_print_error_system_failure("RTI failed to create UDP server: %s.", strerror(errno));
}
}
return rti_remote->socket_descriptor_TCP;
}
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,4 +413,4 @@ int process_args(int argc, const char* argv[]);
void initialize_RTI(rti_remote_t* rti);

#endif // RTI_REMOTE_H
#endif // STANDALONE_RTI
#endif // STANDALONE_RTI
1 change: 1 addition & 0 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "clock-sync.h"
#include "net_common.h"
#include "net_util.h"
#include "socket_common.h"
#include "util.h"

/** Offset calculated by the clock synchronization algorithm. */
Expand Down
Loading
Loading