Skip to content

Commit

Permalink
increase timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsilaghi committed Jun 24, 2024
1 parent 60fdee6 commit e0efc07
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>edu.stanford.protege</groupId>
<artifactId>webprotege-ipc</artifactId>
<version>1.0.3</version>
<version>1.0.5</version>
<name>webprotege-ipc</name>
<description>Inter Process Communication framework</description>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private R handleResponse(Message rabbitResponse) {

if(exception != null) {
try {
logger.error("Found error on response " + exception);
logger.error("Found error on response {}. Action : {}" ,exception, rabbitResponse.getMessageProperties().getHeaders().get(Headers.METHOD));
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public void dispatchEvent(Event event) {
message.getMessageProperties().getHeaders().put(PROJECT_ID, projectId);
}
eventRabbitTemplate.convertAndSend(RabbitMQEventsConfiguration.EVENT_EXCHANGE, "", message);
logger.info("Sent event message!");
} catch (JsonProcessingException | AmqpException e) {
logger.info("Could not serialize event: {}", e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public RabbitMQEventHandlerWrapper(List<EventHandler<? extends Event>> eventHand

@Override
public void onMessage(Message message) {
logger.info("Handling event with id {}", message.getMessageProperties().getMessageId());
EventHandler eventHandler = eventHandlers.stream()
.filter(handler -> {
String channel = String.valueOf(message.getMessageProperties().getHeaders().get(CHANNEL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public RabbitMqCommandHandlerWrapper(List<CommandHandler<? extends Request, ? ex

@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("Received message " + message);

var replyChannel = message.getMessageProperties().getReplyTo();
if (replyChannel == null) {
String errorMessage = Headers.REPLY_CHANNEL + " header is missing. Cannot reply to message.";
Expand Down Expand Up @@ -86,7 +84,6 @@ public void onMessage(Message message, Channel channel) throws Exception {
}

CommandHandler handler = extractHandler(messageType);
logger.info("Dispatch handling to {}", handler.getClass());
parseAndHandleRequest(handler, message, channel, new UserId(userId), accessToken);
}

Expand Down Expand Up @@ -171,10 +168,15 @@ private void authorizeAndReplyToRequest(CommandHandler<Q,R> handler,

private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channel, Message message, UserId userId, Q request, String accessToken) {
var executionContext = new ExecutionContext(userId, accessToken);
long startTime = System.currentTimeMillis();

try {
var response = handler.handleRequest(request, executionContext);
response.subscribe(r -> {
replyWithSuccessResponse(channel, message, userId, r);
long endtime = System.currentTimeMillis();
logger.info("Request executed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}, throwable -> {
if (throwable instanceof CommandExecutionException ex) {
logger.info(
Expand All @@ -183,14 +185,23 @@ private void handleAndReplyToRequest(CommandHandler<Q,R> handler, Channel channe
throwable.getMessage(),
request);
replyWithErrorResponse(message,channel, userId, ex.getStatus());
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");
}
else {
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();

logger.info("Request failed " + request.getChannel() + "with error " + throwable.getMessage() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
});
} catch (Throwable throwable) {
logger.error("Uncaught exception when handling request", throwable);
replyWithErrorResponse(message, channel, userId, HttpStatus.INTERNAL_SERVER_ERROR);
long endtime = System.currentTimeMillis();
logger.info("Request failed " + request.getChannel() + ". Time taken for Execution is : " + (endtime-startTime) +"ms");

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,17 @@ DirectExchange exchange() {
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(rabbitMqTimeout);
rabbitTemplate.setReplyTimeout(120000);
rabbitTemplate.setExchange(COMMANDS_EXCHANGE);
return rabbitTemplate;
}

@Bean(name = "asyncRabbitTemplate")
@ConditionalOnProperty(prefix = "webprotege.rabbitmq", name = "commands-subscribe", havingValue = "true", matchIfMissing = true)
public AsyncRabbitTemplate asyncRabbitTemplate(@Qualifier("rabbitTemplate") RabbitTemplate rabbitTemplate, SimpleMessageListenerContainer replyListenerContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
AsyncRabbitTemplate response = new AsyncRabbitTemplate(rabbitTemplate, replyListenerContainer, getCommandResponseQueue());
response.setReceiveTimeout(120000);
return response;
}


Expand All @@ -105,6 +107,7 @@ public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory c
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyQueue);
container.setConcurrency("15-20");
return container;
}

Expand All @@ -121,6 +124,7 @@ public SimpleMessageListenerContainer messageListenerContainers() {
container.setQueueNames(getCommandQueue());
container.setConnectionFactory(connectionFactory);
container.setMessageListener(rabbitMqCommandHandlerWrapper());
container.setConcurrency("15-20");
return container;
}

Expand Down

0 comments on commit e0efc07

Please sign in to comment.