diff --git a/core/federated/RTI/CMakeLists.txt b/core/federated/RTI/CMakeLists.txt index 5bfbf0196..d9a93c246 100644 --- a/core/federated/RTI/CMakeLists.txt +++ b/core/federated/RTI/CMakeLists.txt @@ -17,6 +17,7 @@ add_library(${RTI_LIB} STATIC ${CoreLib}/tag.c ${CoreLib}/clock.c ${CoreLib}/federated/network/net_util.c + ${CoreLib}/federated/network/socket_common.c ${CoreLib}/utils/pqueue_base.c ${CoreLib}/utils/pqueue_tag.c ${CoreLib}/utils/pqueue.c diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 4cfe83630..6f705d2b9 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -52,132 +52,6 @@ extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock( extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); } -/** - * Create a server and enable listening for socket connections. - * If the specified port if it is non-zero, it will attempt to acquire that port. - * If it fails, it will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times with - * a delay of PORT_BIND_RETRY_INTERVAL in between. If the specified port is - * zero, then it will attempt to acquire DEFAULT_PORT first. If this fails, then it - * will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times, incrementing the port - * number between attempts, with no delay between attempts. Once it has incremented - * the port number MAX_NUM_PORT_ADDRESSES times, it will cycle around and begin again - * with DEFAULT_PORT. - * - * @param port The port number to use or 0 to start trying at DEFAULT_PORT. - * @param socket_type The type of the socket for the server (TCP or UDP). - * @return The socket descriptor on which to accept connections. 
- */ -static int create_rti_server(uint16_t port, socket_type_t socket_type) { - // Timeout time for the communications of the server - struct timeval timeout_time = {.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000}; - // Create an IPv4 socket for TCP (not UDP) communication over IP (0). - int socket_descriptor = -1; - if (socket_type == TCP) { - socket_descriptor = create_real_time_tcp_socket_errexit(); - } else if (socket_type == UDP) { - socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - // Set the appropriate timeout time - timeout_time = - (struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000}; - } - if (socket_descriptor < 0) { - lf_print_error_system_failure("Failed to create RTI socket."); - } - - // Set the option for this socket to reuse the same address - int true_variable = 1; // setsockopt() requires a reference to the value assigned to an option - if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(int32_t)) < 0) { - lf_print_error("RTI failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno)); - } - // Set the timeout on the socket so that read and write operations don't block for too long - if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) { - lf_print_error("RTI failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno)); - } - if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) { - lf_print_error("RTI failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno)); - } - - /* - * The following used to permit reuse of a port that an RTI has previously - * used that has not been released. We no longer do this, and instead retry - * some number of times after waiting. 
- - // SO_REUSEPORT (since Linux 3.9) - // Permits multiple AF_INET or AF_INET6 sockets to be bound to an - // identical socket address. This option must be set on each - // socket (including the first socket) prior to calling bind(2) - // on the socket. To prevent port hijacking, all of the - // processes binding to the same address must have the same - // effective UID. This option can be employed with both TCP and - // UDP sockets. - - int reuse = 1; - #ifdef SO_REUSEPORT - if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, - (const char*)&reuse, sizeof(reuse)) < 0) { - perror("setsockopt(SO_REUSEPORT) failed"); - } - #endif - */ - - // Server file descriptor. - struct sockaddr_in server_fd; - // Zero out the server address structure. - bzero((char*)&server_fd, sizeof(server_fd)); - - uint16_t specified_port = port; - if (specified_port == 0) - port = DEFAULT_PORT; - - server_fd.sin_family = AF_INET; // IPv4 - server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0. - // Convert the port number from host byte order to network byte order. - server_fd.sin_port = htons(port); - - int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); - - // Try repeatedly to bind to a port. If no specific port is specified, then - // increment the port number each time. - - int count = 1; - while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) { - if (specified_port == 0) { - lf_print_warning("RTI failed to get port %d.", port); - port++; - if (port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES) - port = DEFAULT_PORT; - lf_print_warning("RTI will try again with port %d.", port); - server_fd.sin_port = htons(port); - // Do not sleep. - } else { - lf_print("RTI failed to get port %d. Will try again.", port); - lf_sleep(PORT_BIND_RETRY_INTERVAL); - } - result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); - } - if (result != 0) { - lf_print_error_and_exit("Failed to bind the RTI socket. 
Port %d is not available. ", port); - } - char* type = "TCP"; - if (socket_type == UDP) { - type = "UDP"; - } - lf_print("RTI using %s port %d for federation %s.", type, port, rti_remote->federation_id); - - if (socket_type == TCP) { - rti_remote->final_port_TCP = port; - // Enable listening for socket connections. - // The second argument is the maximum number of queued socket requests, - // which according to the Mac man page is limited to 128. - listen(socket_descriptor, 128); - } else if (socket_type == UDP) { - rti_remote->final_port_UDP = port; - // No need to listen on the UDP socket - } - - return socket_descriptor; -} - void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 || lf_tag_compare(tag, e->last_provisionally_granted) < 0) { @@ -1171,7 +1045,7 @@ void send_reject(int* socket_id, unsigned char error_code) { * @param client_fd The socket address. * @return The federate ID for success or -1 for failure. */ -static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_in* client_fd) { +static int32_t receive_and_check_fed_id_message(int* socket_id) { // Buffer for message ID, federate ID, and federation ID length. size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID. unsigned char buffer[length]; @@ -1261,13 +1135,14 @@ static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_ } federate_info_t* fed = GET_FED_INFO(fed_id); // The MSG_TYPE_FED_IDS message has the right federation ID. - // Assign the address information for federate. - // The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful - // to create sockets and can be efficiently sent over the network. - // First, convert the sockaddr structure into a sockaddr_in that contains an internet address. 
- struct sockaddr_in* pV4_addr = client_fd; - // Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server - fed->server_ip_addr = pV4_addr->sin_addr; + + // Get the peer address from the connected socket_id. Then assign it as the federate's socket server. + struct sockaddr_in peer_addr; + socklen_t addr_len = sizeof(peer_addr); + if (getpeername(*socket_id, (struct sockaddr*)&peer_addr, &addr_len) != 0) { + lf_print_error("RTI failed to get peer address."); + } + fed->server_ip_addr = peer_addr.sin_addr; #if LOG_LEVEL >= LOG_LEVEL_DEBUG // Create the human readable format and copy that into @@ -1536,25 +1411,7 @@ static bool authenticate_federate(int* socket) { void lf_connect_to_federates(int socket_descriptor) { for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { - // Wait for an incoming connection request. - struct sockaddr client_fd; - uint32_t client_length = sizeof(client_fd); - // The following blocks until a federate connects. - int socket_id = -1; - while (1) { - socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length); - if (socket_id >= 0) { - // Got a socket - break; - } else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) { - lf_print_error_system_failure("RTI failed to accept the socket."); - } else { - // Try again - lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno)); - continue; - } - } - + int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); // Wait for the first message from the federate when RTI -a option is on. #ifdef __RTI_AUTH__ if (rti_remote->authentication_enabled) { @@ -1572,7 +1429,7 @@ void lf_connect_to_federates(int socket_descriptor) { #endif // The first message from the federate should contain its ID and the federation ID. 
- int32_t fed_id = receive_and_check_fed_id_message(&socket_id, (struct sockaddr_in*)&client_fd); + int32_t fed_id = receive_and_check_fed_id_message(&socket_id); if (fed_id >= 0 && socket_id >= 0 && receive_connection_information(&socket_id, (uint16_t)fed_id) && receive_udp_message_and_set_up_clock_sync(&socket_id, (uint16_t)fed_id)) { @@ -1612,14 +1469,12 @@ void* respond_to_erroneous_connections(void* nothing) { initialize_lf_thread_id(); while (true) { // Wait for an incoming connection request. - struct sockaddr client_fd; - uint32_t client_length = sizeof(client_fd); // The following will block until either a federate attempts to connect // or close(rti->socket_descriptor_TCP) is called. - int socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length); - if (socket_id < 0) + int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); + if (socket_id < 0) { return NULL; - + } if (rti_remote->all_federates_exited) { return NULL; } @@ -1653,12 +1508,17 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { int32_t start_rti_server(uint16_t port) { _lf_initialize_clock(); // Create the TCP socket server - rti_remote->socket_descriptor_TCP = create_rti_server(port, TCP); + if (create_server(port, &rti_remote->socket_descriptor_TCP, &rti_remote->final_port_TCP, TCP, true)) { + lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); + }; lf_print("RTI: Listening for federates."); // Create the UDP socket server // Try to get the rti_remote->final_port_TCP + 1 port if (rti_remote->clock_sync_global_status >= clock_sync_on) { - rti_remote->socket_descriptor_UDP = create_rti_server(rti_remote->final_port_TCP + 1, UDP); + if (create_server(rti_remote->final_port_TCP + 1, &rti_remote->socket_descriptor_UDP, &rti_remote->final_port_UDP, + UDP, true)) { + lf_print_error_system_failure("RTI failed to create UDP server: %s.", strerror(errno)); + } } return rti_remote->socket_descriptor_TCP; } diff 
--git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index 8d3012b95..de6b144aa 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -31,6 +31,7 @@ #include "lf_types.h" #include "pqueue_tag.h" +#include "socket_common.h" /** Time allowed for federates to reply to stop request. */ #define MAX_TIME_FOR_REPLY_TO_STOP_REQUEST SEC(30) @@ -38,8 +39,6 @@ ///////////////////////////////////////////// //// Data structures -typedef enum socket_type_t { TCP, UDP } socket_type_t; - /** * Information about a federate known to the RTI, including its runtime state, * mode of execution, and connectivity with other federates. @@ -413,4 +412,4 @@ int process_args(int argc, const char* argv[]); void initialize_RTI(rti_remote_t* rti); #endif // RTI_REMOTE_H -#endif // STANDALONE_RTI \ No newline at end of file +#endif // STANDALONE_RTI diff --git a/core/federated/clock-sync.c b/core/federated/clock-sync.c index ef5403c89..60a7bd16f 100644 --- a/core/federated/clock-sync.c +++ b/core/federated/clock-sync.c @@ -42,6 +42,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "clock-sync.h" #include "net_common.h" #include "net_util.h" +#include "socket_common.h" #include "util.h" /** Offset calculated by the clock synchronization algorithm. */ diff --git a/core/federated/federate.c b/core/federated/federate.c index c98b5d0bc..fc2f86911 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -936,42 +936,6 @@ static int perform_hmac_authentication() { } #endif -static void close_rti_socket() { - shutdown(_fed.socket_TCP_RTI, SHUT_RDWR); - close(_fed.socket_TCP_RTI); - _fed.socket_TCP_RTI = -1; -} - -/** - * Return in the result a struct with the address info for the specified hostname and port. - * The memory for the result is dynamically allocated and must be freed using freeaddrinfo. - * @param hostname The host name. - * @param port The port number. 
- * @param result The struct into which to write. - */ -static void rti_address(const char* hostname, uint16_t port, struct addrinfo** result) { - struct addrinfo hints; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; /* Allow IPv4 */ - hints.ai_socktype = SOCK_STREAM; /* Stream socket */ - hints.ai_protocol = IPPROTO_TCP; /* TCP protocol */ - hints.ai_addr = NULL; - hints.ai_next = NULL; - hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */ - - // Convert port number to string. - char str[6]; - sprintf(str, "%u", port); - - // Get address structure matching hostname and hints criteria, and - // set port to the port number provided in str. There should only - // ever be one matching address structure, and we connect to that. - if (getaddrinfo(hostname, (const char*)&str, &hints, result)) { - lf_print_error_and_exit("No host for RTI matching given hostname: %s", hostname); - } -} - /** * Send the specified timestamp to the RTI and wait for a response. * The specified timestamp should be current physical time of the @@ -1793,95 +1757,70 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { assert(port > 0); uint16_t uport = (uint16_t)port; -#if LOG_LEVEL > 3 - // Print the received IP address in a human readable format - // Create the human readable format of the received address. - // This is avoided unless LOG_LEVEL is high enough to - // subdue the overhead caused by inet_ntop(). 
char hostname[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &host_ip_addr, hostname, INET_ADDRSTRLEN); - LF_PRINT_LOG("Received address %s port %d for federate %d from RTI.", hostname, uport, remote_federate_id); -#endif - + int socket_id = create_real_time_tcp_socket_errexit(); + if (connect_to_socket(socket_id, (const char*)hostname, uport) < 0) { + lf_print_error_and_exit("Failed to connect() to RTI."); + } // Iterate until we either successfully connect or we exceed the CONNECT_TIMEOUT start_connect = lf_time_physical(); - int socket_id = -1; while (result < 0 && !_lf_termination_executed) { - // Create an IPv4 socket for TCP (not UDP) communication over IP (0). - socket_id = create_real_time_tcp_socket_errexit(); - - // Server file descriptor. - struct sockaddr_in server_fd; - // Zero out the server_fd struct. - bzero((char*)&server_fd, sizeof(server_fd)); - - // Set up the server_fd fields. - server_fd.sin_family = AF_INET; // IPv4 - server_fd.sin_addr = host_ip_addr; // Received from the RTI - - // Convert the port number from host byte order to network byte order. - server_fd.sin_port = htons(uport); - result = connect(socket_id, (struct sockaddr*)&server_fd, sizeof(server_fd)); - - if (result != 0) { - lf_print_error("Failed to connect to federate %d on port %d.", remote_federate_id, uport); + // Try again after some time if the connection failed. + // Note that this should not really happen since the remote federate should be + // accepting socket connections. But possibly it will be busy (in process of accepting + // another socket connection?). Hence, we retry. + if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { + // If the remote federate is not accepting the connection after CONNECT_TIMEOUT + // treat it as a soft error condition and return. + lf_print_error("Failed to connect to federate %d with timeout: " PRINTF_TIME ". Giving up.", remote_federate_id, + CONNECT_TIMEOUT); + return; + } - // Try again after some time if the connection failed. 
- // Note that this should not really happen since the remote federate should be - // accepting socket connections. But possibly it will be busy (in process of accepting - // another socket connection?). Hence, we retry. - if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) { - // If the remote federate is not accepting the connection after CONNECT_TIMEOUT - // treat it as a soft error condition and return. - lf_print_error("Failed to connect to federate %d with timeout: " PRINTF_TIME ". Giving up.", remote_federate_id, - CONNECT_TIMEOUT); - return; - } - lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n", - remote_federate_id, ADDRESS_QUERY_RETRY_INTERVAL); + // Check whether the RTI is still there. + if (rti_failed()) { + break; + } + // Connect was successful. + size_t buffer_length = 1 + sizeof(uint16_t) + 1; + unsigned char buffer[buffer_length]; + buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; + if (_lf_my_fed_id == UINT16_MAX) { + lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); + } + encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); + unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); + buffer[sizeof(uint16_t) + 1] = federation_id_length; + // Trace the event when tracing is enabled + tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); - // Check whether the RTI is still there. - if (rti_failed()) - break; + // No need for a mutex because we have the only handle on the socket. 
+ write_to_socket_fail_on_error(&socket_id, buffer_length, buffer, NULL, "Failed to send fed_id to federate %d.", + remote_federate_id); + write_to_socket_fail_on_error(&socket_id, federation_id_length, (unsigned char*)federation_metadata.federation_id, + NULL, "Failed to send federation id to federate %d.", remote_federate_id); + read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, NULL, + "Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", + remote_federate_id); + if (buffer[0] != MSG_TYPE_ACK) { + // Get the error code. + read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, NULL, + "Failed to read error code from federate %d in response to sending fed_id.", + remote_federate_id); + lf_print_error("Received MSG_TYPE_REJECT message from remote federate (%d).", buffer[0]); + result = -1; // Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds. lf_sleep(ADDRESS_QUERY_RETRY_INTERVAL); + lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n", + remote_federate_id, ADDRESS_QUERY_RETRY_INTERVAL); + continue; } else { - // Connect was successful. - size_t buffer_length = 1 + sizeof(uint16_t) + 1; - unsigned char buffer[buffer_length]; - buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; - if (_lf_my_fed_id == UINT16_MAX) { - lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX - 1); - } - encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); - unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); - buffer[sizeof(uint16_t) + 1] = federation_id_length; + lf_print("Connected to federate %d, port %hu.", remote_federate_id, uport); // Trace the event when tracing is enabled - tracepoint_federate_to_federate(send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); - - // No need for a mutex because we have the only handle on the socket. 
- write_to_socket_fail_on_error(&socket_id, buffer_length, buffer, NULL, "Failed to send fed_id to federate %d.", - remote_federate_id); - write_to_socket_fail_on_error(&socket_id, federation_id_length, (unsigned char*)federation_metadata.federation_id, - NULL, "Failed to send federation id to federate %d.", remote_federate_id); - - read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, NULL, - "Failed to read MSG_TYPE_ACK from federate %d in response to sending fed_id.", - remote_federate_id); - if (buffer[0] != MSG_TYPE_ACK) { - // Get the error code. - read_from_socket_fail_on_error(&socket_id, 1, (unsigned char*)buffer, NULL, - "Failed to read error code from federate %d in response to sending fed_id.", - remote_federate_id); - lf_print_error("Received MSG_TYPE_REJECT message from remote federate (%d).", buffer[0]); - result = -1; - continue; - } else { - lf_print("Connected to federate %d, port %d.", remote_federate_id, port); - // Trace the event when tracing is enabled - tracepoint_federate_to_federate(receive_ACK, _lf_my_fed_id, remote_federate_id, NULL); - } + tracepoint_federate_to_federate(receive_ACK, _lf_my_fed_id, remote_federate_id, NULL); + break; } } // Once we set this variable, then all future calls to close() on this @@ -1896,59 +1835,14 @@ void lf_connect_to_rti(const char* hostname, int port) { hostname = federation_metadata.rti_host ? federation_metadata.rti_host : hostname; port = federation_metadata.rti_port >= 0 ? federation_metadata.rti_port : port; - // Adjust the port. - uint16_t uport = 0; - if (port < 0 || port > INT16_MAX) { - lf_print_error("lf_connect_to_rti(): Specified port (%d) is out of range," - " using the default port %d instead.", - port, DEFAULT_PORT); - uport = DEFAULT_PORT; - port = 0; // Mark so that increments occur between tries. 
- } else { - uport = (uint16_t)port; - } - if (uport == 0) { - uport = DEFAULT_PORT; - } - // Create a socket _fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit(); - - int result = -1; - struct addrinfo* res = NULL; + if (connect_to_socket(_fed.socket_TCP_RTI, hostname, port) < 0) { + lf_print_error_and_exit("Failed to connect() to RTI."); + } instant_t start_connect = lf_time_physical(); while (!CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT) && !_lf_termination_executed) { - if (res != NULL) { - // This is a repeated attempt. - if (_fed.socket_TCP_RTI >= 0) - close_rti_socket(); - - lf_sleep(CONNECT_RETRY_INTERVAL); - - // Create a new socket. - _fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit(); - - if (port == 0) { - // Free previously allocated address info. - freeaddrinfo(res); - // Increment the port number. - uport++; - if (uport >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES) - uport = DEFAULT_PORT; - - // Reconstruct the address info. - rti_address(hostname, uport, &res); - } - lf_print("Trying RTI again on port %d.", uport); - } else { - // This is the first attempt. - rti_address(hostname, uport, &res); - } - - result = connect(_fed.socket_TCP_RTI, res->ai_addr, res->ai_addrlen); - if (result < 0) - continue; // Connect failed. // Have connected to an RTI, but not sure it's the right RTI. // Send a MSG_TYPE_FED_IDS message and wait for a reply. @@ -1961,7 +1855,6 @@ void lf_connect_to_rti(const char* hostname, int port) { continue; // Try again with a new port. } else { // No point in trying again because it will be the same port. - close_rti_socket(); lf_print_error_and_exit("Authentication failed."); } } @@ -2014,7 +1907,7 @@ void lf_connect_to_rti(const char* hostname, int port) { read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, 1, &cause, NULL, "Failed to read the cause of rejection by the RTI."); if (cause == FEDERATION_ID_DOES_NOT_MATCH || cause == WRONG_SERVER) { - lf_print_warning("Connected to the wrong RTI on port %d. 
Will try again", uport); + lf_print_warning("Connected to the wrong RTI. Will try again"); continue; } } else if (response == MSG_TYPE_ACK) { @@ -2023,18 +1916,13 @@ void lf_connect_to_rti(const char* hostname, int port) { LF_PRINT_LOG("Received acknowledgment from the RTI."); break; } else if (response == MSG_TYPE_RESIGN) { - lf_print_warning("RTI on port %d resigned. Will try again", uport); + lf_print_warning("RTI resigned. Will try again"); continue; } else { - lf_print_warning("RTI on port %d gave unexpect response %u. Will try again", uport, response); + lf_print_warning("RTI gave unexpect response %u. Will try again", response); continue; } } - if (result < 0) { - lf_print_error_and_exit("Failed to connect to RTI with timeout: " PRINTF_TIME, CONNECT_TIMEOUT); - } - - freeaddrinfo(res); /* No longer needed */ // Call a generated (external) function that sends information // about connections between this federate and other federates @@ -2050,55 +1938,13 @@ void lf_connect_to_rti(const char* hostname, int port) { encode_uint16(udp_port, &(UDP_port_number[1])); write_to_socket_fail_on_error(&_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), UDP_port_number, NULL, "Failed to send the UDP port number to the RTI."); - - lf_print("Connected to RTI at %s:%d.", hostname, uport); } void lf_create_server(int specified_port) { assert(specified_port <= UINT16_MAX && specified_port >= 0); - uint16_t port = (uint16_t)specified_port; - LF_PRINT_LOG("Creating a socket server on port %d.", port); - // Create an IPv4 socket for TCP (not UDP) communication over IP (0). - int socket_descriptor = create_real_time_tcp_socket_errexit(); - - // Server file descriptor. - struct sockaddr_in server_fd; - // Zero out the server address structure. - bzero((char*)&server_fd, sizeof(server_fd)); - - server_fd.sin_family = AF_INET; // IPv4 - server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0. - // Convert the port number from host byte order to network byte order. 
- server_fd.sin_port = htons(port); - - int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); - int count = 0; - while (result < 0 && count++ < PORT_BIND_RETRY_LIMIT) { - lf_sleep(PORT_BIND_RETRY_INTERVAL); - result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); - } - if (result < 0) { - lf_print_error_and_exit("Failed to bind socket on port %d.", port); - } - - // Set the global server port. - if (specified_port == 0) { - // Need to retrieve the port number assigned by the OS. - struct sockaddr_in assigned; - socklen_t addr_len = sizeof(assigned); - if (getsockname(socket_descriptor, (struct sockaddr*)&assigned, &addr_len) < 0) { - lf_print_error_and_exit("Failed to retrieve assigned port number."); - } - _fed.server_port = ntohs(assigned.sin_port); - } else { - _fed.server_port = port; - } - - // Enable listening for socket connections. - // The second argument is the maximum number of queued socket requests, - // which according to the Mac man page is limited to 128. 
- listen(socket_descriptor, 128); - + if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) { + lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); + }; LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); // Send the server port number to the RTI @@ -2115,9 +1961,6 @@ void lf_create_server(int specified_port) { "Failed to send address advertisement."); LF_PRINT_DEBUG("Sent port %d to the RTI.", _fed.server_port); - - // Set the global server socket - _fed.server_socket = socket_descriptor; } void lf_enqueue_port_absent_reactions(environment_t* env) { @@ -2151,21 +1994,10 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { _fed.inbound_socket_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t)); while (received_federates < _fed.number_of_inbound_p2p_connections && !_lf_termination_executed) { // Wait for an incoming connection request. - struct sockaddr client_fd; - uint32_t client_length = sizeof(client_fd); - int socket_id = accept(_fed.server_socket, &client_fd, &client_length); - + int socket_id = accept_socket(_fed.server_socket, _fed.socket_TCP_RTI); if (socket_id < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - if (rti_failed()) - break; - else - continue; // Try again. 
- } else if (errno == EPERM) { - lf_print_error_system_failure("Firewall permissions prohibit connection."); - } else { - lf_print_error_system_failure("A fatal error occurred while accepting a new socket."); - } + lf_print_warning("Federate failed to accept the socket."); + return NULL; } LF_PRINT_LOG("Accepted new connection from remote federate."); diff --git a/core/federated/network/CMakeLists.txt b/core/federated/network/CMakeLists.txt index 5306eae02..f61d69897 100644 --- a/core/federated/network/CMakeLists.txt +++ b/core/federated/network/CMakeLists.txt @@ -1,4 +1,4 @@ -set(LF_NETWORK_FILES net_util.c) +set(LF_NETWORK_FILES net_util.c socket_common.c) list(TRANSFORM LF_NETWORK_FILES PREPEND federated/network/) list(APPEND REACTORC_SOURCES ${LF_NETWORK_FILES}) diff --git a/core/federated/network/net_util.c b/core/federated/network/net_util.c index 61d4804bd..bcea05495 100644 --- a/core/federated/network/net_util.c +++ b/core/federated/network/net_util.c @@ -45,184 +45,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "net_util.h" #include "util.h" -// Define socket functions only for federated execution. -#ifdef FEDERATED -#include // Defines read(), write(), and close() - -#ifndef NUMBER_OF_FEDERATES -#define NUMBER_OF_FEDERATES 1 -#endif - -/** Number of nanoseconds to sleep before retrying a socket read. */ -#define SOCKET_READ_RETRY_INTERVAL 1000000 - -// Mutex lock held while performing socket close operations. -// A deadlock can occur if two threads simulataneously attempt to close the same socket. -lf_mutex_t socket_mutex; - -int create_real_time_tcp_socket_errexit() { - int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sock < 0) { - lf_print_error_system_failure("Could not open TCP socket."); - } - // Disable Nagle's algorithm which bundles together small TCP messages to - // reduce network traffic. - // TODO: Re-consider if we should do this, and whether disabling delayed ACKs - // is enough. 
- int flag = 1; - int result = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)); - - if (result < 0) { - lf_print_error_system_failure("Failed to disable Nagle algorithm on socket server."); - } - -#if defined(PLATFORM_Linux) - // Disable delayed ACKs. Only possible on Linux - result = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(int)); - - if (result < 0) { - lf_print_error_system_failure("Failed to disable Nagle algorithm on socket server."); - } -#endif // Linux - - return sock; -} - -int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) { - if (socket < 0) { - // Socket is not open. - errno = EBADF; - return -1; - } - ssize_t bytes_read = 0; - while (bytes_read < (ssize_t)num_bytes) { - ssize_t more = read(socket, buffer + bytes_read, num_bytes - (size_t)bytes_read); - if (more < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { - // Those error codes set by the socket indicates - // that we should try again (@see man errno). - LF_PRINT_DEBUG("Reading from socket %d failed with error: `%s`. Will try again.", socket, strerror(errno)); - lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES); - continue; - } else if (more < 0) { - // A more serious error occurred. - lf_print_error("Reading from socket %d failed. With error: `%s`", socket, strerror(errno)); - return -1; - } else if (more == 0) { - // EOF received. - return 1; - } - bytes_read += more; - } - return 0; -} - -int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer) { - assert(socket); - int read_failed = read_from_socket(*socket, num_bytes, buffer); - if (read_failed) { - // Read failed. - // Socket has probably been closed from the other side. - // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. 
- *socket = -1; - return -1; - } - return 0; -} - -void read_from_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, - char* format, ...) { - va_list args; - assert(socket); - int read_failed = read_from_socket_close_on_error(socket, num_bytes, buffer); - if (read_failed) { - // Read failed. - if (mutex != NULL) { - LF_MUTEX_UNLOCK(mutex); - } - if (format != NULL) { - va_start(args, format); - lf_print_error_system_failure(format, args); - va_end(args); - } else { - lf_print_error_system_failure("Failed to read from socket."); - } - } -} - -ssize_t peek_from_socket(int socket, unsigned char* result) { - ssize_t bytes_read = recv(socket, result, 1, MSG_DONTWAIT | MSG_PEEK); - if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - return 0; - else - return bytes_read; -} - -int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer) { - if (socket < 0) { - // Socket is not open. - errno = EBADF; - return -1; - } - ssize_t bytes_written = 0; - while (bytes_written < (ssize_t)num_bytes) { - ssize_t more = write(socket, buffer + bytes_written, num_bytes - (size_t)bytes_written); - if (more <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { - // The error codes EAGAIN or EWOULDBLOCK indicate - // that we should try again (@see man errno). - // The error code EINTR means the system call was interrupted before completing. - LF_PRINT_DEBUG("Writing to socket %d was blocked. Will try again.", socket); - lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES); - continue; - } else if (more < 0) { - // A more serious error occurred. - lf_print_error("Writing to socket %d failed. With error: `%s`", socket, strerror(errno)); - return -1; - } - bytes_written += more; - } - return 0; -} - -int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer) { - assert(socket); - int result = write_to_socket(*socket, num_bytes, buffer); - if (result) { - // Write failed. 
- // Socket has probably been closed from the other side. - // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. - *socket = -1; - } - return result; -} - -void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, - char* format, ...) { - va_list args; - assert(socket); - int result = write_to_socket_close_on_error(socket, num_bytes, buffer); - if (result) { - // Write failed. - if (mutex != NULL) { - LF_MUTEX_UNLOCK(mutex); - } - if (format != NULL) { - va_start(args, format); - lf_print_error_system_failure(format, args); - va_end(args); - } else { - lf_print_error("Failed to write to socket. Closing it."); - } - } -} - -#endif // FEDERATED - // Below are more generally useful functions. void encode_int64(int64_t data, unsigned char* buffer) { diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c new file mode 100644 index 000000000..cc71e897b --- /dev/null +++ b/core/federated/network/socket_common.c @@ -0,0 +1,404 @@ +#include // Defines read(), write(), and close() +#include // IPPROTO_TCP, IPPROTO_UDP +#include // TCP_NODELAY +#include +#include +#include +#include +#include +#include +#include //va_list +#include // strerror + +#include "util.h" +#include "socket_common.h" + +#ifndef NUMBER_OF_FEDERATES +#define NUMBER_OF_FEDERATES 1 +#endif + +/** Number of nanoseconds to sleep before retrying a socket read. */ +#define SOCKET_READ_RETRY_INTERVAL 1000000 + +// Mutex lock held while performing socket close operations. +// A deadlock can occur if two threads simulataneously attempt to close the same socket. 
+lf_mutex_t socket_mutex; + +int create_real_time_tcp_socket_errexit() { + int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) { + lf_print_error_system_failure("Could not open TCP socket."); + } + // Disable Nagle's algorithm which bundles together small TCP messages to + // reduce network traffic. + // TODO: Re-consider if we should do this, and whether disabling delayed ACKs + // is enough. + int flag = 1; + int result = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)); + + if (result < 0) { + lf_print_error_system_failure("Failed to disable Nagle algorithm on socket server."); + } + +#if defined(PLATFORM_Linux) + // Disable delayed ACKs. Only possible on Linux + result = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(int)); + + if (result < 0) { + lf_print_error_system_failure("Failed to disable Nagle algorithm on socket server."); + } +#endif // Linux + + return sock; +} + +/** + * Set the socket timeout options. + * @param socket_descriptor The file descriptor of the socket on which to set options. + * @param timeout_time A pointer to a `struct timeval` that specifies the timeout duration + * for socket operations (receive and send). 
+ */ +static void set_socket_timeout_option(int socket_descriptor, struct timeval* timeout_time) { + // Set the option for this socket to reuse the same address + int true_variable = 1; // setsockopt() requires a reference to the value assigned to an option + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(int32_t)) < 0) { + lf_print_error("Failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno)); + } + // Set the timeout on the socket so that read and write operations don't block for too long + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)timeout_time, sizeof(*timeout_time)) < 0) { + lf_print_error("Failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno)); + } + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)timeout_time, sizeof(*timeout_time)) < 0) { + lf_print_error("Failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno)); + } +} + +/** + * Assign a port to the socket, and bind the socket. + * + * @param socket_descriptor The file descriptor of the socket to be bound to an address and port. + * @param specified_port The port number to bind the socket to. + * @param increment_port_on_retry Boolean to retry port increment. + * @return The final port number used. + */ +static int set_socket_bind_option(int socket_descriptor, uint16_t specified_port, bool increment_port_on_retry) { + // Server file descriptor. + struct sockaddr_in server_fd; + // Zero out the server address structure. + bzero((char*)&server_fd, sizeof(server_fd)); + uint16_t used_port = specified_port; + if (specified_port == 0 && increment_port_on_retry == true) { + used_port = DEFAULT_PORT; + } + server_fd.sin_family = AF_INET; // IPv4 + server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0. + server_fd.sin_port = htons(used_port); // Convert the port number from host byte order to network byte order. 
+ + int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); + + // Try repeatedly to bind to a port. + int count = 1; + while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) { + if (specified_port == 0 && increment_port_on_retry == true) { + // If the specified port number is zero, and the increment_port_on_retry is true, increment the port number each + // time. + lf_print_warning("RTI failed to get port %d.", used_port); + used_port++; + if (used_port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES) + used_port = DEFAULT_PORT; + lf_print_warning("RTI will try again with port %d.", used_port); + server_fd.sin_port = htons(used_port); + // Do not sleep. + } else { + lf_print("Failed to bind socket on port %d. Will try again.", used_port); + lf_sleep(PORT_BIND_RETRY_INTERVAL); + } + result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd)); + } + + // Set the global server port. + if (specified_port == 0 && increment_port_on_retry == false) { + // Need to retrieve the port number assigned by the OS. + struct sockaddr_in assigned; + socklen_t addr_len = sizeof(assigned); + if (getsockname(socket_descriptor, (struct sockaddr*)&assigned, &addr_len) < 0) { + lf_print_error_and_exit("Federate failed to retrieve assigned port number."); + } + used_port = ntohs(assigned.sin_port); + } + if (result != 0) { + lf_print_error_and_exit("Failed to bind the socket. Port %d is not available. ", used_port); + } + lf_print_debug("Socket is binded to port %d.", used_port); + return used_port; +} + +int create_server(uint16_t port, int* final_socket, uint16_t* final_port, socket_type_t sock_type, + bool increment_port_on_retry) { + int socket_descriptor; + struct timeval timeout_time; + if (sock_type == TCP) { + // Create an IPv4 socket for TCP. 
+ socket_descriptor = create_real_time_tcp_socket_errexit(); + // Set the timeout time for the communications of the server + timeout_time = + (struct timeval){.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000}; + } else { + // Create a UDP socket. + socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + timeout_time = + (struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000}; + } + char* type = (sock_type == TCP) ? "TCP" : "UDP"; + if (socket_descriptor < 0) { + lf_print_error("Failed to create %s socket.", type); + return -1; + } + set_socket_timeout_option(socket_descriptor, &timeout_time); + int used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry); + if (sock_type == TCP) { + // Enable listening for socket connections. + // The second argument is the maximum number of queued socket requests, + // which according to the Mac man page is limited to 128. + if (listen(socket_descriptor, 128)) { + lf_print_error("Failed to listen on %d socket: %s.", socket_descriptor, strerror(errno)); + return -1; + } + } + *final_socket = socket_descriptor; + *final_port = used_port; + return 0; +} + +/** + * Return true if either the socket to the RTI is broken or the socket is + * alive and the first unread byte on the socket's queue is MSG_TYPE_FAILED. + */ +static bool check_socket_closed(int socket) { + unsigned char first_byte; + ssize_t bytes = peek_from_socket(socket, &first_byte); + if (bytes < 0 || (bytes == 1 && first_byte == MSG_TYPE_FAILED)) { + return true; + } else { + return false; + } +} + +int accept_socket(int socket, int rti_socket) { + struct sockaddr client_fd; + // Wait for an incoming connection request. + uint32_t client_length = sizeof(client_fd); + // The following blocks until a federate connects. + int socket_id = -1; + while (true) { + // When close(socket) is called, the accept() will return -1. 
+    socket_id = accept(socket, &client_fd, &client_length);
+    if (socket_id >= 0) {
+      // Got a socket
+      break;
+    } else if (errno == EPERM) {
+      // Checked before the generic case below, which would otherwise swallow EPERM.
+      lf_print_error_system_failure("Firewall permissions prohibit connection.");
+    } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
+      // Not a transient error: give up and return -1.
+      // The tests must be joined with &&; joined with ||, the condition holds for every errno value.
+      lf_print_warning("Failed to accept the socket. %s.", strerror(errno));
+      break;
+    } else {
+      // Transient error (EAGAIN, EWOULDBLOCK or EINTR).
+      // A federate passes its RTI socket (rti_socket != -1): stop waiting if that socket is broken
+      // or has MSG_TYPE_FAILED pending. With rti_socket == -1 there is nothing to check.
+      if (rti_socket != -1) {
+        if (check_socket_closed(rti_socket)) {
+          break;
+        }
+      }
+      // Try again
+      lf_print_warning("Failed to accept the socket. %s. Trying again.", strerror(errno));
+      continue;
+    }
+  }
+  return socket_id;
+}
+
+int connect_to_socket(int sock, const char* hostname, int port) {
+  struct addrinfo hints;
+  struct addrinfo* result;
+  int ret = -1;
+
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_INET;       /* Allow IPv4 */
+  hints.ai_socktype = SOCK_STREAM; /* Stream socket */
+  hints.ai_protocol = IPPROTO_TCP; /* TCP protocol */
+  hints.ai_addr = NULL;
+  hints.ai_next = NULL;
+  hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */
+
+  uint16_t used_port = (port == 0) ? DEFAULT_PORT : (uint16_t)port;
+
+  instant_t start_connect = lf_time_physical();
+  // while (!_lf_termination_executed) { // Not working...
+  while (1) {
+    if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
+      lf_print_error("Failed to connect with timeout: " PRINTF_TIME ". Giving up.", CONNECT_TIMEOUT);
+      break;
+    }
+    // Convert port number to string.
+    char str[6];
+    sprintf(str, "%u", used_port);
+
+    // Get address structure matching hostname and hints criteria, and
+    // set port to the port number provided in str. There should only
+    // ever be one matching address structure, and we connect to that.
+    if (getaddrinfo(hostname, (const char*)&str, &hints, &result)) {
+      lf_print_error("No host matching given hostname: %s", hostname);
+      break;
+    }
+    ret = connect(sock, result->ai_addr, result->ai_addrlen);
+    // Free the address list before branching: every path below leaves this iteration
+    // (continue or break), so nothing placed after the if/else would ever run.
+    freeaddrinfo(result);
+    if (ret < 0) {
+      lf_sleep(CONNECT_RETRY_INTERVAL);
+      if (port == 0) {
+        used_port++;
+        if (used_port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES) {
+          used_port = DEFAULT_PORT;
+        }
+      }
+      lf_print_warning("Could not connect. Will try again every " PRINTF_TIME " nanoseconds. Connecting to port %d.\n",
+                       CONNECT_RETRY_INTERVAL, used_port);
+      continue;
+    } else {
+      break;
+    }
+  }
+  if (ret == 0) {
+    // The loop also exits on timeout or on a failed host lookup; only report success when connect() succeeded.
+    lf_print("Connected to %s:%d.", hostname, used_port);
+  }
+  return ret;
+}
+
+int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer) {
+  if (socket < 0) {
+    // Socket is not open.
+    errno = EBADF;
+    return -1;
+  }
+  ssize_t bytes_read = 0;
+  while (bytes_read < (ssize_t)num_bytes) {
+    ssize_t more = read(socket, buffer + bytes_read, num_bytes - (size_t)bytes_read);
+    if (more < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+      // Those error codes set by the socket indicates
+      // that we should try again (@see man errno).
+      LF_PRINT_DEBUG("Reading from socket %d failed with error: `%s`. Will try again.", socket, strerror(errno));
+      lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
+      continue;
+    } else if (more < 0) {
+      // A more serious error occurred.
+      lf_print_error("Reading from socket %d failed. With error: `%s`", socket, strerror(errno));
+      return -1;
+    } else if (more == 0) {
+      // EOF received.
+      return 1;
+    }
+    bytes_read += more;
+  }
+  return 0;
+}
+
+int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer) {
+  assert(socket);
+  int read_failed = read_from_socket(*socket, num_bytes, buffer);
+  if (read_failed) {
+    // Read failed.
+    // Socket has probably been closed from the other side.
+    // Shut down and close the socket from this side.
+    shutdown(*socket, SHUT_RDWR);
+    close(*socket);
+    // Mark the socket closed.
+    *socket = -1;
+    return -1;
+  }
+  return 0;
+}
+
+void read_from_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex,
+                                    char* format, ...) {
+  va_list args;
+  assert(socket);
+  int read_failed = read_from_socket_close_on_error(socket, num_bytes, buffer);
+  if (read_failed) {
+    // Read failed.
+    if (mutex != NULL) {
+      LF_MUTEX_UNLOCK(mutex);
+    }
+    if (format != NULL) {
+      // A va_list cannot be passed where "..." is expected, so expand the caller's format
+      // and arguments into a buffer first and report that text. Longer messages are truncated.
+      char message[512];
+      va_start(args, format);
+      vsnprintf(message, sizeof(message), format, args);
+      va_end(args);
+      lf_print_error_system_failure("%s", message);
+    } else {
+      lf_print_error_system_failure("Failed to read from socket.");
+    }
+  }
+}
+
+ssize_t peek_from_socket(int socket, unsigned char* result) {
+  ssize_t bytes_read = recv(socket, result, 1, MSG_DONTWAIT | MSG_PEEK);
+  if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
+    return 0;
+  else
+    return bytes_read;
+}
+
+int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer) {
+  if (socket < 0) {
+    // Socket is not open.
+    errno = EBADF;
+    return -1;
+  }
+  ssize_t bytes_written = 0;
+  while (bytes_written < (ssize_t)num_bytes) {
+    ssize_t more = write(socket, buffer + bytes_written, num_bytes - (size_t)bytes_written);
+    if (more <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+      // The error codes EAGAIN or EWOULDBLOCK indicate
+      // that we should try again (@see man errno).
+      // The error code EINTR means the system call was interrupted before completing.
+      LF_PRINT_DEBUG("Writing to socket %d was blocked. Will try again.", socket);
+      lf_sleep(DELAY_BETWEEN_SOCKET_RETRIES);
+      continue;
+    } else if (more < 0) {
+      // A more serious error occurred.
+      lf_print_error("Writing to socket %d failed.
With error: `%s`", socket, strerror(errno));
+      return -1;
+    }
+    bytes_written += more;
+  }
+  return 0;
+}
+
+int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer) {
+  assert(socket);
+  int result = write_to_socket(*socket, num_bytes, buffer);
+  if (result) {
+    // Write failed.
+    // Socket has probably been closed from the other side.
+    // Shut down and close the socket from this side.
+    shutdown(*socket, SHUT_RDWR);
+    close(*socket);
+    // Mark the socket closed.
+    *socket = -1;
+  }
+  return result;
+}
+
+void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex,
+                                   char* format, ...) {
+  va_list args;
+  assert(socket);
+  int result = write_to_socket_close_on_error(socket, num_bytes, buffer);
+  if (result) {
+    // Write failed.
+    if (mutex != NULL) {
+      LF_MUTEX_UNLOCK(mutex);
+    }
+    if (format != NULL) {
+      // A va_list cannot be passed where "..." is expected, so expand the caller's format
+      // and arguments into a buffer first and report that text. Longer messages are truncated.
+      char message[512];
+      va_start(args, format);
+      vsnprintf(message, sizeof(message), format, args);
+      va_end(args);
+      lf_print_error_system_failure("%s", message);
+    } else {
+      lf_print_error("Failed to write to socket. Closing it.");
+    }
+  }
+}
diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h
index 79ce19550..47826be3e 100644
--- a/include/core/federated/network/net_common.h
+++ b/include/core/federated/network/net_common.h
@@ -179,18 +179,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #ifndef NET_COMMON_H
 #define NET_COMMON_H
 
