STOMP Connection Closes Due to Missing Heartbeats Despite Frontend Showing Ping-Pongs #34009

Open
abhishek0499 opened this issue Dec 3, 2024 · 5 comments
Labels
in: messaging Issues in messaging modules (jms, messaging) status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on

Comments

@abhishek0499

I’m experiencing a connection issue with WebSocket STOMP in Spring Boot using ActiveMQ Artemis. Despite proper heartbeat configurations, the broker closes the connection with the following error:
AMQ229014: Did not receive data from /192.0.2.1:46748 within the 20000ms connection TTL.

Setup

  • Frontend: SockJS and Stomp.js, showing regular ping/pong exchanges in the browser console.
  • Backend: Spring Boot using StompBrokerRelayMessageHandler with a custom ReactorNettyTcpClient for broker connections. Relevant configuration:
public void configureMessageBroker(MessageBrokerRegistry config) {
    int sendInterval = 10000; // Send interval in milliseconds
    int receiveInterval = (int) (sendInterval * heartBeatReceiveScale);
    config.setApplicationDestinationPrefixes("/app");
    config.enableStompBrokerRelay("/topic", "/queue")
            .setUserDestinationBroadcast("/topic/random")
            .setUserRegistryBroadcast("/topic/simp-user-registry")
            .setTcpClient(createTcpClient())
            .setSystemLogin(username)
            .setSystemPasscode(password)
            .setClientLogin(username)
            .setClientPasscode(password)
            .setSystemHeartbeatSendInterval(sendInterval) // Set heartbeat send interval
            .setSystemHeartbeatReceiveInterval(receiveInterval);
}

Logs confirm a CONNECTED frame with heart-beat=[10000, 10000].
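For reference, the STOMP 1.2 specification negotiates each direction separately. The CONNECT frame carries heart-beat:cx,cy and the CONNECTED frame carries heart-beat:sx,sy. Client-to-server heart-beats flow every MAX(cx, sy) ms, server-to-client heart-beats flow every MAX(sx, cy) ms, and a direction is disabled if either value for it is 0. The sketch below encodes that rule. It uses 10000 for all four values as an illustrative assumption; in the configuration above, cy actually depends on heartBeatReceiveScale.

```java
/**
 * Sketch of the STOMP 1.2 heart-beat negotiation rule.
 * CONNECT carries heart-beat:cx,cy and CONNECTED carries heart-beat:sx,sy.
 */
final class HeartbeatNegotiation {

    /** How often (ms) the client must send; 0 means no client-to-server heart-beats. */
    static long clientToServer(long cx, long sy) {
        return (cx == 0 || sy == 0) ? 0 : Math.max(cx, sy);
    }

    /** How often (ms) the server will send; 0 means no server-to-client heart-beats. */
    static long serverToClient(long sx, long cy) {
        return (sx == 0 || cy == 0) ? 0 : Math.max(sx, cy);
    }

    public static void main(String[] args) {
        // Illustrative values: 10000 ms on both the CONNECT and CONNECTED frames.
        long cx = 10_000, cy = 10_000, sx = 10_000, sy = 10_000;
        System.out.println("client -> broker every " + clientToServer(cx, sy) + " ms");
        System.out.println("broker -> client every " + serverToClient(sx, cy) + " ms");
    }
}
```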

Observations

  • Frontend pings/pongs appear consistent.
  • Backend logs indicate heartbeats are sent and received.
  • Connection closes after the TTL timeout (20 seconds).

Questions

  • How can I verify heartbeats are properly received by the broker?
  • Are additional Spring Boot or Artemis configurations required to prevent disconnections?

Steps to Reproduce

  • Configure WebSocket STOMP with a broker using the above setup.
  • Send/receive heartbeats with the specified intervals.
  • Observe connection closure in logs despite consistent heartbeats.

Additional Context
Backend logs:
DEBUG StompBrokerRelayMessageHandler : Received CONNECTED heart-beat=[10000, 10000] session=itwv0lto
DEBUG StompBrokerRelayMessageHandler : Forwarding SEND /topic/simp-user-registry session=_system_

@v-perfilev
Contributor

Hi @abhishek0499,

I've been experimenting with ActiveMQ-Artemis as a relay broker but haven't been able to reproduce the error you described. You may be using a custom Artemis configuration, because the default connection TTL is 60000ms, not 20000ms. Unfortunately, you didn't share how you created your custom TcpClient.

This is my configuration:

  1. docker-compose for ActiveMQ-Artemis:
version: '3.8'
services:
  broker:
    image: apache/activemq-artemis:2.39.0
    ports:
      - "61613:61613"
      - "61616:61616"
      - "8161:8161"
  2. WebSocketConfig on Spring Boot 3.4.1:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  private final String host = "127.0.0.1";
  private final int port = 61613;
  private final String username = "artemis";
  private final String password = "artemis";
  private final int sendInterval = 10000;
  private final int receiveInterval = 10000;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry
        .addEndpoint("/ws")
        .setAllowedOrigins("*")
        .withSockJS();
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config
        .setApplicationDestinationPrefixes("/app")
        .enableStompBrokerRelay("/topic", "/queue")
        .setUserDestinationBroadcast("/topic/random")
        .setUserRegistryBroadcast("/topic/simp-user-registry")
        .setTcpClient(createCustomTcpClient())
        .setSystemLogin(username)
        .setSystemPasscode(password)
        .setClientLogin(username)
        .setClientPasscode(password)
        .setSystemHeartbeatSendInterval(sendInterval)
        .setSystemHeartbeatReceiveInterval(receiveInterval);
  }

  private TcpOperations<byte[]> createCustomTcpClient() {
    TcpClient tcpClient = TcpClient
        .create()
        .host(host)
        .port(port);
    return new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
  }
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    implementation 'io.projectreactor.netty:reactor-netty:1.1.21'
}

JS Client

import SockJS from "sockjs-client";
import * as Stomp from "@stomp/stompjs";

const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.Stomp.over(socket);

stompClient.connect({}, (frame) => {
  stompClient.subscribe('/topic/random', (message) => {
    console.log('Received message: ' + message.body);
  });
}, (error) => {
  console.error('Error: ', error);
});

So check your TCP client: maybe you've set a very short connection timeout. Or the traffic may be interrupted at the network level in your environment. You can get extended logs from your client by enabling wiretap on it:

TcpClient tcpClient = TcpClient.create()
    .host(host)
    .port(port)
    .wiretap("reactor.netty", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);

and in your application.yaml:

logging:
  level:
    reactor.netty: DEBUG

This lets you check whether heartbeat messages are sent correctly for both the Spring app and the JS client (the local ports of the two connections will differ):

2025-01-04T17:17:32.916+01:00 DEBUG 13665 --- [actor-tcp-nio-3] reactor.netty                            : [bc92da25, L:/127.0.0.1:64607 - R:/127.0.0.1:61613] READ: 1B 
2025-01-04T17:17:32.921+01:00 DEBUG 13665 --- [actor-tcp-nio-3] reactor.netty                            : [bc92da25, L:/127.0.0.1:64607 - R:/127.0.0.1:61613] WRITE: 1B
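To turn this output into an answer to "how can I verify heartbeats are properly received by the broker?", you can scan the 1-byte READ/WRITE lines for gaps longer than the TTL. For the broker's TTL, the WRITE direction (app to broker) is the one that matters. The rough sketch below makes several assumptions. Every 1-byte frame is taken to be a STOMP heart-beat (a bare "\n"; CRLF heart-beats would show up as 2B). The log lines are assumed to follow the format shown above. The class name and sample lines are hypothetical.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Scans reactor.netty wiretap lines and reports the largest gap between
 * 1-byte frames in one direction. Assumes each 1-byte READ/WRITE is a
 * STOMP heart-beat EOL.
 */
final class HeartbeatGapCheck {

    // Leading ISO-8601 timestamp, then "...] READ: 1B" or "...] WRITE: 1B".
    private static final Pattern LINE =
            Pattern.compile("^(\\S+)\\s.*\\]\\s+(READ|WRITE):\\s+(\\d+)B");

    /** Timestamps of 1-byte events in the given direction ("READ" or "WRITE"). */
    static List<OffsetDateTime> heartbeats(List<String> lines, String direction) {
        List<OffsetDateTime> times = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find() && m.group(2).equals(direction) && m.group(3).equals("1")) {
                times.add(OffsetDateTime.parse(m.group(1)));
            }
        }
        return times;
    }

    /** Largest gap between consecutive timestamps, or Duration.ZERO if fewer than two. */
    static Duration maxGap(List<OffsetDateTime> times) {
        Duration max = Duration.ZERO;
        for (int i = 1; i < times.size(); i++) {
            Duration gap = Duration.between(times.get(i - 1), times.get(i));
            if (gap.compareTo(max) > 0) {
                max = gap;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the same format as the wiretap output above.
        String body = " DEBUG 13665 --- [actor-tcp-nio-3] reactor.netty : "
                + "[bc92da25, L:/127.0.0.1:64607 - R:/127.0.0.1:61613] ";
        List<String> lines = List.of(
                "2025-01-04T17:17:32.921+01:00" + body + "WRITE: 1B",
                "2025-01-04T17:17:42.921+01:00" + body + "WRITE: 1B",
                "2025-01-04T17:17:52.930+01:00" + body + "WRITE: 1B");
        Duration ttl = Duration.ofMillis(20_000); // TTL from the AMQ229014 error
        Duration gap = maxGap(heartbeats(lines, "WRITE"));
        System.out.println("largest WRITE gap: " + gap.toMillis() + " ms, within TTL: "
                + (gap.compareTo(ttl) < 0));
    }
}
```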

Cheers,
Vladimir Perfilev

@bclozel bclozel added the status: waiting-for-feedback We need additional information before we can continue label Jan 4, 2025
@abhishek0499
Author

Thank you for the response. I've tested the suggested approach with a single broker host and port, and it works as expected. However, my use case involves multiple broker host-port pairs (e.g., tcp://host1:61613,tcp://host2:61613,tcp://host3:61613).

I implemented a custom TcpClient using a Supplier<SocketAddress> that:

  1. Attempts to connect to each broker
  2. Maintains a list of valid broker addresses
  3. Randomly selects one broker from the valid addresses

But when using multiple brokers, I'm still encountering the same TTL error as in my original issue.

I've attached my current implementation for reference. Could you provide guidance on how to properly handle multiple broker connections while avoiding the TTL errors?

private String[] brokerAddresses = "tcp://host1:61613,tcp://host2:61613,tcp://host3:61613".split(",");

private static SocketAddress selectRandomBroker(List<SocketAddress> validAddresses) {
        if (validAddresses.isEmpty()) {
            throw new ArtemisException("No valid Artemis broker addresses found.");
        }
        Random random = new Random();
        int randomIndex = random.nextInt(validAddresses.size());
        return validAddresses.get(randomIndex);
    }
 private TcpOperations<byte[]> createCustomTcpClient() {
        Supplier<SocketAddress> addressSupplier = new Supplier<>() {
            @Override
            public SocketAddress get() {
                if (brokerAddresses == null || brokerAddresses.length == 0) {
                    throw new ArtemisException("No Artemis broker URLs found. Please set 'artemis.broker.urls' property.");
                }
                List<SocketAddress> validAddresses = new ArrayList<>();
                for (String address : brokerAddresses) {
                    URI uri;
                    try {
                        uri = new URI(address);
                        LOGGER.info("Trying to connect to Artemis broker with URL {}", uri);
                        ConnectionFactory factory = new ActiveMQConnectionFactory(uri.toString(), username, password);
                        Connection connection = factory.createConnection();
                        connection.close();  // Close temporary connection
                        validAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
                    } catch (JMSException e) {
                        LOGGER.error("Connection failed on: {}. Reason: {}", address, e.getMessage());
                        LOGGER.info("Retrying connection in 5 seconds");
                        try {
                            Thread.sleep(5 * 1000);
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    } catch (URISyntaxException e) {
                        throw new ArtemisException(e.getMessage());
                    }
                }
                if (validAddresses.isEmpty()) {
                    throw new ArtemisException("No valid Artemis broker addresses found.");
                }
                return selectRandomBroker(validAddresses);
            }
        };

        TcpClient tcpClient = TcpClient
                .create()
                .remoteAddress(addressSupplier) // Use the address supplier for random selection
                .wiretap(true); // Enable wiretap for monitoring TCP traffic

        return new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
    }
    

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Jan 7, 2025
@v-perfilev
Contributor

Hi @abhishek0499,

I just tested your configuration with multiple brokers, and it still works well on spring-websocket 6.2.1. You may want to check whether you're using an older version; that might explain the issue.

Anyway, it seems the problem might not be with Spring but could be related to the network or broker configuration. I would recommend investigating those areas first.

P.S. If I may say so, your "createCustomTcpClient()" feels like a bit of overkill for just implementing failover. I would use a simpler address supplier with round-robin (you can replace it with random selection). The supplier is consulted on every connection attempt, so after a failure the next attempt will go to the next address. This also reduces the number of dependencies in your project. Here is an example:

  private final SocketAddress[] brokerAddresses = new SocketAddress[]{
      new InetSocketAddress("hostname1", 61613),
      new InetSocketAddress("hostname2", 61613),
      new InetSocketAddress("hostname3", 61613)
  };

  private TcpOperations<byte[]> createCustomTcpClient() {
    Supplier<SocketAddress> failoverAddressSupplier = new Supplier<>() {
      // AtomicInteger keeps the counter consistent if connections are opened concurrently
      private final AtomicInteger currentIndex = new AtomicInteger();

      @Override
      public SocketAddress get() {
        if (brokerAddresses.length == 0) {
          throw new IllegalStateException("No broker addresses configured");
        }
        // floorMod stays non-negative even after the counter overflows
        int nextIndex = Math.floorMod(currentIndex.getAndIncrement(), brokerAddresses.length);
        return brokerAddresses[nextIndex];
      }
    };

    TcpClient tcpClient = TcpClient
        .create()
        .remoteAddress(failoverAddressSupplier);

    return new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
  }
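If you prefer random selection to round-robin, the supplier can be swapped out. The minimal sketch below uses RandomBrokerAddressSupplier as a hypothetical name. It could be passed to TcpClient.create().remoteAddress(...) in place of failoverAddressSupplier.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical random-selection variant of the round-robin failover supplier. */
final class RandomBrokerAddressSupplier implements Supplier<SocketAddress> {

    private final List<SocketAddress> addresses;

    RandomBrokerAddressSupplier(List<SocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("At least one broker address is required");
        }
        this.addresses = List.copyOf(addresses);
    }

    @Override
    public SocketAddress get() {
        // ThreadLocalRandom is safe to call from whichever thread opens a connection.
        int index = ThreadLocalRandom.current().nextInt(addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        // Placeholder hostnames; createUnresolved avoids a DNS lookup in this demo.
        List<SocketAddress> brokers = List.of(
                InetSocketAddress.createUnresolved("hostname1", 61613),
                InetSocketAddress.createUnresolved("hostname2", 61613),
                InetSocketAddress.createUnresolved("hostname3", 61613));
        Supplier<SocketAddress> supplier = new RandomBrokerAddressSupplier(brokers);
        for (int i = 0; i < 5; i++) {
            System.out.println(supplier.get());
        }
    }
}
```

The two approaches involve a trade-off. Round-robin guarantees that the next attempt goes to a different broker, whereas random selection can pick the broker that just failed again.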

Don't forget to set the right cluster-connection parameters on your brokers, including message-load-balancing.

@abhishek0499
Author

Follow-up: I've made some configuration changes but am still experiencing issues with both TTL and message delivery.

Current workaround implemented:

  • Increased the heartBeatToConnectionTtlModifier to 10.0 in the broker XML configuration
  • Modified the acceptor configuration as follows:
<acceptor name="stomp">
    tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;heartBeatToConnectionTtlModifier=10.0
</acceptor>
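This sketch assumes Artemis derives the STOMP connection TTL by multiplying the client's heart-beat send interval by heartBeatToConnectionTtlModifier, and that the default modifier is 2.0 (my understanding of the Artemis docs). Under those assumptions, the arithmetic lines up with the original error: 10000 ms × 2.0 gives the 20000 ms TTL in AMQ229014, and the 10.0 setting above stretches it to 100000 ms. The helper name ttlMillis is hypothetical.

```java
/**
 * Sketch of the assumed relationship between heart-beat interval and
 * connection TTL: TTL = client heart-beat send interval x modifier.
 */
final class StompConnectionTtl {

    static long ttlMillis(long clientHeartbeatMillis, double modifier) {
        return Math.round(clientHeartbeatMillis * modifier);
    }

    public static void main(String[] args) {
        long heartbeat = 10_000; // heart-beat send interval from the configuration above
        System.out.println("modifier 2.0  -> TTL " + ttlMillis(heartbeat, 2.0) + " ms");
        System.out.println("modifier 10.0 -> TTL " + ttlMillis(heartbeat, 10.0) + " ms");
    }
}
```

Raising the modifier only widens the window before the broker gives up. It does not by itself show why heart-beats were arriving late or not at all.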

New Issue Observed:
While the connection is no longer closing after the heartBeatToConnectionTtlModifier change, I'm now experiencing message delivery failures. Specifically:

  • The connection and session are confirmed present in Artemis
  • The session exists in the database
  • However, messages are not being delivered, with logs showing:
    [NotificationService] [ XNIO-1 task-3] o.s.m.s.s.StompBrokerRelayMessageHandler : No TCP connection for session a34trfys in GenericMessage [payload=byte[372], headers={simpMessageType=MESSAGE, contentType=application/json, simpSessionId=a34trfys, simpDestination=/user/a34trfys/queue/report}]

This suggests a disconnect between the broker's connection state and Spring's message handling.

I'd appreciate:

  1. Recommendations for optimal broker configurations to handle this scenario
  2. Any other specific configuration parameters that should be adjusted
  3. Best practices for handling STOMP connection TTL in production environments
  4. Guidance on resolving the message delivery issue despite active connections

This is the current code for connecting to the brokers:

@NotNull
 private SocketAddress[] brokerAddresses;

 @PostConstruct
 private void initializeBrokerAddresses() {
     // Initialize the array after the @Value fields are populated
     brokerAddresses = new SocketAddress[]{
             new InetSocketAddress(host1, port),
             new InetSocketAddress(host2, port),
             new InetSocketAddress(host3, port)
     };
 }

 private TcpOperations<byte[]> createCustomTcpClient() {
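     Supplier<SocketAddress> failoverAddressSupplier = new Supplier<>() {
         // AtomicInteger keeps the round-robin counter consistent across threads
         private final AtomicInteger currentIndex = new AtomicInteger();

         @Override
         public SocketAddress get() {
             if (brokerAddresses.length == 0) {
                 throw new ArtemisException("No Artemis broker URLs found. Please set artemis host properties.");
             }
             // floorMod stays non-negative even after the counter overflows
             int nextIndex = Math.floorMod(currentIndex.getAndIncrement(), brokerAddresses.length);
             return brokerAddresses[nextIndex];
         }
     };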
     TcpClient tcpClient = TcpClient
             .create()
             .remoteAddress(failoverAddressSupplier);
     return new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
 }

Current broker version: 2.39.0
Spring boot version: 3.4.1

@abhishek0499
Author

abhishek0499 commented Feb 6, 2025

Hi @v-perfilev
I’m now able to establish the connection successfully, but I’m facing an issue when scaling my backend instances. When I increase the number of backend instances to more than one, I encounter the following error:
[NotificationService] [ XNIO-1 task-3] o.s.m.s.s.StompBrokerRelayMessageHandler : No TCP connection for session a34trfys in GenericMessage [payload=byte[372], headers={simpMessageType=MESSAGE, contentType=application/json, simpSessionId=a34trfys, simpDestination=/user/a34trfys/queue/report}].
However, when I use only one backend instance, everything works fine, and I’m able to receive messages without any issues.

public void onApplicationEvent(SessionConnectEvent connectEvent) {
        LOGGER.info("ConnectEventListener :: Session Connected");
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(connectEvent.getMessage());
        String emailId = headerAccessor.getFirstNativeHeader("emailId");
        String token = headerAccessor.getFirstNativeHeader("Authorization");

        try {
            Authentication authentication = tokenProcessor.authenticate(token);
            if (authentication == null) {
                throw new BadCredentialsException("Authentication Failed");
            }
            userSessionService.saveUserSessionDetails(headerAccessor, emailId);
        } catch (AuthenticationException | javax.naming.AuthenticationException e) {
            LOGGER.error("Authentication failed or error saving session details & the cause is {}", e.getMessage());
            throw new ArtemisException("Authentication failed or error saving session details", e);
        }
    }
public void saveUserSessionDetails(SimpMessageHeaderAccessor simpMessageHeaderAccessor, String emailId) {
        Optional<UserSession> userSessionOptional = userSessionRepository.findByEmailId(emailId);
        if (userSessionOptional.isPresent()) {
            LOGGER.info("UserSessionServiceImpl :: Deleting previous user session for user with email ID: {}", emailId);
            deleteUserSession(userSessionOptional.get().getSessionId());
        }

        UserSession userSession = new UserSession();
        userSession.setEmailId(emailId);
        userSession.setSessionId(simpMessageHeaderAccessor.getSessionId());

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("UserSessionServiceImpl :: User with email ID: {} and session ID: {} connected", emailId, simpMessageHeaderAccessor.getSessionId());
        }

        userSessionRepository.save(userSession);
    }
@Service
public class NotificationServiceImpl implements NotificationService {

    private final NotificationRepository notificationRepository;
    private final UserSessionRepository userSessionRepository;
    private final SimpMessagingTemplate messagingTemplate;
    private final NotificationTemplateRepository notificationTemplateRepository;

    @Autowired
    public NotificationServiceImpl(NotificationRepository notificationRepository, UserSessionRepository userSessionRepository, SimpMessagingTemplate messagingTemplate, NotificationTemplateRepository notificationTemplateRepository) {
        this.notificationRepository = notificationRepository;
        this.userSessionRepository = userSessionRepository;
        this.messagingTemplate = messagingTemplate;
        this.notificationTemplateRepository = notificationTemplateRepository;
    }

    @Override
    public Optional<MessageVo> sendToSpecificUser(MessageVo messageVo) {
        String emailId = messageVo.getRecipientInformation().getHttpSessionUsername();
        Optional<UserSession> userSession = userSessionRepository.findByEmailId(emailId);
        Message message = new Message();
        message.setMessageType(messageVo.getMessageType());
        message.setMessageText(messageVo.getMessageText());
        notificationRepository.save(message);
        messageVo.setMessageId(message.getMessageId());
        String destination = "/queue/".concat(messageVo.getMessageType());
        if (userSession.isPresent()) {         
            messagingTemplate.convertAndSendToUser(userSession.get().getSessionId(), destination, messageVo, 
              createHeaders(userSession.get().getSessionId()));
            return Optional.of(messageVo);
        } else {
            return Optional.empty();
        }
    }

    private MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
        return headerAccessor.getMessageHeaders();
    }
}
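One way to read the "No TCP connection for session" symptom is sketched below as a toy model. It rests on my assumptions about the relay and is not Spring's actual code. Each backend instance keeps broker TCP connections only for the WebSocket sessions it accepted itself. createHeaders(...) above sets simpSessionId, so a message produced on an instance that does not own session a34trfys finds no connection to write to. In the model, a message without a session ID goes over the shared _system_ connection seen in the earlier log. Whether the real relay and user-destination handling behave that way should be checked against the Spring reference docs. RelayInstanceModel and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Toy model (not Spring code) of one backend instance's broker relay: it maps
 * WebSocket session IDs to the broker connections it opened itself, plus one
 * shared "_system_" connection.
 */
final class RelayInstanceModel {

    static final String SYSTEM_SESSION = "_system_";

    private final Map<String, String> connectionsBySession = new ConcurrentHashMap<>();

    RelayInstanceModel(String instanceName) {
        connectionsBySession.put(SYSTEM_SESSION, instanceName + ":system");
    }

    /** Called when a browser's STOMP CONNECT arrives at this instance. */
    void clientConnected(String sessionId) {
        connectionsBySession.put(sessionId, "client:" + sessionId);
    }

    /** Connection a MESSAGE would be written to; empty models "No TCP connection for session". */
    Optional<String> route(String simpSessionId) {
        String key = (simpSessionId == null) ? SYSTEM_SESSION : simpSessionId;
        return Optional.ofNullable(connectionsBySession.get(key));
    }

    public static void main(String[] args) {
        RelayInstanceModel a = new RelayInstanceModel("instance-a");
        RelayInstanceModel b = new RelayInstanceModel("instance-b");
        a.clientConnected("a34trfys"); // the browser's WebSocket landed on instance A

        System.out.println("A, with session header: " + a.route("a34trfys"));
        System.out.println("B, with session header: " + b.route("a34trfys"));
        System.out.println("B, without session header: " + b.route(null));
    }
}
```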

Tech Stack:

  • Spring Boot
  • WebSockets / STOMP with ActiveMQ Artemis
  • Running in a load-balanced environment with multiple instances

Any guidance on ensuring consistent message delivery across all instances would be greatly appreciated!