-/**
- * The timeout time in ns for TCP operations.
- * Default value is 10 secs.
- */
-#define TCP_TIMEOUT_TIME SEC(10)
-
-/**
- * The timeout time in ns for UDP operations.
- * Default value is 1 sec.
- */
-#define UDP_TIMEOUT_TIME SEC(1)
-
 /**
  * Size of the buffer used for messages sent between federates.
  * This is used by both the federates and the rti, so message lengths
@@ -198,25 +186,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ #define FED_COM_BUFFER_SIZE 256u -/** - * Time between a federate's attempts to connect to the RTI. - */ -#define CONNECT_RETRY_INTERVAL MSEC(500) - -/** - * Bound on the number of retries to connect to the RTI. - * A federate will retry every CONNECT_RETRY_INTERVAL seconds until - * CONNECTION_TIMEOUT expires. - */ -#define CONNECT_TIMEOUT MINUTES(1) - -/** - * Maximum number of port addresses that a federate will try to connect to the RTI on. - * If you are using automatic ports begining at DEFAULT_PORT, this puts an upper bound - * on the number of RTIs that can be running on the same host. - */ -#define MAX_NUM_PORT_ADDRESSES 16 - /** * Time that a federate waits before asking * the RTI again for the port and IP address of a federate @@ -225,28 +194,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define ADDRESS_QUERY_RETRY_INTERVAL MSEC(250) -/** - * Time to wait before re-attempting to bind to a port. - * When a process closes, the network stack typically waits between 30 and 120 - * seconds before releasing the port. This is to allow for delayed packets so - * that a new process does not receive packets from a previous process. - * Here, we limit the retries to 60 seconds. - */ -#define PORT_BIND_RETRY_INTERVAL SEC(1) - -/** - * Number of attempts to bind to a port before giving up. - */ -#define PORT_BIND_RETRY_LIMIT 60 - -/** - * Default port number for the RTI. - * Unless a specific port has been specified by the LF program in the "at" - * for the RTI or on the command line, when the RTI starts up, it will attempt - * to open a socket server on this port. - */ -#define DEFAULT_PORT 15045u - /** * Delay the start of all federates by this amount. * This helps ensure that the federates do not start at the same time. 
diff --git a/include/core/federated/network/net_util.h b/include/core/federated/network/net_util.h index 24b4782f9..1e9008816 100644 --- a/include/core/federated/network/net_util.h +++ b/include/core/federated/network/net_util.h @@ -51,8 +51,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "low_level_platform.h" #include "tag.h" -#define NUM_SOCKET_RETRIES 10 -#define DELAY_BETWEEN_SOCKET_RETRIES MSEC(100) +#ifdef FEDERATED +#include "socket_common.h" +#endif #define HOST_LITTLE_ENDIAN 1 #define HOST_BIG_ENDIAN 2 @@ -63,125 +64,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ int host_is_big_endian(void); -#ifdef FEDERATED - -/** - * Mutex protecting socket close operations. - */ -extern lf_mutex_t socket_mutex; - -/** - * @brief Create an IPv4 TCP socket with Nagle's algorithm disabled - * (TCP_NODELAY) and Delayed ACKs disabled (TCP_QUICKACK). Exits application - * on any error. - * - * @return The socket ID (a file descriptor). - */ -int create_real_time_tcp_socket_errexit(); - -/** - * Read the specified number of bytes from the specified socket into the specified buffer. - * If an error occurs during this reading, return -1 and set errno to indicate - * the cause of the error. If the read succeeds in reading the specified number of bytes, - * return 0. If an EOF occurs before reading the specified number of bytes, return 1. - * This function repeats the read attempt until the specified number of bytes - * have been read, an EOF is read, or an error occurs. Specifically, errors EAGAIN, - * EWOULDBLOCK, and EINTR are not considered errors and instead trigger - * another attempt. A delay between attempts is given by DELAY_BETWEEN_SOCKET_RETRIES. - * @param socket The socket ID. - * @param num_bytes The number of bytes to read. - * @param buffer The buffer into which to put the bytes. - * @return 0 for success, 1 for EOF, and -1 for an error. 
- */ -int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer); - -/** - * Read the specified number of bytes to the specified socket using read_from_socket - * and close the socket if an error occurs. If an error occurs, this will change the - * socket ID pointed to by the first argument to -1 and will return -1. - * @param socket Pointer to the socket ID. - * @param num_bytes The number of bytes to write. - * @param buffer The buffer from which to get the bytes. - * @return 0 for success, -1 for failure. - */ -int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer); - -/** - * Read the specified number of bytes from the specified socket into the - * specified buffer. If a disconnect or an EOF occurs during this - * reading, then if format is non-null, report an error and exit. - * If the mutex argument is non-NULL, release the mutex before exiting. - * If format is null, then report the error, but do not exit. - * This function takes a formatted string and additional optional arguments - * similar to printf(format, ...) that is appended to the error messages. - * @param socket The socket ID. - * @param num_bytes The number of bytes to read. - * @param buffer The buffer into which to put the bytes. - * @param format A printf-style format string, followed by arguments to - * fill the string, or NULL to not exit with an error message. - * @return The number of bytes read, or 0 if an EOF is received, or - * a negative number for an error. - */ -void read_from_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, - char* format, ...); - -/** - * Without blocking, peek at the specified socket and, if there is - * anything on the queue, put its first byte at the specified address and return 1. - * If there is nothing on the queue, return 0, and if an error occurs, - * return -1. - * @param socket The socket ID. 
- * @param result Pointer to where to put the first byte available on the socket. - */ -ssize_t peek_from_socket(int socket, unsigned char* result); - -/** - * Write the specified number of bytes to the specified socket from the - * specified buffer. If an error occurs, return -1 and set errno to indicate - * the cause of the error. If the write succeeds, return 0. - * This function repeats the attempt until the specified number of bytes - * have been written or an error occurs. Specifically, errors EAGAIN, - * EWOULDBLOCK, and EINTR are not considered errors and instead trigger - * another attempt. A delay between attempts is given by - * DELAY_BETWEEN_SOCKET_RETRIES. - * @param socket The socket ID. - * @param num_bytes The number of bytes to write. - * @param buffer The buffer from which to get the bytes. - * @return 0 for success, -1 for failure. - */ -int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer); - -/** - * Write the specified number of bytes to the specified socket using write_to_socket - * and close the socket if an error occurs. If an error occurs, this will change the - * socket ID pointed to by the first argument to -1 and will return -1. - * @param socket Pointer to the socket ID. - * @param num_bytes The number of bytes to write. - * @param buffer The buffer from which to get the bytes. - * @return 0 for success, -1 for failure. - */ -int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer); - -/** - * Write the specified number of bytes to the specified socket using - * write_to_socket_close_on_error and exit with an error code if an error occurs. - * If the mutex argument is non-NULL, release the mutex before exiting. If the - * format argument is non-null, then use it an any additional arguments to form - * the error message using printf conventions. Otherwise, print a generic error - * message. - * @param socket Pointer to the socket ID. - * @param num_bytes The number of bytes to write. 
- * @param buffer The buffer from which to get the bytes. - * @param mutex If non-NULL, the mutex to unlock before exiting. - * @param format A format string for error messages, followed by any number of - * fields that will be used to fill the format string as in printf, or NULL - * to print a generic error message. - */ -void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, - char* format, ...); - -#endif // FEDERATED - /** * Write the specified data as a sequence of bytes starting * at the specified address. This encodes the data in little-endian diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h new file mode 100644 index 000000000..9c6138e05 --- /dev/null +++ b/include/core/federated/network/socket_common.h @@ -0,0 +1,243 @@ +#ifndef SOCKET_COMMON_H +#define SOCKET_COMMON_H + +#include "low_level_platform.h" + +/** + * The amount of time to wait after a failed socket read or write before trying again. This defaults to 100 ms. + */ +#define DELAY_BETWEEN_SOCKET_RETRIES MSEC(100) + +/** + * The timeout time in ns for TCP operations. + * Default value is 10 secs. + */ +#define TCP_TIMEOUT_TIME SEC(10) + +/** + * The timeout time in ns for UDP operations. + * Default value is 1 sec. + */ +#define UDP_TIMEOUT_TIME SEC(1) + +/** + * Time between a federate's attempts to connect to the RTI. + */ +#define CONNECT_RETRY_INTERVAL MSEC(500) + +/** + * Bound on the number of retries to connect to the RTI. + * A federate will retry every CONNECT_RETRY_INTERVAL nanoseconds until + * CONNECTION_TIMEOUT expires. + */ +#define CONNECT_TIMEOUT MINUTES(1) + +/** + * Maximum number of port addresses that a federate will try to connect to the RTI on. + * If you are using automatic ports begining at DEFAULT_PORT, this puts an upper bound + * on the number of RTIs that can be running on the same host. 
+ */ +#define MAX_NUM_PORT_ADDRESSES 16u + +/** + * Time to wait before re-attempting to bind to a port. + * When a process closes, the network stack typically waits between 30 and 120 + * seconds before releasing the port. This is to allow for delayed packets so + * that a new process does not receive packets from a previous process. + * Here, we limit the retries to 60 seconds. + */ +#define PORT_BIND_RETRY_INTERVAL SEC(1) + +/** + * Number of attempts to bind to a port before giving up. + */ +#define PORT_BIND_RETRY_LIMIT 60 + +/** + * Default port number for the RTI. + * Unless a specific port has been specified by the LF program in the "at" + * for the RTI or on the command line, when the RTI starts up, it will attempt + * to open a socket server on this port. + */ +#define DEFAULT_PORT 15045u + +/** + * Byte identifying that the federate or the RTI has failed. + */ +#define MSG_TYPE_FAILED 25 + +typedef enum socket_type_t { TCP, UDP } socket_type_t; + +/** + * Mutex protecting socket close operations. + */ +extern lf_mutex_t socket_mutex; + +/** + * @brief Create an IPv4 TCP socket with Nagle's algorithm disabled + * (TCP_NODELAY) and Delayed ACKs disabled (TCP_QUICKACK). Exits application + * on any error. + * + * @return The socket ID (a file descriptor). + */ +int create_real_time_tcp_socket_errexit(); + +/** + * @brief Create a TCP server that listens for socket connections. + * + * If the specified port number is greater than zero, this function will attempt to acquire that port. + * If the specified port number is zero, and the increment_port_on_retry is true, it will attempt to acquire + * DEFAULT_PORT. If it fails to acquire DEFAULT_PORT, then it will increment the port number from DEFAULT_PORT on each + * attempt until it has incremented MAX_NUM_PORT_ADDRESSES times, at which point it will cycle around and begin again + * with DEFAULT_PORT. 
+ * If the port number is zero, and the increment_port_on_retry is false, it delegates to the operating system to provide + * an available port number. + * If acquiring the port fails, then this function will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times with a + * delay of PORT_BIND_RETRY_INTERVAL in between each try. + * + * @param port The port number to use or 0 to let the OS pick or 1 to start trying at DEFAULT_PORT. + * @param final_socket Pointer to the returned socket descriptor on which accepting connections will occur. + * @param final_port Pointer to the final port the server will use. + * @param sock_type Type of the socket, TCP or UDP. + * @param increment_port_on_retry Boolean to retry port increment. + * @return 0 for success, -1 for failure. + */ +int create_server(uint16_t port, int* final_socket, uint16_t* final_port, socket_type_t sock_type, + bool increment_port_on_retry); + +/** + * Wait for an incoming connection request on the specified server socket. + * This blocks until a connection is successfully accepted. If an error occurs that is not + * temporary (e.g., `EAGAIN` or `EWOULDBLOCK`), it reports the error and exits. Temporary + * errors cause the function to retry accepting the connection. + * + * If the `rti_socket` is not -1, this function checks whether the specified socket is still open. + * If it is not open, then this function returns -1. + * This is useful for federates to determine whether they are still connected to the federation + * and to stop waiting when they are not. + * + * @param socket The server socket file descriptor that is listening for incoming connections. + * @param rti_socket The rti socket for the federate to check if it is still open. + * @return The file descriptor for the newly accepted socket on success, or -1 on failure + * (with an appropriate error message printed). 
+ */ + +int accept_socket(int socket, int rti_socket); + +/** + * + * Attempt to establish a TCP connection to the specified hostname + * and port. This function uses `getaddrinfo` to resolve the hostname and retries the connection + * periodically if it fails. If the specified port is 0, it iterates through a range + * of default ports starting from `DEFAULT_PORT`. The function will stop retrying + * if the `CONNECT_TIMEOUT` is reached. + * + * @param sock The socket file descriptor that has already been created (using `socket()`). + * @param hostname The hostname or IP address of the server to connect to. + * @param port The port number to connect to. If 0 is specified, a default port range will be used. + * @return 0 on success, -1 on failure, and `errno` is set to indicate the specific error. + */ +int connect_to_socket(int sock, const char* hostname, int port); + +/** + * Read the specified number of bytes from the specified socket into the specified buffer. + * If an error occurs during this reading, return -1 and set errno to indicate + * the cause of the error. If the read succeeds in reading the specified number of bytes, + * return 0. If an EOF occurs before reading the specified number of bytes, return 1. + * This function repeats the read attempt until the specified number of bytes + * have been read, an EOF is read, or an error occurs. Specifically, errors EAGAIN, + * EWOULDBLOCK, and EINTR are not considered errors and instead trigger + * another attempt. A delay between attempts is given by DELAY_BETWEEN_SOCKET_RETRIES. + * @param socket The socket ID. + * @param num_bytes The number of bytes to read. + * @param buffer The buffer into which to put the bytes. + * @return 0 for success, 1 for EOF, and -1 for an error. + */ +int read_from_socket(int socket, size_t num_bytes, unsigned char* buffer); + +/** + * Read the specified number of bytes to the specified socket using read_from_socket + * and close the socket if an error occurs. 
If an error occurs, this will change the + * socket ID pointed to by the first argument to -1 and will return -1. + * @param socket Pointer to the socket ID. + * @param num_bytes The number of bytes to read. + * @param buffer The buffer into which to put the bytes. + * @return 0 for success, -1 for failure. + */ +int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer); + +/** + * Read the specified number of bytes from the specified socket into the + * specified buffer. If a disconnect or an EOF occurs during this + * reading, then if format is non-null, report an error and exit. + * If the mutex argument is non-NULL, release the mutex before exiting. + * If format is null, then report the error, but do not exit. + * This function takes a formatted string and additional optional arguments + * similar to printf(format, ...) that is appended to the error messages. + * @param socket Pointer to the socket ID. + * @param num_bytes The number of bytes to read. + * @param buffer The buffer into which to put the bytes. + * @param mutex If non-NULL, the mutex to unlock before exiting. + * @param format A printf-style format string, followed by arguments to + * fill the string, or NULL to not exit with an error message. + */ +void read_from_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...); + +/** + * Without blocking, peek at the specified socket and, if there is + * anything on the queue, put its first byte at the specified address and return 1. + * If there is nothing on the queue, return 0, and if an error occurs, + * return -1. + * @param socket The socket ID. + * @param result Pointer to where to put the first byte available on the socket. + * @return 1 if a byte was placed in result, 0 if nothing is on the queue, or -1 if an error occurs. + */ +ssize_t peek_from_socket(int socket, unsigned char* result); + +/** + * Write the specified number of bytes to the specified socket from the + * specified buffer.
If an error occurs, return -1 and set errno to indicate + * the cause of the error. If the write succeeds, return 0. + * This function repeats the attempt until the specified number of bytes + * have been written or an error occurs. Specifically, errors EAGAIN, + * EWOULDBLOCK, and EINTR are not considered errors and instead trigger + * another attempt. A delay between attempts is given by + * DELAY_BETWEEN_SOCKET_RETRIES. + * @param socket The socket ID. + * @param num_bytes The number of bytes to write. + * @param buffer The buffer from which to get the bytes. + * @return 0 for success, -1 for failure. + */ +int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer); + +/** + * Write the specified number of bytes to the specified socket using write_to_socket + * and close the socket if an error occurs. If an error occurs, this will change the + * socket ID pointed to by the first argument to -1 and will return -1. + * @param socket Pointer to the socket ID. + * @param num_bytes The number of bytes to write. + * @param buffer The buffer from which to get the bytes. + * @return 0 for success, -1 for failure. + */ +int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* buffer); + +/** + * Write the specified number of bytes to the specified socket using + * write_to_socket_close_on_error and exit with an error code if an error occurs. + * If the mutex argument is non-NULL, release the mutex before exiting. If the + * format argument is non-null, then use it and any additional arguments to form + * the error message using printf conventions. Otherwise, print a generic error + * message. + * @param socket Pointer to the socket ID. + * @param num_bytes The number of bytes to write. + * @param buffer The buffer from which to get the bytes. + * @param mutex If non-NULL, the mutex to unlock before exiting.
+ * @param format A format string for error messages, followed by any number of + * fields that will be used to fill the format string as in printf, or NULL + * to print a generic error message. + */ +void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...); + +#endif /* SOCKET_COMMON_H */